View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.testng.Assert.assertEquals;
21  import static org.testng.Assert.assertTrue;
22  import static org.testng.Assert.fail;
23  
24  import org.apache.hadoop.hbase.HColumnDescriptor;
25  import org.apache.hadoop.hbase.HTableDescriptor;
26  import org.apache.hadoop.hbase.TableName;
27  import org.apache.hadoop.hbase.client.Admin;
28  import org.apache.hadoop.hbase.client.Connection;
29  import org.apache.hadoop.hbase.client.ConnectionFactory;
30  import org.apache.hadoop.hbase.client.Delete;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.Put;
33  import org.apache.hadoop.hbase.client.Result;
34  import org.apache.hadoop.hbase.client.ResultScanner;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  import org.testng.ITestContext;
40  import org.testng.annotations.Test;
41  
42  @Test(groups = "sharedHBase")
43  public class TestTransactionConflict extends OmidTestBase {
44  
45      private static final Logger LOG = LoggerFactory.getLogger(TestTransactionConflict.class);
46  
47      @Test(timeOut = 10_000)
48      public void runTestWriteWriteConflict(ITestContext context) throws Exception {
49          TransactionManager tm = newTransactionManager(context);
50          TTable tt = new TTable(connection, TEST_TABLE);
51  
52          Transaction t1 = tm.begin();
53          LOG.info("Transaction created " + t1);
54  
55          Transaction t2 = tm.begin();
56          LOG.info("Transaction created" + t2);
57  
58          byte[] row = Bytes.toBytes("test-simple");
59          byte[] fam = Bytes.toBytes(TEST_FAMILY);
60          byte[] col = Bytes.toBytes("testdata");
61          byte[] data1 = Bytes.toBytes("testWrite-1");
62          byte[] data2 = Bytes.toBytes("testWrite-2");
63  
64          Put p = new Put(row);
65          p.addColumn(fam, col, data1);
66          tt.put(t1, p);
67  
68          Put p2 = new Put(row);
69          p2.addColumn(fam, col, data2);
70          tt.put(t2, p2);
71  
72          tm.commit(t2);
73  
74          try {
75              tm.commit(t1);
76              fail("Transaction should not commit successfully");
77          } catch (RollbackException e) {
78          }
79      }
80  
81      @Test(timeOut = 10_000)
82      public void runTestMultiTableConflict(ITestContext context) throws Exception {
83          TransactionManager tm = newTransactionManager(context);
84          TTable tt = new TTable(connection, TEST_TABLE);
85          String table2 = TEST_TABLE + 2;
86          TableName table2Name = TableName.valueOf(table2);
87  
88          try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
89               Admin admin = conn.getAdmin()) {
90              TableName htable2 = TableName.valueOf(table2);
91  
92              if (!admin.tableExists(htable2)) {
93                  HTableDescriptor desc = new HTableDescriptor(table2Name);
94                  HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
95                  datafam.setMaxVersions(Integer.MAX_VALUE);
96                  desc.addFamily(datafam);
97      
98                  admin.createTable(desc);
99              }
100     
101             if (admin.isTableDisabled(htable2)) {
102                 admin.enableTable(htable2);
103             }
104         }
105 
106         TTable tt2 = new TTable(connection, table2);
107 
108         Transaction t1 = tm.begin();
109         LOG.info("Transaction created " + t1);
110 
111         Transaction t2 = tm.begin();
112         LOG.info("Transaction created" + t2);
113 
114         byte[] row = Bytes.toBytes("test-simple");
115         byte[] row2 = Bytes.toBytes("test-simple2");
116         byte[] fam = Bytes.toBytes(TEST_FAMILY);
117         byte[] col = Bytes.toBytes("testdata");
118         byte[] data1 = Bytes.toBytes("testWrite-1");
119         byte[] data2 = Bytes.toBytes("testWrite-2");
120 
121         Put p = new Put(row);
122         p.addColumn(fam, col, data1);
123         tt.put(t1, p);
124         tt2.put(t1, p);
125 
126         Put p2 = new Put(row);
127         p2.addColumn(fam, col, data2);
128         tt.put(t2, p2);
129         p2 = new Put(row2);
130         p2.addColumn(fam, col, data2);
131         tt2.put(t2, p2);
132 
133         tm.commit(t2);
134 
135         boolean aborted = false;
136         try {
137             tm.commit(t1);
138             fail("Transaction commited successfully");
139         } catch (RollbackException e) {
140             aborted = true;
141         }
142         assertTrue(aborted, "Transaction didn't raise exception");
143 
144         ResultScanner rs = tt2.getHTable().getScanner(fam, col);
145 
146         int count = 0;
147         Result r;
148         while ((r = rs.next()) != null) {
149             count += r.size();
150         }
151         assertEquals(count, 1, "Should have cell");
152     }
153 
154     @Test(timeOut = 10_000)
155     public void runTestCleanupAfterConflict(ITestContext context) throws Exception {
156         TransactionManager tm = newTransactionManager(context);
157         TTable tt = new TTable(connection, TEST_TABLE);
158 
159         Transaction t1 = tm.begin();
160         LOG.info("Transaction created " + t1);
161 
162         Transaction t2 = tm.begin();
163         LOG.info("Transaction created" + t2);
164 
165         byte[] row = Bytes.toBytes("test-simple");
166         byte[] fam = Bytes.toBytes(TEST_FAMILY);
167         byte[] col = Bytes.toBytes("testdata");
168         byte[] data1 = Bytes.toBytes("testWrite-1");
169         byte[] data2 = Bytes.toBytes("testWrite-2");
170 
171         Put p = new Put(row);
172         p.addColumn(fam, col, data1);
173         tt.put(t1, p);
174 
175         Get g = new Get(row).setMaxVersions();
176         g.addColumn(fam, col);
177         Result r = tt.getHTable().get(g);
178         assertEquals(r.size(), 1, "Unexpected size for read.");
179         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
180                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
181 
182         Put p2 = new Put(row);
183         p2.addColumn(fam, col, data2);
184         tt.put(t2, p2);
185 
186         r = tt.getHTable().get(g);
187         assertEquals(r.size(), 2, "Unexpected size for read.");
188         r = tt.get(t2, g);
189         assertEquals(r.size(),1, "Unexpected size for read.");
190         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
191                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
192 
193         tm.commit(t1);
194 
195         boolean aborted = false;
196         try {
197             tm.commit(t2);
198             fail("Transaction commited successfully");
199         } catch (RollbackException e) {
200             aborted = true;
201         }
202         assertTrue(aborted, "Transaction didn't raise exception");
203 
204         r = tt.getHTable().get(g);
205         assertEquals(r.size(), 1, "Unexpected size for read.");
206         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
207                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
208     }
209 
210     @Test(timeOut = 10_000)
211     public void testCleanupWithDeleteRow(ITestContext context) throws Exception {
212 
213         TransactionManager tm = newTransactionManager(context);
214         TTable tt = new TTable(connection, TEST_TABLE);
215 
216         Transaction t1 = tm.begin();
217         LOG.info("Transaction created " + t1);
218 
219         int rowcount = 10;
220         int count = 0;
221 
222         byte[] fam = Bytes.toBytes(TEST_FAMILY);
223         byte[] col = Bytes.toBytes("testdata");
224         byte[] data1 = Bytes.toBytes("testWrite-1");
225         byte[] data2 = Bytes.toBytes("testWrite-2");
226 
227         byte[] modrow = Bytes.toBytes("test-del" + 3);
228         for (int i = 0; i < rowcount; i++) {
229             byte[] row = Bytes.toBytes("test-del" + i);
230 
231             Put p = new Put(row);
232             p.addColumn(fam, col, data1);
233             tt.put(t1, p);
234         }
235         tm.commit(t1);
236 
237         Transaction t2 = tm.begin();
238         LOG.info("Transaction created " + t2);
239         Delete d = new Delete(modrow);
240         tt.delete(t2, d);
241 
242         ResultScanner rs = tt.getScanner(t2, new Scan());
243         Result r = rs.next();
244         count = 0;
245         while (r != null) {
246             count++;
247             LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
248             r = rs.next();
249         }
250         assertEquals(count, rowcount - 1, "Wrong count");
251 
252         Transaction t3 = tm.begin();
253         LOG.info("Transaction created " + t3);
254         Put p = new Put(modrow);
255         p.addColumn(fam, col, data2);
256         tt.put(t3, p);
257 
258         tm.commit(t3);
259 
260         boolean aborted = false;
261         try {
262             tm.commit(t2);
263             fail("Didn't abort");
264         } catch (RollbackException e) {
265             aborted = true;
266         }
267         assertTrue(aborted, "Didn't raise exception");
268 
269         Transaction tscan = tm.begin();
270         rs = tt.getScanner(tscan, new Scan());
271         r = rs.next();
272         count = 0;
273         while (r != null) {
274             count++;
275             r = rs.next();
276         }
277         assertEquals(count, rowcount, "Wrong count");
278 
279     }
280 
281     @Test(timeOut = 10_000)
282     public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
283         TransactionManager tm = newTransactionManager(context);
284         TTable tt = new TTable(connection, TEST_TABLE);
285 
286         Transaction t1 = tm.begin();
287         Transaction t2 = tm.begin();
288         LOG.info("Transactions created " + t1 + " " + t2);
289 
290         byte[] row = Bytes.toBytes("row");
291         byte[] fam = Bytes.toBytes(TEST_FAMILY);
292         byte[] col1 = Bytes.toBytes("testdata1");
293         byte[] col2 = Bytes.toBytes("testdata2");
294         byte[] data = Bytes.toBytes("testWrite-1");
295 
296         Put p2 = new Put(row);
297         p2.addColumn(fam, col1, data);
298         tt.put(t2, p2);
299         tm.commit(t2);
300 
301         Put p1 = new Put(row);
302         p1.addColumn(fam, col2, data);
303         tt.put(t1, p1);
304         tm.commit(t1);
305     }
306 
307     @Test(timeOut = 10_000)
308     public void runTestWriteWriteConflictWithAdditionalConflictFreeWrites(ITestContext context) throws Exception {
309         TransactionManager tm = newTransactionManager(context);
310         TTable tt1 = new TTable(connection, TEST_TABLE);
311         TTable tt2 = new TTable(connection, TEST_TABLE, true);
312 
313         Transaction t1 = tm.begin();
314         LOG.info("Transaction created " + t1);
315 
316         Transaction t2 = tm.begin();
317         LOG.info("Transaction created" + t2);
318 
319         byte[] row = Bytes.toBytes("test-simple");
320         byte[] fam = Bytes.toBytes(TEST_FAMILY);
321         byte[] col = Bytes.toBytes("testdata");
322         byte[] data1 = Bytes.toBytes("testWrite-1");
323         byte[] data2 = Bytes.toBytes("testWrite-2");
324 
325         Put p = new Put(row);
326         p.addColumn(fam, col, data1);
327         tt1.put(t1, p);
328 
329         Put p2 = new Put(row);
330         p2.addColumn(fam, col, data2);
331         tt1.put(t2, p2);
332 
333         row = Bytes.toBytes("test-simple-cf");
334         p = new Put(row);
335         p.addColumn(fam, col, data1);
336         tt2.put(t1, p);
337 
338         p2 = new Put(row);
339         p2.addColumn(fam, col, data2);
340         tt2.put(t2, p2);
341 
342         tm.commit(t2);
343 
344         try {
345             tm.commit(t1);
346             fail("Transaction should not commit successfully");
347         } catch (RollbackException e) {
348         }
349     }
350 
351     @Test(timeOut = 10_000)
352     public void runTestWriteWriteConflictFreeWrites(ITestContext context) throws Exception {
353         TransactionManager tm = newTransactionManager(context);
354         TTable tt1 = new TTable(connection, TEST_TABLE);
355         TTable tt2 = new TTable(connection, TEST_TABLE, true);
356 
357         Transaction t1 = tm.begin();
358         LOG.info("Transaction created " + t1);
359 
360         Transaction t2 = tm.begin();
361         LOG.info("Transaction created" + t2);
362 
363         byte[] row = Bytes.toBytes("test-simple");
364         byte[] fam = Bytes.toBytes(TEST_FAMILY);
365         byte[] col = Bytes.toBytes("testdata");
366         byte[] data1 = Bytes.toBytes("testWrite-1");
367         byte[] data2 = Bytes.toBytes("testWrite-2");
368 
369         Put p = new Put(row);
370         p.addColumn(fam, col, data1);
371         tt1.put(t1, p);
372 
373         Put p2 = new Put(row);
374         p2.addColumn(fam, col, data2);
375         tt2.put(t2, p2);
376 
377         row = Bytes.toBytes("test-simple-cf");
378         p = new Put(row);
379         p.addColumn(fam, col, data1);
380         tt1.put(t1, p);
381 
382         p2 = new Put(row);
383         p2.addColumn(fam, col, data2);
384         tt2.put(t2, p2);
385 
386         tm.commit(t2);
387 
388         try {
389             tm.commit(t1);
390         } catch (RollbackException e) {
391             fail("Transaction should not commit successfully");
392         }
393     }
394 
395     @Test(timeOut = 10_000)
396     public void runTestWriteWriteConflictFreeWritesWithOtherWrites(ITestContext context) throws Exception {
397         TransactionManager tm = newTransactionManager(context);
398         TTable tt1 = new TTable(connection, TEST_TABLE);
399         TTable tt2 = new TTable(connection, TEST_TABLE, true);
400 
401         Transaction t1 = tm.begin();
402         LOG.info("Transaction created " + t1);
403 
404         Transaction t2 = tm.begin();
405         LOG.info("Transaction created" + t2);
406 
407         byte[] row = Bytes.toBytes("test-simple");
408         byte[] row1 = Bytes.toBytes("test-simple-1");
409         byte[] fam = Bytes.toBytes(TEST_FAMILY);
410         byte[] col = Bytes.toBytes("testdata");
411         byte[] data1 = Bytes.toBytes("testWrite-1");
412         byte[] data2 = Bytes.toBytes("testWrite-2");
413 
414         Put p = new Put(row);
415         p.addColumn(fam, col, data1);
416         tt1.put(t1, p);
417 
418         Put p2 = new Put(row1);
419         p2.addColumn(fam, col, data2);
420         tt1.put(t2, p2);
421 
422         row = Bytes.toBytes("test-simple-cf");
423         p = new Put(row);
424         p.addColumn(fam, col, data1);
425         tt2.put(t1, p);
426 
427         p2 = new Put(row);
428         p2.addColumn(fam, col, data2);
429         tt2.put(t2, p2);
430 
431         tm.commit(t2);
432 
433         try {
434             tm.commit(t1);
435         } catch (RollbackException e) {
436             fail("Transaction should not commit successfully");
437         }
438     }
439 
440     @Test(timeOut = 10_000)
441     public void runTestCleanupConflictFreeWritesAfterConflict(ITestContext context) throws Exception {
442         TransactionManager tm = newTransactionManager(context);
443         TTable tt1 = new TTable(connection, TEST_TABLE);
444         TTable tt2 = new TTable(connection, TEST_TABLE, true);
445 
446         Transaction t1 = tm.begin();
447         LOG.info("Transaction created " + t1);
448 
449         Transaction t2 = tm.begin();
450         LOG.info("Transaction created" + t2);
451 
452         byte[] row = Bytes.toBytes("test-simple");
453         byte[] row1 = Bytes.toBytes("test-simple-1");
454         byte[] fam = Bytes.toBytes(TEST_FAMILY);
455         byte[] col = Bytes.toBytes("testdata");
456         byte[] data1 = Bytes.toBytes("testWrite-1");
457         byte[] data2 = Bytes.toBytes("testWrite-2");
458 
459         Put p = new Put(row);
460         p.addColumn(fam, col, data1);
461         tt1.put(t1, p);
462 
463         Get g = new Get(row).setMaxVersions();
464         g.addColumn(fam, col);
465         Result r = tt1.getHTable().get(g);
466         assertEquals(r.size(), 1, "Unexpected size for read.");
467         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
468                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
469 
470         Put p2 = new Put(row);
471         p2.addColumn(fam, col, data2);
472         tt1.put(t2, p2);
473 
474         Put p3 = new Put(row1);
475         p3.addColumn(fam, col, data2);
476         tt2.put(t2, p3);
477 
478         r = tt1.getHTable().get(g);
479         assertEquals(r.size(), 2, "Unexpected size for read.");
480         r = tt2.get(t2, g);
481         assertEquals(r.size(),1, "Unexpected size for read.");
482         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
483                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
484 
485         Get g1 = new Get(row1).setMaxVersions();
486         g1.addColumn(fam, col);
487         r = tt1.getHTable().get(g1);
488         assertEquals(r.size(), 1, "Unexpected size for read.");
489 
490         tm.commit(t1);
491 
492         boolean aborted = false;
493         try {
494             tm.commit(t2);
495             fail("Transaction commited successfully");
496         } catch (RollbackException e) {
497             aborted = true;
498         }
499         assertTrue(aborted, "Transaction didn't raise exception");
500 
501         r = tt1.getHTable().get(g);
502         assertEquals(r.size(), 1, "Unexpected size for read.");
503         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
504                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
505         r = tt1.getHTable().get(g1);
506         assertEquals(r.size(), 0, "Unexpected size for read.");
507     }
508 }