1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.slf4j.LoggerFactory.getLogger;
21 import static org.testng.Assert.assertEquals;
22 import static org.testng.Assert.assertNull;
23 import static org.testng.Assert.fail;
24
25 import java.io.IOException;
26 import java.util.Arrays;
27
28 import org.apache.hadoop.hbase.client.Delete;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.ResultScanner;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.filter.CompareFilter;
34 import org.apache.hadoop.hbase.filter.Filter;
35 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.slf4j.Logger;
38 import org.testng.ITestContext;
39 import org.testng.annotations.BeforeMethod;
40 import org.testng.annotations.Test;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @Test(groups = "sharedHBase")
60 public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
61
62 private static final Logger LOG = getLogger(TestBaillisAnomaliesWithTXs.class);
63 private static final String TEST_COLUMN = "baillis-col";
64
65
66
67 private byte[] famName = Bytes.toBytes(TEST_FAMILY);
68 private byte[] colName = Bytes.toBytes(TEST_COLUMN);
69
70 private byte[] rowId1 = Bytes.toBytes("row1");
71 private byte[] rowId2 = Bytes.toBytes("row2");
72 private byte[] rowId3 = Bytes.toBytes("row3");
73
74 private byte[] dataValue1 = Bytes.toBytes(10);
75 private byte[] dataValue2 = Bytes.toBytes(20);
76 private byte[] dataValue3 = Bytes.toBytes(30);
77
78
79 @Test(timeOut = 10_000)
80 public void testSIPreventsPredicateManyPrecedersForReadPredicates(ITestContext context) throws Exception {
81
82
83
84
85
86
87
88
89
90
91 TransactionManager tm = newTransactionManager(context);
92 TTable txTable = new TTable(connection, TEST_TABLE);
93
94 Transaction tx1 = tm.begin();
95 Transaction tx2 = tm.begin();
96
97
98 Scan scan = new Scan();
99 Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(30));
100 scan.setFilter(f);
101 ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
102 assertNull(tx1Scanner.next());
103
104
105 Put newRow = new Put(rowId3);
106 newRow.addColumn(famName, colName, dataValue3);
107 txTable.put(tx2, newRow);
108
109
110 tm.commit(tx2);
111
112
113 tx1Scanner = txTable.getScanner(tx1, scan);
114 assertNull(tx1Scanner.next());
115
116
117 tm.commit(tx1);
118 }
119
120 @Test(timeOut = 10_000)
121 public void testSIPreventsPredicateManyPrecedersForWritePredicates(ITestContext context) throws Exception {
122
123
124
125
126
127
128
129
130
131 TransactionManager tm = newTransactionManager(context);
132 TTable txTable = new TTable(connection, TEST_TABLE);
133 Transaction tx1 = tm.begin();
134 Transaction tx2 = tm.begin();
135
136
137 Scan updateScan = new Scan();
138 ResultScanner tx1Scanner = txTable.getScanner(tx2, updateScan);
139 Result updateRes = tx1Scanner.next();
140 int count = 0;
141 while (updateRes != null) {
142 LOG.info("RESSS {}", updateRes);
143 Put row = new Put(updateRes.getRow());
144 int val = Bytes.toInt(updateRes.getValue(famName, colName));
145 LOG.info("Updating row id {} with value {}", Bytes.toString(updateRes.getRow()), val);
146 row.addColumn(famName, colName, Bytes.toBytes(val + 10));
147 txTable.put(tx1, row);
148 updateRes = tx1Scanner.next();
149 count++;
150 }
151 assertEquals(count, 2);
152
153
154 Scan scan = new Scan();
155 Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
156 scan.setFilter(f);
157 ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
158
159 Result res = tx2Scanner.next();
160 int count20 = 0;
161 while (res != null) {
162 LOG.info("RESSS {}", res);
163 LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()),
164 Bytes.toInt(res.getValue(famName, colName)));
165 Delete delete20 = new Delete(res.getRow());
166 txTable.delete(tx2, delete20);
167 res = tx2Scanner.next();
168 count20++;
169 }
170 assertEquals(count20, 1);
171
172 try {
173 tm.commit(tx1);
174 } catch (RollbackException e) {
175 if (!getClient(context).isLowLatency())
176 fail();
177 }
178
179 tx2Scanner = txTable.getScanner(tx2, scan);
180
181 if (!getClient(context).isLowLatency())
182 assertNull(tx2Scanner.next());
183
184
185 try {
186 tm.commit(tx2);
187 fail();
188 } catch (RollbackException e) {
189
190 }
191
192 }
193
194 @Test(timeOut = 10_000)
195 public void testSIPreventsLostUpdates(ITestContext context) throws Exception {
196
197
198
199
200
201
202
203
204
205
206
207 TransactionManager tm = newTransactionManager(context);
208 TTable txTable = new TTable(connection, TEST_TABLE);
209 Transaction tx1 = tm.begin();
210 Transaction tx2 = tm.begin();
211
212 Scan scan = new Scan(rowId1, rowId1);
213 scan.addColumn(famName, colName);
214
215
216 ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
217 Result res = tx1Scanner.next();
218 int count = 0;
219 while (res != null) {
220 LOG.info("RESSS {}", res);
221 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
222 Bytes.toString(res.getValue(famName, colName)));
223 assertEquals(res.getRow(), rowId1);
224 assertEquals(res.getValue(famName, colName), dataValue1);
225 res = tx1Scanner.next();
226 count++;
227 }
228 assertEquals(count, 1);
229
230
231 ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
232 res = tx2Scanner.next();
233 count = 0;
234 while (res != null) {
235 LOG.info("RESSS {}", res);
236 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
237 Bytes.toString(res.getValue(famName, colName)));
238 assertEquals(res.getRow(), rowId1);
239 assertEquals(res.getValue(famName, colName), dataValue1);
240 res = tx2Scanner.next();
241 count++;
242 }
243 assertEquals(count, 1);
244
245
246 Put updateRow1Tx1 = new Put(rowId1);
247 updateRow1Tx1.addColumn(famName, colName, Bytes.toBytes("11"));
248 txTable.put(tx1, updateRow1Tx1);
249
250
251 Put updateRow1Tx2 = new Put(rowId1);
252 updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes("11"));
253 txTable.put(tx2, updateRow1Tx2);
254
255
256 tm.commit(tx1);
257
258
259 try {
260 tm.commit(tx2);
261 fail();
262 } catch (RollbackException e) {
263
264 }
265
266 }
267
268 @Test(timeOut = 10_000)
269 public void testSIPreventsReadSkew(ITestContext context) throws Exception {
270
271
272
273
274
275
276
277
278
279
280
281
282
283 TransactionManager tm = newTransactionManager(context);
284 TTable txTable = new TTable(connection, TEST_TABLE);
285 Transaction tx1 = tm.begin();
286 Transaction tx2 = tm.begin();
287
288 Scan rowId1Scan = new Scan(rowId1, rowId1);
289 rowId1Scan.addColumn(famName, colName);
290
291
292 ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId1Scan);
293 Result res = tx1Scanner.next();
294 int count = 0;
295 while (res != null) {
296 LOG.info("RESSS {}", res);
297 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
298 Bytes.toString(res.getValue(famName, colName)));
299 assertEquals(res.getRow(), rowId1);
300 assertEquals(res.getValue(famName, colName), dataValue1);
301 res = tx1Scanner.next();
302 count++;
303 }
304 assertEquals(count, 1);
305
306
307 ResultScanner tx2Scanner = txTable.getScanner(tx2, rowId1Scan);
308 res = tx2Scanner.next();
309 count = 0;
310 while (res != null) {
311 LOG.info("RESSS {}", res);
312 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
313 Bytes.toString(res.getValue(famName, colName)));
314 assertEquals(res.getRow(), rowId1);
315 assertEquals(res.getValue(famName, colName), dataValue1);
316 res = tx2Scanner.next();
317 count++;
318 }
319
320 Scan rowId2Scan = new Scan(rowId2, rowId2);
321 rowId2Scan.addColumn(famName, colName);
322
323
324 tx2Scanner = txTable.getScanner(tx2, rowId2Scan);
325 res = tx2Scanner.next();
326 count = 0;
327 while (res != null) {
328 LOG.info("RESSS {}", res);
329 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
330 Bytes.toString(res.getValue(famName, colName)));
331 assertEquals(res.getRow(), rowId2);
332 assertEquals(res.getValue(famName, colName), dataValue2);
333 res = tx2Scanner.next();
334 count++;
335 }
336
337
338 Put updateRow1Tx2 = new Put(rowId1);
339 updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes("12"));
340 txTable.put(tx1, updateRow1Tx2);
341
342
343 Put updateRow2Tx2 = new Put(rowId2);
344 updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes("18"));
345 txTable.put(tx2, updateRow2Tx2);
346
347
348 tm.commit(tx2);
349
350
351 tx1Scanner = txTable.getScanner(tx1, rowId2Scan);
352 res = tx1Scanner.next();
353 count = 0;
354 while (res != null) {
355 LOG.info("RESSS {}", res);
356 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
357 Bytes.toString(res.getValue(famName, colName)));
358 assertEquals(res.getRow(), rowId2);
359 assertEquals(res.getValue(famName, colName), dataValue2);
360 res = tx1Scanner.next();
361 count++;
362 }
363
364
365 tm.commit(tx1);
366
367 }
368
369 @Test(timeOut = 10_000)
370 public void testSIPreventsReadSkewUsingWritePredicate(ITestContext context) throws Exception {
371
372
373
374
375
376
377
378
379
380
381
382
383 TransactionManager tm = newTransactionManager(context);
384 TTable txTable = new TTable(connection, TEST_TABLE);
385 Transaction tx1 = tm.begin();
386 Transaction tx2 = tm.begin();
387
388
389 assertNumberOfRows(txTable, tx1, 2, new Scan());
390
391
392 assertNumberOfRows(txTable, tx2, 2, new Scan());
393
394
395
396 Put updateRow1Tx2 = new Put(rowId1);
397 updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes(12));
398 Put updateRow2Tx2 = new Put(rowId2);
399 updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes(18));
400 txTable.put(tx2, Arrays.asList(updateRow1Tx2, updateRow2Tx2));
401
402
403 tm.commit(tx2);
404
405
406
407 Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
408 Scan checkFor20 = new Scan();
409 checkFor20.setFilter(f);
410 ResultScanner checkFor20Scanner = txTable.getScanner(tx1, checkFor20);
411 Result res = checkFor20Scanner.next();
412 while (res != null) {
413 LOG.info("RESSS {}", res);
414 LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
415 Delete delete20 = new Delete(res.getRow());
416 txTable.delete(tx1, delete20);
417 res = checkFor20Scanner.next();
418 }
419
420
421 try {
422 tm.commit(tx1);
423 fail("Should be aborted");
424 } catch (RollbackException e) {
425
426 }
427
428 }
429
430
431 @Test(timeOut = 10_000)
432 public void testSIDoesNotPreventWriteSkew(ITestContext context) throws Exception {
433
434
435
436
437
438
439
440
441
442
443
444 TransactionManager tm = newTransactionManager(context);
445 TTable txTable = new TTable(connection, TEST_TABLE);
446 Transaction tx1 = tm.begin();
447 Transaction tx2 = tm.begin();
448
449 Scan rowId12Scan = new Scan(rowId1, rowId3);
450 rowId12Scan.addColumn(famName, colName);
451
452
453 ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId12Scan);
454 Result res = tx1Scanner.next();
455 int count = 0;
456 while (res != null) {
457 LOG.info("RESSS {}", res);
458 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
459 switch (count) {
460 case 0:
461 assertEquals(res.getRow(), rowId1);
462 assertEquals(res.getValue(famName, colName), dataValue1);
463 break;
464 case 1:
465 assertEquals(res.getRow(), rowId2);
466 assertEquals(res.getValue(famName, colName), dataValue2);
467 break;
468 default:
469 fail();
470 }
471 res = tx1Scanner.next();
472 count++;
473 }
474 assertEquals(count, 2);
475
476
477 ResultScanner tx2Scanner = txTable.getScanner(tx1, rowId12Scan);
478 res = tx2Scanner.next();
479 count = 0;
480 while (res != null) {
481 LOG.info("RESSS {}", res);
482 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
483 switch (count) {
484 case 0:
485 assertEquals(res.getRow(), rowId1);
486 assertEquals(res.getValue(famName, colName), dataValue1);
487 break;
488 case 1:
489 assertEquals(res.getRow(), rowId2);
490 assertEquals(res.getValue(famName, colName), dataValue2);
491 break;
492 default:
493 fail();
494 }
495 res = tx2Scanner.next();
496 count++;
497 }
498 assertEquals(count, 2);
499
500
501 Put updateRow1Tx1 = new Put(rowId1);
502 updateRow1Tx1.addColumn(famName, colName, Bytes.toBytes("11"));
503 txTable.put(tx1, updateRow1Tx1);
504
505
506 Put updateRow2Tx2 = new Put(rowId2);
507 updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes("21"));
508 txTable.put(tx2, updateRow2Tx2);
509
510
511 tm.commit(tx1);
512
513
514 tm.commit(tx2);
515 }
516
517
518 @Test(timeOut = 10_000)
519 public void testSIDoesNotPreventAntiDependencyCycles(ITestContext context) throws Exception {
520
521
522
523
524
525
526
527
528
529
530
531
532 TransactionManager tm = newTransactionManager(context);
533 TTable txTable = new TTable(connection, TEST_TABLE);
534 Transaction tx1 = tm.begin();
535 Transaction tx2 = tm.begin();
536
537 Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes("30"));
538 Scan value30 = new Scan();
539 value30.setFilter(f);
540 value30.addColumn(famName, colName);
541
542
543 assertNumberOfRows(txTable, tx1, 0, value30);
544
545
546
547 assertNumberOfRows(txTable, tx2, 0, value30);
548
549
550
551 Put insertRow3Tx1 = new Put(rowId1);
552 insertRow3Tx1.addColumn(famName, colName, Bytes.toBytes("30"));
553 txTable.put(tx1, insertRow3Tx1);
554
555
556 Put updateRow4Tx2 = new Put(rowId2);
557 updateRow4Tx2.addColumn(famName, colName, Bytes.toBytes("42"));
558 txTable.put(tx2, updateRow4Tx2);
559
560
561 tm.commit(tx1);
562
563
564 tm.commit(tx2);
565
566
567 }
568
569
570
571
572
573
574
575
576 @BeforeMethod(alwaysRun = true)
577 private void loadBaseDataOnTestTable(ITestContext context) throws Exception {
578
579 TransactionManager tm = newTransactionManager(context);
580 TTable txTable = new TTable(connection, TEST_TABLE);
581
582 Transaction initializationTx = tm.begin();
583 Put row1 = new Put(rowId1);
584 row1.addColumn(famName, colName, dataValue1);
585 txTable.put(initializationTx, row1);
586 Put row2 = new Put(rowId2);
587 row2.addColumn(famName, colName, dataValue2);
588 txTable.put(initializationTx, row2);
589
590 tm.commit(initializationTx);
591 }
592
593
594 private void assertNumberOfRows(TTable txTable, Transaction tx2, int maxCount, Scan scan) throws IOException {
595 int count = 0;
596 ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
597 Result res = tx2Scanner.next();
598 while (res != null) {
599 LOG.info("RESSS {}", res);
600 LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
601 res = tx2Scanner.next();
602 count++;
603 }
604 assertEquals(count, maxCount);
605 }
606
607
608 }