1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.testng.Assert.assertEquals;
21 import static org.testng.Assert.assertTrue;
22 import static org.testng.Assert.fail;
23
24 import org.apache.hadoop.hbase.HColumnDescriptor;
25 import org.apache.hadoop.hbase.HTableDescriptor;
26 import org.apache.hadoop.hbase.TableName;
27 import org.apache.hadoop.hbase.client.Admin;
28 import org.apache.hadoop.hbase.client.Connection;
29 import org.apache.hadoop.hbase.client.ConnectionFactory;
30 import org.apache.hadoop.hbase.client.Delete;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.testng.ITestContext;
41 import org.testng.annotations.Test;
42
43 import java.io.IOException;
44
45 @Test(groups = "sharedHBase")
46 public class TestTransactionConflict extends OmidTestBase {
47
48 private static final Logger LOG = LoggerFactory.getLogger(TestTransactionConflict.class);
49
50 @Test(timeOut = 10_000)
51 public void runTestWriteWriteConflict(ITestContext context) throws Exception {
52 TransactionManager tm = newTransactionManager(context);
53 TTable tt = new TTable(connection, TEST_TABLE);
54
55 Transaction t1 = tm.begin();
56 LOG.info("Transaction created " + t1);
57
58 Transaction t2 = tm.begin();
59 LOG.info("Transaction created" + t2);
60
61 byte[] row = Bytes.toBytes("test-simple");
62 byte[] fam = Bytes.toBytes(TEST_FAMILY);
63 byte[] col = Bytes.toBytes("testdata");
64 byte[] data1 = Bytes.toBytes("testWrite-1");
65 byte[] data2 = Bytes.toBytes("testWrite-2");
66
67 Put p = new Put(row);
68 p.addColumn(fam, col, data1);
69 tt.put(t1, p);
70
71 Put p2 = new Put(row);
72 p2.addColumn(fam, col, data2);
73 tt.put(t2, p2);
74
75 tm.commit(t2);
76
77 try {
78 tm.commit(t1);
79 fail("Transaction should not commit successfully");
80 } catch (RollbackException e) {
81 }
82 }
83
84 @Test(timeOut = 10_000)
85 public void runTestMultiTableConflict(ITestContext context) throws Exception {
86 TransactionManager tm = newTransactionManager(context);
87 TTable tt = new TTable(connection, TEST_TABLE);
88 String table2 = TEST_TABLE + 2;
89 TableName table2Name = TableName.valueOf(table2);
90
91 try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
92 Admin admin = conn.getAdmin()) {
93 TableName htable2 = TableName.valueOf(table2);
94
95 if (!admin.tableExists(htable2)) {
96 HTableDescriptor desc = new HTableDescriptor(table2Name);
97 HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
98 datafam.setMaxVersions(Integer.MAX_VALUE);
99 desc.addFamily(datafam);
100
101 admin.createTable(desc);
102 }
103
104 if (admin.isTableDisabled(htable2)) {
105 admin.enableTable(htable2);
106 }
107 }
108
109 TTable tt2 = new TTable(connection, table2);
110
111 Transaction t1 = tm.begin();
112 LOG.info("Transaction created " + t1);
113
114 Transaction t2 = tm.begin();
115 LOG.info("Transaction created" + t2);
116
117 byte[] row = Bytes.toBytes("test-simple");
118 byte[] row2 = Bytes.toBytes("test-simple2");
119 byte[] fam = Bytes.toBytes(TEST_FAMILY);
120 byte[] col = Bytes.toBytes("testdata");
121 byte[] data1 = Bytes.toBytes("testWrite-1");
122 byte[] data2 = Bytes.toBytes("testWrite-2");
123
124 Put p = new Put(row);
125 p.addColumn(fam, col, data1);
126 tt.put(t1, p);
127 tt2.put(t1, p);
128
129 Put p2 = new Put(row);
130 p2.addColumn(fam, col, data2);
131 tt.put(t2, p2);
132 p2 = new Put(row2);
133 p2.addColumn(fam, col, data2);
134 tt2.put(t2, p2);
135
136 tm.commit(t2);
137
138 boolean aborted = false;
139 try {
140 tm.commit(t1);
141 fail("Transaction commited successfully");
142 } catch (RollbackException e) {
143 aborted = true;
144 }
145 assertTrue(aborted, "Transaction didn't raise exception");
146
147 ResultScanner rs = tt2.getHTable().getScanner(fam, col);
148
149 int count = 0;
150 Result r;
151 while ((r = rs.next()) != null) {
152 count += r.size();
153 }
154 assertEquals(count, 1, "Should have cell");
155 }
156
157 @Test(timeOut = 10_000)
158 public void runTestCleanupAfterConflict(ITestContext context) throws Exception {
159 TransactionManager tm = newTransactionManager(context);
160 TTable tt = new TTable(connection, TEST_TABLE);
161
162 Transaction t1 = tm.begin();
163 LOG.info("Transaction created " + t1);
164
165 Transaction t2 = tm.begin();
166 LOG.info("Transaction created" + t2);
167
168 byte[] row = Bytes.toBytes("test-simple");
169 byte[] fam = Bytes.toBytes(TEST_FAMILY);
170 byte[] col = Bytes.toBytes("testdata");
171 byte[] data1 = Bytes.toBytes("testWrite-1");
172 byte[] data2 = Bytes.toBytes("testWrite-2");
173
174 Put p = new Put(row);
175 p.addColumn(fam, col, data1);
176 tt.put(t1, p);
177
178 Get g = new Get(row).setMaxVersions();
179 g.addColumn(fam, col);
180 Result r = tt.getHTable().get(g);
181 assertEquals(r.size(), 1, "Unexpected size for read.");
182 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
183 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
184
185 Put p2 = new Put(row);
186 p2.addColumn(fam, col, data2);
187 tt.put(t2, p2);
188
189 r = tt.getHTable().get(g);
190 assertEquals(r.size(), 2, "Unexpected size for read.");
191 r = tt.get(t2, g);
192 assertEquals(r.size(),1, "Unexpected size for read.");
193 assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
194 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
195
196 tm.commit(t1);
197
198 boolean aborted = false;
199 try {
200 tm.commit(t2);
201 fail("Transaction commited successfully");
202 } catch (RollbackException e) {
203 aborted = true;
204 }
205 assertTrue(aborted, "Transaction didn't raise exception");
206
207 r = tt.getHTable().get(g);
208 assertEquals(r.size(), 1, "Unexpected size for read.");
209 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
210 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
211 }
212
213 @Test(timeOut = 10_000)
214 public void testCleanupWithDeleteRow(ITestContext context) throws Exception {
215
216 TransactionManager tm = newTransactionManager(context);
217 TTable tt = new TTable(connection, TEST_TABLE);
218
219 Transaction t1 = tm.begin();
220 LOG.info("Transaction created " + t1);
221
222 int rowcount = 10;
223 int count = 0;
224
225 byte[] fam = Bytes.toBytes(TEST_FAMILY);
226 byte[] col = Bytes.toBytes("testdata");
227 byte[] data1 = Bytes.toBytes("testWrite-1");
228 byte[] data2 = Bytes.toBytes("testWrite-2");
229
230 byte[] modrow = Bytes.toBytes("test-del" + 3);
231 for (int i = 0; i < rowcount; i++) {
232 byte[] row = Bytes.toBytes("test-del" + i);
233
234 Put p = new Put(row);
235 p.addColumn(fam, col, data1);
236 tt.put(t1, p);
237 }
238 tm.commit(t1);
239
240 Transaction t2 = tm.begin();
241 LOG.info("Transaction created " + t2);
242 Delete d = new Delete(modrow);
243 tt.delete(t2, d);
244
245 ResultScanner rs = tt.getScanner(t2, new Scan());
246 Result r = rs.next();
247 count = 0;
248 while (r != null) {
249 count++;
250 LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
251 r = rs.next();
252 }
253 assertEquals(count, rowcount - 1, "Wrong count");
254
255 Transaction t3 = tm.begin();
256 LOG.info("Transaction created " + t3);
257 Put p = new Put(modrow);
258 p.addColumn(fam, col, data2);
259 tt.put(t3, p);
260
261 tm.commit(t3);
262
263 boolean aborted = false;
264 try {
265 tm.commit(t2);
266 fail("Didn't abort");
267 } catch (RollbackException e) {
268 aborted = true;
269 }
270 assertTrue(aborted, "Didn't raise exception");
271
272 Transaction tscan = tm.begin();
273 rs = tt.getScanner(tscan, new Scan());
274 r = rs.next();
275 count = 0;
276 while (r != null) {
277 count++;
278 r = rs.next();
279 }
280 assertEquals(count, rowcount, "Wrong count");
281
282 }
283
284 private int countRows(Table table) throws IOException {
285 Scan scan = new Scan();
286 ResultScanner scanner = table.getScanner(scan);
287 Result r = scanner.next();
288 int rowCount = 0;
289 while (r != null) {
290 r = scanner.next();
291 rowCount++;
292 }
293 return rowCount;
294 }
295
296 @Test(timeOut = 60_000)
297 public void testBatchedCleanup(ITestContext context) throws Exception {
298
299 String table2 = "testBatchedCleanupTABLE2";
300 TableName table2Name = TableName.valueOf(table2);
301
302 try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
303 Admin admin = conn.getAdmin()) {
304 TableName htable2 = TableName.valueOf(table2);
305
306 if (!admin.tableExists(htable2)) {
307 HTableDescriptor desc = new HTableDescriptor(table2Name);
308 HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
309 datafam.setMaxVersions(Integer.MAX_VALUE);
310 desc.addFamily(datafam);
311
312 admin.createTable(desc);
313 }
314
315 if (admin.isTableDisabled(htable2)) {
316 admin.enableTable(htable2);
317 }
318 }
319
320 TransactionManager tm = newTransactionManager(context);
321 TTable tt = new TTable(connection, TEST_TABLE);
322 TTable tt2 = new TTable(connection, table2);
323
324 Transaction t1 = tm.begin();
325 LOG.info("Transaction created " + t1);
326
327 Transaction t2 = tm.begin();
328 LOG.info("Transaction created" + t2);
329
330 byte[] row = Bytes.toBytes("test-simple");
331 byte[] fam = Bytes.toBytes(TEST_FAMILY);
332 byte[] col = Bytes.toBytes("testdata");
333 byte[] data1 = Bytes.toBytes("testWrite-1");
334 byte[] data2 = Bytes.toBytes("testWrite-2");
335
336 Put p = new Put(row);
337 p.addColumn(fam, col, data1);
338 tt.put(t1, p);
339
340 Get g = new Get(row).setMaxVersions();
341 g.addColumn(fam, col);
342 Result r = tt.getHTable().get(g);
343 assertEquals(r.size(), 1, "Unexpected size for read.");
344 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
345 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
346
347 int rowcount = HBaseTransaction.MAX_DELETE_BATCH_SIZE*2 + 2;
348
349
350 Put p2 = new Put(row);
351 p2.addColumn(fam, col, data2);
352 tt.put(t2, p2);
353
354
355 for (int i = 0; i < rowcount; i++) {
356 byte[] newRow = Bytes.toBytes("test-del" + i);
357 Put put = new Put(newRow);
358 put.addColumn(fam, col, data2);
359 tt.put(t2, put);
360 tt2.put(t2, put);
361 }
362
363
364 assertEquals(countRows(tt.getHTable()), rowcount + 1, "Unexpected size for read.");
365 assertEquals(countRows(tt2.getHTable()), rowcount, "Unexpected size for read.");
366
367 tm.commit(t1);
368
369 boolean aborted = false;
370 try {
371 tm.commit(t2);
372 fail("Transaction commited successfully");
373 } catch (RollbackException e) {
374 aborted = true;
375 }
376 assertTrue(aborted, "Transaction didn't raise exception");
377
378
379 assertEquals(countRows(tt.getHTable()), 1, "Unexpected size for read.");
380 assertEquals(countRows(tt2.getHTable()), 0, "Unexpected size for read.");
381 }
382
383
384 @Test(timeOut = 10_000)
385 public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
386 TransactionManager tm = newTransactionManager(context);
387 TTable tt = new TTable(connection, TEST_TABLE);
388
389 Transaction t1 = tm.begin();
390 Transaction t2 = tm.begin();
391 LOG.info("Transactions created " + t1 + " " + t2);
392
393 byte[] row = Bytes.toBytes("row");
394 byte[] fam = Bytes.toBytes(TEST_FAMILY);
395 byte[] col1 = Bytes.toBytes("testdata1");
396 byte[] col2 = Bytes.toBytes("testdata2");
397 byte[] data = Bytes.toBytes("testWrite-1");
398
399 Put p2 = new Put(row);
400 p2.addColumn(fam, col1, data);
401 tt.put(t2, p2);
402 tm.commit(t2);
403
404 Put p1 = new Put(row);
405 p1.addColumn(fam, col2, data);
406 tt.put(t1, p1);
407 tm.commit(t1);
408 }
409
410 @Test(timeOut = 10_000)
411 public void runTestWriteWriteConflictWithAdditionalConflictFreeWrites(ITestContext context) throws Exception {
412 TransactionManager tm = newTransactionManager(context);
413 TTable tt1 = new TTable(connection, TEST_TABLE);
414 TTable tt2 = new TTable(connection, TEST_TABLE, true);
415
416 Transaction t1 = tm.begin();
417 LOG.info("Transaction created " + t1);
418
419 Transaction t2 = tm.begin();
420 LOG.info("Transaction created" + t2);
421
422 byte[] row = Bytes.toBytes("test-simple");
423 byte[] fam = Bytes.toBytes(TEST_FAMILY);
424 byte[] col = Bytes.toBytes("testdata");
425 byte[] data1 = Bytes.toBytes("testWrite-1");
426 byte[] data2 = Bytes.toBytes("testWrite-2");
427
428 Put p = new Put(row);
429 p.addColumn(fam, col, data1);
430 tt1.put(t1, p);
431
432 Put p2 = new Put(row);
433 p2.addColumn(fam, col, data2);
434 tt1.put(t2, p2);
435
436 row = Bytes.toBytes("test-simple-cf");
437 p = new Put(row);
438 p.addColumn(fam, col, data1);
439 tt2.put(t1, p);
440
441 p2 = new Put(row);
442 p2.addColumn(fam, col, data2);
443 tt2.put(t2, p2);
444
445 tm.commit(t2);
446
447 try {
448 tm.commit(t1);
449 fail("Transaction should not commit successfully");
450 } catch (RollbackException e) {
451 }
452 }
453
454 @Test(timeOut = 10_000)
455 public void runTestWriteWriteConflictFreeWrites(ITestContext context) throws Exception {
456 TransactionManager tm = newTransactionManager(context);
457 TTable tt1 = new TTable(connection, TEST_TABLE);
458 TTable tt2 = new TTable(connection, TEST_TABLE, true);
459
460 Transaction t1 = tm.begin();
461 LOG.info("Transaction created " + t1);
462
463 Transaction t2 = tm.begin();
464 LOG.info("Transaction created" + t2);
465
466 byte[] row = Bytes.toBytes("test-simple");
467 byte[] fam = Bytes.toBytes(TEST_FAMILY);
468 byte[] col = Bytes.toBytes("testdata");
469 byte[] data1 = Bytes.toBytes("testWrite-1");
470 byte[] data2 = Bytes.toBytes("testWrite-2");
471
472 Put p = new Put(row);
473 p.addColumn(fam, col, data1);
474 tt1.put(t1, p);
475
476 Put p2 = new Put(row);
477 p2.addColumn(fam, col, data2);
478 tt2.put(t2, p2);
479
480 row = Bytes.toBytes("test-simple-cf");
481 p = new Put(row);
482 p.addColumn(fam, col, data1);
483 tt1.put(t1, p);
484
485 p2 = new Put(row);
486 p2.addColumn(fam, col, data2);
487 tt2.put(t2, p2);
488
489 tm.commit(t2);
490
491 try {
492 tm.commit(t1);
493 } catch (RollbackException e) {
494 fail("Transaction should not commit successfully");
495 }
496 }
497
498 @Test(timeOut = 10_000)
499 public void runTestWriteWriteConflictFreeWritesWithOtherWrites(ITestContext context) throws Exception {
500 TransactionManager tm = newTransactionManager(context);
501 TTable tt1 = new TTable(connection, TEST_TABLE);
502 TTable tt2 = new TTable(connection, TEST_TABLE, true);
503
504 Transaction t1 = tm.begin();
505 LOG.info("Transaction created " + t1);
506
507 Transaction t2 = tm.begin();
508 LOG.info("Transaction created" + t2);
509
510 byte[] row = Bytes.toBytes("test-simple");
511 byte[] row1 = Bytes.toBytes("test-simple-1");
512 byte[] fam = Bytes.toBytes(TEST_FAMILY);
513 byte[] col = Bytes.toBytes("testdata");
514 byte[] data1 = Bytes.toBytes("testWrite-1");
515 byte[] data2 = Bytes.toBytes("testWrite-2");
516
517 Put p = new Put(row);
518 p.addColumn(fam, col, data1);
519 tt1.put(t1, p);
520
521 Put p2 = new Put(row1);
522 p2.addColumn(fam, col, data2);
523 tt1.put(t2, p2);
524
525 row = Bytes.toBytes("test-simple-cf");
526 p = new Put(row);
527 p.addColumn(fam, col, data1);
528 tt2.put(t1, p);
529
530 p2 = new Put(row);
531 p2.addColumn(fam, col, data2);
532 tt2.put(t2, p2);
533
534 tm.commit(t2);
535
536 try {
537 tm.commit(t1);
538 } catch (RollbackException e) {
539 fail("Transaction should not commit successfully");
540 }
541 }
542
543 @Test(timeOut = 10_000)
544 public void runTestCleanupConflictFreeWritesAfterConflict(ITestContext context) throws Exception {
545 TransactionManager tm = newTransactionManager(context);
546 TTable tt1 = new TTable(connection, TEST_TABLE);
547 TTable tt2 = new TTable(connection, TEST_TABLE, true);
548
549 Transaction t1 = tm.begin();
550 LOG.info("Transaction created " + t1);
551
552 Transaction t2 = tm.begin();
553 LOG.info("Transaction created" + t2);
554
555 byte[] row = Bytes.toBytes("test-simple");
556 byte[] row1 = Bytes.toBytes("test-simple-1");
557 byte[] fam = Bytes.toBytes(TEST_FAMILY);
558 byte[] col = Bytes.toBytes("testdata");
559 byte[] data1 = Bytes.toBytes("testWrite-1");
560 byte[] data2 = Bytes.toBytes("testWrite-2");
561
562 Put p = new Put(row);
563 p.addColumn(fam, col, data1);
564 tt1.put(t1, p);
565
566 Get g = new Get(row).setMaxVersions();
567 g.addColumn(fam, col);
568 Result r = tt1.getHTable().get(g);
569 assertEquals(r.size(), 1, "Unexpected size for read.");
570 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
571 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
572
573 Put p2 = new Put(row);
574 p2.addColumn(fam, col, data2);
575 tt1.put(t2, p2);
576
577 Put p3 = new Put(row1);
578 p3.addColumn(fam, col, data2);
579 tt2.put(t2, p3);
580
581 r = tt1.getHTable().get(g);
582 assertEquals(r.size(), 2, "Unexpected size for read.");
583 r = tt2.get(t2, g);
584 assertEquals(r.size(),1, "Unexpected size for read.");
585 assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
586 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
587
588 Get g1 = new Get(row1).setMaxVersions();
589 g1.addColumn(fam, col);
590 r = tt1.getHTable().get(g1);
591 assertEquals(r.size(), 1, "Unexpected size for read.");
592
593 tm.commit(t1);
594
595 boolean aborted = false;
596 try {
597 tm.commit(t2);
598 fail("Transaction commited successfully");
599 } catch (RollbackException e) {
600 aborted = true;
601 }
602 assertTrue(aborted, "Transaction didn't raise exception");
603
604 r = tt1.getHTable().get(g);
605 assertEquals(r.size(), 1, "Unexpected size for read.");
606 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
607 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
608 r = tt1.getHTable().get(g1);
609 assertEquals(r.size(), 0, "Unexpected size for read.");
610 }
611 }