View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.omid.transaction.CellUtils.hasCell;
21  import static org.apache.omid.transaction.CellUtils.hasShadowCell;
22  import static org.junit.Assert.assertEquals;
23  import static org.mockito.Matchers.any;
24  import static org.mockito.Mockito.doAnswer;
25  import static org.mockito.Mockito.spy;
26  import static org.testng.Assert.assertTrue;
27  
28  import java.io.IOException;
29  import java.util.List;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
34  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.Delete;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.client.Table;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.omid.committable.CommitTable;
45  import org.apache.omid.metrics.NullMetricsProvider;
46  import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
47  import org.junit.Assert;
48  import org.mockito.invocation.InvocationOnMock;
49  import org.mockito.stubbing.Answer;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  import org.testng.ITestContext;
53  import org.testng.annotations.Test;
54  
55  @Test(groups = "sharedHBase")
56  public class TestCheckpoint extends OmidTestBase {
57  
58      private static final Logger LOG = LoggerFactory.getLogger(TestCheckpoint.class);
59  
60      private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
61          if (tx instanceof HBaseTransaction) {
62              return (HBaseTransaction) tx;
63          } else {
64              throw new IllegalArgumentException(
65                  String.format("The transaction object passed %s is not an instance of HBaseTransaction",
66                                tx.getClass().getName()));
67          }
68      }
69  
70      @Test(timeOut = 30_000)
71      public void testFewCheckPoints(ITestContext context) throws Exception {
72  
73          TransactionManager tm = newTransactionManager(context);
74          TTable tt = new TTable(connection, TEST_TABLE);
75  
76          byte[] rowName1 = Bytes.toBytes("row1");
77          byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
78          byte[] colName1 = Bytes.toBytes("col1");
79          byte[] dataValue1 = Bytes.toBytes("testWrite-1");
80          byte[] dataValue2 = Bytes.toBytes("testWrite-2");
81          byte[] dataValue3 = Bytes.toBytes("testWrite-3");
82  
83          Transaction tx1 = tm.begin();
84  
85          HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
86  
87          Put row1 = new Put(rowName1);
88          row1.addColumn(famName1, colName1, dataValue1);
89          tt.put(tx1, row1);
90  
91          Get g = new Get(rowName1).setMaxVersions(1);
92  
93          Result r = tt.get(tx1, g);
94          assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
95                  "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
96  
97          hbaseTx1.checkpoint();
98  
99          row1 = new Put(rowName1);
100         row1.addColumn(famName1, colName1, dataValue2);
101         tt.put(tx1, row1);
102 
103         r = tt.get(tx1, g);
104         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
105                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
106 
107         hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT);
108 
109         r = tt.get(tx1, g);
110         assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
111                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
112 
113         hbaseTx1.checkpoint();
114 
115         row1 = new Put(rowName1);
116         row1.addColumn(famName1, colName1, dataValue3);
117         tt.put(tx1, row1);
118 
119         r = tt.get(tx1, g);
120         assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
121                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
122 
123         hbaseTx1.checkpoint();
124 
125         r = tt.get(tx1, g);
126         assertTrue(Bytes.equals(dataValue3, r.getValue(famName1, colName1)),
127                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
128 
129         hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL);
130 
131         r = tt.get(tx1, g);
132         
133         assertTrue(r.size() == 3, "Expected 3 results and found " + r.size());
134 
135         List<Cell> cells = r.getColumnCells(famName1, colName1);
136         assertTrue(Bytes.equals(dataValue3, CellUtil.cloneValue(cells.get(0))),
137                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
138 
139         assertTrue(Bytes.equals(dataValue2, CellUtil.cloneValue(cells.get(1))),
140               "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
141 
142         assertTrue(Bytes.equals(dataValue1, CellUtil.cloneValue(cells.get(2))),
143                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
144 
145         tt.close();
146     }
147 
148     @Test(timeOut = 30_000)
149     public void testSNAPSHOT(ITestContext context) throws Exception {
150         TransactionManager tm = newTransactionManager(context);
151         TTable tt = new TTable(connection, TEST_TABLE);
152 
153         byte[] rowName1 = Bytes.toBytes("row1");
154         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
155         byte[] colName1 = Bytes.toBytes("col1");
156         byte[] dataValue0 = Bytes.toBytes("testWrite-0");
157         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
158         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
159 
160         Transaction tx1 = tm.begin();
161 
162         Put row1 = new Put(rowName1);
163         row1.addColumn(famName1, colName1, dataValue0);
164         tt.put(tx1, row1);
165 
166         tm.commit(tx1);
167 
168         tx1 = tm.begin();
169 
170         HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
171 
172         Get g = new Get(rowName1).setMaxVersions(1);
173 
174         Result r = tt.get(tx1, g);
175         assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)),
176                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
177 
178         row1 = new Put(rowName1);
179         row1.addColumn(famName1, colName1, dataValue1);
180         tt.put(tx1, row1);
181 
182 
183         r = tt.get(tx1, g);
184         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
185                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
186 
187         hbaseTx1.checkpoint();
188 
189         row1 = new Put(rowName1);
190         row1.addColumn(famName1, colName1, dataValue2);
191         tt.put(tx1, row1);
192 
193         r = tt.get(tx1, g);
194         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
195                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
196 
197         hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT);
198 
199         r = tt.get(tx1, g);
200         assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
201                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
202 
203         tt.close();
204     }
205     
206     @Test(timeOut = 30_000)
207     public void testSNAPSHOT_ALL(ITestContext context) throws Exception {
208         TransactionManager tm = newTransactionManager(context);
209         TTable tt = new TTable(connection, TEST_TABLE);
210 
211         byte[] rowName1 = Bytes.toBytes("row1");
212         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
213         byte[] colName1 = Bytes.toBytes("col1");
214         byte[] dataValue0 = Bytes.toBytes("testWrite-0");
215         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
216         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
217 
218         Transaction tx1 = tm.begin();
219 
220         Put row1 = new Put(rowName1);
221         row1.addColumn(famName1, colName1, dataValue0);
222         tt.put(tx1, row1);
223 
224         tm.commit(tx1);
225 
226         tx1 = tm.begin();
227         
228         HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
229 
230         Get g = new Get(rowName1).setMaxVersions(100);
231 
232         Result r = tt.get(tx1, g);
233         assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)),
234                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
235 
236         row1 = new Put(rowName1);
237         row1.addColumn(famName1, colName1, dataValue1);
238         tt.put(tx1, row1);
239 
240         g = new Get(rowName1).setMaxVersions(100);
241 
242         r = tt.get(tx1, g);
243         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
244                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
245 
246         hbaseTx1.checkpoint();
247 
248         row1 = new Put(rowName1);
249         row1.addColumn(famName1, colName1, dataValue2);
250         tt.put(tx1, row1);
251 
252         r = tt.get(tx1, g);
253         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
254                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
255 
256         hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL);
257 
258         r = tt.get(tx1, g);
259         
260         assertTrue(r.size() == 3, "Expected 3 results and found " + r.size());
261 
262         List<Cell> cells = r.getColumnCells(famName1, colName1);
263         assertTrue(Bytes.equals(dataValue2, CellUtil.cloneValue(cells.get(0))),
264                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
265 
266         assertTrue(Bytes.equals(dataValue1, CellUtil.cloneValue(cells.get(1))),
267               "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
268 
269         assertTrue(Bytes.equals(dataValue0, CellUtil.cloneValue(cells.get(2))),
270                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
271 
272         tt.close();
273     }
274 
275     @Test(timeOut = 30_000)
276     public void testSNAPSHOT_EXCLUDE_CURRENT(ITestContext context) throws Exception {
277         TransactionManager tm = newTransactionManager(context);
278         TTable tt = new TTable(connection, TEST_TABLE);
279 
280         byte[] rowName1 = Bytes.toBytes("row1");
281         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
282         byte[] colName1 = Bytes.toBytes("col1");
283         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
284         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
285 
286         Transaction tx1 = tm.begin();
287 
288         HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
289 
290         Put row1 = new Put(rowName1);
291         row1.addColumn(famName1, colName1, dataValue1);
292         tt.put(tx1, row1);
293 
294         Get g = new Get(rowName1).setMaxVersions(1);
295 
296         Result r = tt.get(tx1, g);
297         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
298                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
299 
300         hbaseTx1.checkpoint();
301 
302         row1 = new Put(rowName1);
303         row1.addColumn(famName1, colName1, dataValue2);
304         tt.put(tx1, row1);
305 
306         r = tt.get(tx1, g);
307         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
308                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
309 
310         hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
311 
312         r = tt.get(tx1, g);
313         assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
314                 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
315         
316         tt.close();
317     }
318 
319     @Test(timeOut = 30_000)
320     public void testDeleteAfterCheckpoint(ITestContext context) throws Exception {
321         TransactionManager tm = newTransactionManager(context);
322         TTable tt = new TTable(connection, TEST_TABLE);
323 
324         byte[] rowName1 = Bytes.toBytes("row1");
325         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
326         byte[] colName1 = Bytes.toBytes("col1");
327         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
328 
329         Transaction tx1 = tm.begin();
330 
331         Put row1 = new Put(rowName1);
332         row1.addColumn(famName1, colName1, dataValue1);
333         tt.put(tx1, row1);
334 
335         tm.commit(tx1);
336 
337         Transaction tx2 = tm.begin();
338 
339         HBaseTransaction hbaseTx2 = enforceHBaseTransactionAsParam(tx1);
340 
341         hbaseTx2.checkpoint();
342 
343         Delete d = new Delete(rowName1);
344         tt.delete(tx2, d);
345 
346         try {
347             tm.commit(tx2);
348         } catch (TransactionException e) {
349             Assert.fail();
350         }
351 
352         tt.close();
353     }
354 
355     @Test(timeOut = 30_000)
356     public void testOutOfCheckpoints(ITestContext context) throws Exception {
357         TransactionManager tm = newTransactionManager(context);
358 
359         Transaction tx1 = tm.begin();
360 
361         HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
362 
363         for (int i = 0; i < CommitTable.MAX_CHECKPOINTS_PER_TXN - 1; ++i) {
364             hbaseTx1.checkpoint();
365         }
366 
367         try {
368             hbaseTx1.checkpoint();
369             Assert.fail();
370         } catch (TransactionException e) {
371             // expected
372         }
373 
374     }
375 
376 
377     @Test(timeOut = 60_000)
378     public void testInMemoryCommitTableCheckpoints(ITestContext context) throws Exception {
379 
380         final byte[] row = Bytes.toBytes("test-sc");
381         final byte[] family = Bytes.toBytes(TEST_FAMILY);
382         final byte[] qualifier = Bytes.toBytes("testdata");
383         final byte[] qualifier2 = Bytes.toBytes("testdata2");
384         final byte[] data1 = Bytes.toBytes("testWrite-");
385 
386         final CountDownLatch beforeCTRemove = new CountDownLatch(1);
387         final CountDownLatch afterCommit = new CountDownLatch(1);
388         final CountDownLatch writerDone = new CountDownLatch(1);
389 
390         final AtomicLong startTimestamp = new AtomicLong(0);
391         final AtomicLong commitTimestamp = new AtomicLong(0);
392         PostCommitActions syncPostCommitter =
393                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
394         final AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
395 
396         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
397         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
398                 tm.getCommitTableClient());
399         final TTable table = new TTable(htable,snapshotFilter);
400 
401 
402         doAnswer(new Answer<ListenableFuture<Void>>() {
403             @Override
404             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
405                 afterCommit.countDown();
406                 beforeCTRemove.await();
407                 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
408                 return result;
409             }
410         }).when(syncPostCommitter).removeCommitTableEntry(any(HBaseTransaction.class));
411 
412 
413         Thread writeThread = new Thread("WriteThread"){
414             @Override
415             public void run() {
416                 try {
417 
418                     HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
419                     Put put = new Put(row);
420                     put.addColumn(family, qualifier, data1);
421 
422                     startTimestamp.set(tx1.getStartTimestamp());
423                     table.put(tx1, put);
424                     tx1.checkpoint();
425 
426                     Put put2 = new Put(row);
427                     put2.addColumn(family, qualifier2, data1);
428                     table.put(tx1, put2);
429 
430                     tm.commit(tx1);
431 
432                     commitTimestamp.set(tx1.getCommitTimestamp());
433                     writerDone.countDown();
434                 } catch (IOException | RollbackException e) {
435                     e.printStackTrace();
436                 }
437             }
438         };
439 
440         writeThread.start();
441 
442         afterCommit.await();
443 
444         Optional<CommitTable.CommitTimestamp> ct1 = tm.getCommitTableClient().getCommitTimestamp(startTimestamp.get()).get();
445         Optional<CommitTable.CommitTimestamp> ct2 = tm.getCommitTableClient().getCommitTimestamp(startTimestamp.get() + 1).get();
446 
447         beforeCTRemove.countDown();
448 
449         writerDone.await();
450 
451         assertEquals(commitTimestamp.get(), ct1.get().getValue());
452         assertEquals(commitTimestamp.get(), ct2.get().getValue());
453 
454 
455         assertTrue(hasCell(row, family, qualifier, startTimestamp.get(), new TTableCellGetterAdapter(table)),
456                 "Cell should be there");
457         assertTrue(hasCell(row, family, qualifier2, startTimestamp.get()+1, new TTableCellGetterAdapter(table)),
458                 "Cell should be there");
459         assertTrue(hasShadowCell(row, family, qualifier, startTimestamp.get(), new TTableCellGetterAdapter(table)),
460                 "Cell should be there");
461         assertTrue(hasShadowCell(row, family, qualifier2, startTimestamp.get()+1, new TTableCellGetterAdapter(table)),
462                 "Cell should be there");
463     }
464 }