View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.hadoop.hbase.client.Get;
21  import org.apache.hadoop.hbase.client.Put;
22  import org.apache.hadoop.hbase.client.Result;
23  import org.apache.hadoop.hbase.client.ResultScanner;
24  import org.apache.hadoop.hbase.client.Scan;
25  import org.apache.hadoop.hbase.util.Bytes;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  import org.testng.ITestContext;
29  import org.testng.annotations.Test;
30  
31  import static org.junit.Assert.fail;
32  import static org.testng.Assert.assertEquals;
33  import static org.testng.Assert.assertTrue;
34  
35  @Test(groups = "sharedHBase")
36  public class TestBasicTransaction extends OmidTestBase {
37  
38      private static final Logger LOG = LoggerFactory.getLogger(TestBasicTransaction.class);
39  
40  
41      @Test(timeOut = 30_000)
42      public void testTimestampsOfTwoRowsInstertedAfterCommitOfSingleTransactionAreEquals(ITestContext context) throws Exception {
43  
44          TransactionManager tm = newTransactionManager(context);
45          TTable tt = new TTable(hbaseConf, TEST_TABLE);
46  
47          byte[] rowName1 = Bytes.toBytes("row1");
48          byte[] rowName2 = Bytes.toBytes("row2");
49          byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
50          byte[] colName1 = Bytes.toBytes("col1");
51          byte[] dataValue1 = Bytes.toBytes("testWrite-1");
52          byte[] dataValue2 = Bytes.toBytes("testWrite-2");
53  
54          Transaction tx1 = tm.begin();
55  
56          Put row1 = new Put(rowName1);
57          row1.add(famName1, colName1, dataValue1);
58          tt.put(tx1, row1);
59          Put row2 = new Put(rowName2);
60          row2.add(famName1, colName1, dataValue2);
61          tt.put(tx1, row2);
62  
63          tm.commit(tx1);
64  
65          tt.close();
66  
67          // Checks
68          Get getResultRow1 = new Get(rowName1).setMaxVersions(1);
69          Result result1 = tt.getHTable().get(getResultRow1);
70          byte[] val1 = result1.getValue(famName1, colName1);
71          assertTrue(Bytes.equals(dataValue1, result1.getValue(famName1, colName1)),
72                  "Unexpected value for row 1 in col 1: " + Bytes.toString(val1));
73          long tsRow1 = result1.rawCells()[0].getTimestamp();
74  
75          Get getResultRow2 = new Get(rowName2).setMaxVersions(1);
76          Result result2 = tt.getHTable().get(getResultRow2);
77          byte[] val2 = result2.getValue(famName1, colName1);
78          assertTrue(Bytes.equals(dataValue2, result2.getValue(famName1, colName1)),
79                  "Unexpected value for row 2 in col 1: " + Bytes.toString(val2));
80          long tsRow2 = result2.rawCells()[0].getTimestamp();
81  
82          assertEquals(tsRow2, tsRow1, "Timestamps of row 1 and row 2 are different");
83  
84      }
85  
86      @Test(timeOut = 30_000)
87      public void testTimestampsOfTwoRowsModifiedByTwoSequentialTransactionsAreEqualAndHaveBeenIncreasedMonotonically(ITestContext context)
88              throws Exception {
89  
90          TransactionManager tm = newTransactionManager(context);
91          TTable tt = new TTable(hbaseConf, TEST_TABLE);
92  
93          byte[] rowName1 = Bytes.toBytes("row1");
94          byte[] rowName2 = Bytes.toBytes("row2");
95          byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
96          byte[] colName1 = Bytes.toBytes("col1");
97          byte[] dataValue1 = Bytes.toBytes("testWrite-1");
98          byte[] dataValue2 = Bytes.toBytes("testWrite-2");
99  
100         byte[] dataValue3 = Bytes.toBytes("testWrite-3");
101         byte[] dataValue4 = Bytes.toBytes("testWrite-4");
102 
103         Transaction tx1 = tm.begin();
104 
105         Put row1 = new Put(rowName1);
106         row1.add(famName1, colName1, dataValue1);
107         tt.put(tx1, row1);
108         Put row2 = new Put(rowName2);
109         row2.add(famName1, colName1, dataValue2);
110         tt.put(tx1, row2);
111 
112         tm.commit(tx1);
113 
114         Transaction tx2 = tm.begin();
115 
116         row1 = new Put(rowName1);
117         row1.add(famName1, colName1, dataValue3);
118         tt.put(tx2, row1);
119         row2 = new Put(rowName2);
120         row2.add(famName1, colName1, dataValue4);
121         tt.put(tx2, row2);
122 
123         tm.commit(tx2);
124 
125         tt.close();
126 
127         // Checks
128         Get getResultRow1 = new Get(rowName1).setMaxVersions(2);
129         Result result1 = tt.getHTable().get(getResultRow1);
130         byte[] val1 = result1.getValue(famName1, colName1);
131         assertTrue(Bytes.equals(dataValue3, result1.getValue(famName1, colName1)),
132                 "Unexpected value for row 1 in col 1: " + Bytes.toString(val1));
133 
134         long lastTsRow1 = result1.rawCells()[0].getTimestamp();
135         long previousTsRow1 = result1.rawCells()[1].getTimestamp();
136 
137         Get getResultRow2 = new Get(rowName2).setMaxVersions(2);
138         Result result2 = tt.getHTable().get(getResultRow2);
139         byte[] val2 = result2.getValue(famName1, colName1);
140         assertTrue(Bytes.equals(dataValue4, result2.getValue(famName1, colName1)),
141                 "Unexpected value for row 2 in col 1: " + Bytes.toString(val2));
142 
143         long lastTsRow2 = result2.rawCells()[0].getTimestamp();
144         long previousTsRow2 = result2.rawCells()[1].getTimestamp();
145 
146         assertTrue(lastTsRow1 == lastTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different");
147         assertTrue(previousTsRow1 == previousTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different");
148         assertTrue(lastTsRow1 > previousTsRow1, "Timestamp assigned by Tx2 to row 1 hasn't increased monotonically");
149         assertTrue(lastTsRow2 > previousTsRow2, "Timestamp assigned by Tx2 to row 2 hasn't increased monotonically");
150 
151     }
152 
153     @Test(timeOut = 30_000)
154     public void runTestSimple(ITestContext context) throws Exception {
155 
156         TransactionManager tm = newTransactionManager(context);
157 
158         TTable tt = new TTable(hbaseConf, TEST_TABLE);
159 
160         Transaction t1 = tm.begin();
161         LOG.info("Transaction created " + t1);
162 
163         byte[] row = Bytes.toBytes("test-simple");
164         byte[] fam = Bytes.toBytes(TEST_FAMILY);
165         byte[] col = Bytes.toBytes("testdata");
166         byte[] data1 = Bytes.toBytes("testWrite-1");
167         byte[] data2 = Bytes.toBytes("testWrite-2");
168 
169         Put p = new Put(row);
170         p.add(fam, col, data1);
171         tt.put(t1, p);
172         tm.commit(t1);
173 
174         Transaction tread = tm.begin();
175         Transaction t2 = tm.begin();
176         p = new Put(row);
177         p.add(fam, col, data2);
178         tt.put(t2, p);
179         tm.commit(t2);
180 
181         Get g = new Get(row).setMaxVersions(1);
182         Result r = tt.getHTable().get(g);
183         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
184                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
185 
186         r = tt.get(tread, g);
187         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
188                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
189     }
190 
191     @Test(timeOut = 30_000)
192     public void runTestManyVersions(ITestContext context) throws Exception {
193 
194         TransactionManager tm = newTransactionManager(context);
195         TTable tt = new TTable(hbaseConf, TEST_TABLE);
196 
197         Transaction t1 = tm.begin();
198         LOG.info("Transaction created " + t1);
199 
200         byte[] row = Bytes.toBytes("test-simple");
201         byte[] fam = Bytes.toBytes(TEST_FAMILY);
202         byte[] col = Bytes.toBytes("testdata");
203         byte[] data1 = Bytes.toBytes("testWrite-1");
204         byte[] data2 = Bytes.toBytes("testWrite-2");
205 
206         Put p = new Put(row);
207         p.add(fam, col, data1);
208         tt.put(t1, p);
209         tm.commit(t1);
210 
211         for (int i = 0; i < 5; ++i) {
212             Transaction t2 = tm.begin();
213             p = new Put(row);
214             p.add(fam, col, data2);
215             tt.put(t2, p);
216         }
217         Transaction tread = tm.begin();
218 
219         Get g = new Get(row).setMaxVersions(1);
220         Result r = tt.getHTable().get(g);
221         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
222                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
223 
224         r = tt.get(tread, g);
225         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
226                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
227 
228     }
229 
230     @Test(timeOut = 30_000)
231     public void runTestInterleave(ITestContext context) throws Exception {
232 
233         TransactionManager tm = newTransactionManager(context);
234         TTable tt = new TTable(hbaseConf, TEST_TABLE);
235 
236         Transaction t1 = tm.begin();
237         LOG.info("Transaction created " + t1);
238 
239         byte[] row = Bytes.toBytes("test-interleave");
240         byte[] fam = Bytes.toBytes(TEST_FAMILY);
241         byte[] col = Bytes.toBytes("testdata");
242         byte[] data1 = Bytes.toBytes("testWrite-1");
243         byte[] data2 = Bytes.toBytes("testWrite-2");
244 
245         Put p = new Put(row);
246         p.add(fam, col, data1);
247         tt.put(t1, p);
248         tm.commit(t1);
249 
250         Transaction t2 = tm.begin();
251         p = new Put(row);
252         p.add(fam, col, data2);
253         tt.put(t2, p);
254 
255         Transaction tread = tm.begin();
256         Get g = new Get(row).setMaxVersions(1);
257         Result r = tt.get(tread, g);
258         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
259                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
260         tm.commit(t2);
261 
262         r = tt.getHTable().get(g);
263         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
264                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
265 
266     }
267 
268     @Test(expectedExceptions = IllegalArgumentException.class, timeOut = 30_000)
269     public void testSameCommitRaisesException(ITestContext context) throws Exception {
270         TransactionManager tm = newTransactionManager(context);
271 
272         Transaction t1 = tm.begin();
273         tm.commit(t1);
274         tm.commit(t1);
275     }
276 
277     @Test(timeOut = 30_000)
278     public void testInterleavedScanReturnsTheRightSnapshotResults(ITestContext context) throws Exception {
279 
280         TransactionManager tm = newTransactionManager(context);
281         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
282 
283         // Basic data-scaffolding for test
284         byte[] fam = Bytes.toBytes(TEST_FAMILY);
285         byte[] col = Bytes.toBytes("TEST_COL");
286         byte[] data1 = Bytes.toBytes("testWrite-1");
287         byte[] data2 = Bytes.toBytes("testWrite-2");
288 
289         byte[] startRow = Bytes.toBytes("row-to-scan" + 0);
290         byte[] stopRow = Bytes.toBytes("row-to-scan" + 9);
291         byte[] randomRow = Bytes.toBytes("row-to-scan" + 3);
292 
293         // Add some data transactionally to have an initial state for the test
294         Transaction tx1 = tm.begin();
295         for (int i = 0; i < 10; i++) {
296             byte[] row = Bytes.toBytes("row-to-scan" + i);
297 
298             Put p = new Put(row);
299             p.add(fam, col, data1);
300             txTable.put(tx1, p);
301         }
302         tm.commit(tx1);
303 
304         // Start a second transaction -Tx2- modifying a random row and check that a concurrent transactional context
305         // that scans the table, gets the proper snapshot with the stuff written by Tx1
306         Transaction tx2 = tm.begin();
307         Put p = new Put(randomRow);
308         p.add(fam, col, data2);
309         txTable.put(tx2, p);
310 
311         Transaction scanTx = tm.begin(); // This is the concurrent transactional scanner
312         ResultScanner rs = txTable.getScanner(scanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
313         Result r = rs.next(); // Exercise the next() method
314         int i = 0;
315         while (r != null) {
316             LOG.trace("Scan (" + ++i + ")" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col)));
317             assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
318                     "Unexpected value for SI scan " + scanTx + ": " + Bytes.toString(r.getValue(fam, col)));
319             r = rs.next();
320         }
321 
322         // Commit the Tx2 and then check that under a new transactional context, the scanner gets the right snapshot,
323         // which must include the row modified by Tx2
324         tm.commit(tx2);
325 
326         int modifiedRows = 0;
327         Transaction newScanTx = tm.begin();
328         ResultScanner newRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
329         Result[] results = newRS.next(10); // Exercise the next(numRows) method
330         for (Result result : results) {
331             if (Bytes.equals(data2, result.getValue(fam, col))) {
332                 LOG.trace("Modified :" + Bytes.toString(result.getRow()));
333                 modifiedRows++;
334             }
335         }
336         assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
337 
338         // Same check as before but checking that the results are correct when retrieved through the Scanner Iterator
339         modifiedRows = 0;
340         ResultScanner iterableRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
341         for (Result res : iterableRS) {
342             if (Bytes.equals(data2, res.getValue(fam, col))) {
343                 LOG.trace("Modified :" + Bytes.toString(res.getRow()));
344                 modifiedRows++;
345             }
346         }
347 
348         assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
349 
350         // Finally, check that the Scanner Iterator does not implement the remove method
351         try {
352             iterableRS.iterator().remove();
353             fail();
354         } catch (RuntimeException re) {
355             // Expected
356         }
357 
358     }
359 
360     @Test(timeOut = 30_000)
361     public void testInterleavedScanReturnsTheRightSnapshotResultsWhenATransactionAborts(ITestContext context)
362             throws Exception {
363 
364         TransactionManager tm = newTransactionManager(context);
365         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
366 
367         // Basic data-scaffolding for test
368         byte[] fam = Bytes.toBytes(TEST_FAMILY);
369         byte[] col = Bytes.toBytes("TEST_COL");
370         byte[] data1 = Bytes.toBytes("testWrite-1");
371         byte[] data2 = Bytes.toBytes("testWrite-2");
372 
373         byte[] startRow = Bytes.toBytes("row-to-scan" + 0);
374         byte[] stopRow = Bytes.toBytes("row-to-scan" + 9);
375         byte[] randomRow = Bytes.toBytes("row-to-scan" + 3);
376 
377         // Add some data transactionally to have an initial state for the test
378         Transaction tx1 = tm.begin();
379         for (int i = 0; i < 10; i++) {
380             byte[] row = Bytes.toBytes("row-to-scan" + i);
381 
382             Put p = new Put(row);
383             p.add(fam, col, data1);
384             txTable.put(tx1, p);
385         }
386         tm.commit(tx1);
387 
388         // Start a second transaction modifying a random row and check that a transactional scanner in Tx2 gets the
389         // right snapshot with the new value in the random row just written by Tx2
390         Transaction tx2 = tm.begin();
391         Put p = new Put(randomRow);
392         p.add(fam, col, data2);
393         txTable.put(tx2, p);
394 
395         int modifiedRows = 0;
396         ResultScanner rs = txTable.getScanner(tx2, new Scan().setStartRow(startRow).setStopRow(stopRow));
397         Result r = rs.next();
398         while (r != null) {
399             if (Bytes.equals(data2, r.getValue(fam, col))) {
400                 LOG.trace("Modified :" + Bytes.toString(r.getRow()));
401                 modifiedRows++;
402             }
403 
404             r = rs.next();
405         }
406 
407         assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
408 
409         // Rollback the second transaction and then check that under a new transactional scanner we get the snapshot
410         // that includes the only the initial rows put by Tx1
411         tm.rollback(tx2);
412 
413         Transaction txScan = tm.begin();
414         rs = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow));
415         r = rs.next();
416         while (r != null) {
417             LOG.trace("Scan1 :" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col)));
418             assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
419                     "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(r.getValue(fam, col)));
420             r = rs.next();
421         }
422 
423         // Same check as before but checking that the results are correct when retrieved through the Scanner Iterator
424         ResultScanner iterableRS = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow));
425         for (Result result : iterableRS) {
426             assertTrue(Bytes.equals(data1, result.getValue(fam, col)),
427                     "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(result.getValue(fam, col)));
428         }
429 
430         // Finally, check that the Scanner Iterator does not implement the remove method
431         try {
432             iterableRS.iterator().remove();
433             fail();
434         } catch (RuntimeException re) {
435             // Expected
436         }
437 
438     }
439 
440 }