View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.omid.transaction.CellUtils.hasCell;
21  import static org.apache.omid.transaction.CellUtils.hasShadowCell;
22  import static org.mockito.Matchers.any;
23  import static org.mockito.Matchers.anyInt;
24  import static org.mockito.Matchers.anyLong;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.doThrow;
27  import static org.mockito.Mockito.never;
28  import static org.mockito.Mockito.spy;
29  import static org.mockito.Mockito.times;
30  import static org.mockito.Mockito.verify;
31  import static org.testng.Assert.assertFalse;
32  import static org.testng.Assert.assertNull;
33  import static org.testng.Assert.assertTrue;
34  
35  import java.util.Arrays;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.concurrent.CountDownLatch;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.client.Delete;
46  import org.apache.hadoop.hbase.client.Get;
47  import org.apache.hadoop.hbase.client.HBaseAdmin;
48  import org.apache.hadoop.hbase.client.Put;
49  import org.apache.hadoop.hbase.client.Result;
50  import org.apache.hadoop.hbase.client.ResultScanner;
51  import org.apache.hadoop.hbase.client.Scan;
52  import org.apache.hadoop.hbase.client.Table;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.omid.committable.CommitTable;
55  import org.apache.omid.metrics.NullMetricsProvider;
56  import org.mockito.Matchers;
57  import org.mockito.invocation.InvocationOnMock;
58  import org.mockito.stubbing.Answer;
59  import org.slf4j.Logger;
60  import org.slf4j.LoggerFactory;
61  import org.testng.ITestContext;
62  import org.testng.annotations.Test;
63  
64  import com.google.common.base.Charsets;
65  import com.google.common.base.Optional;
66  import com.google.common.util.concurrent.ListenableFuture;
67  
68  @Test(groups = "sharedHBase")
69  public class TestShadowCells extends OmidTestBase {
70  
71      private static final Logger LOG = LoggerFactory.getLogger(TestShadowCells.class);
72  
73      private static final String TSO_SERVER_HOST = "localhost";
74      private static final int TSO_SERVER_PORT = 1234;
75  
76      private static final String TEST_TABLE = "test";
77      private static final String TEST_FAMILY = "data";
78  
79      static final byte[] row = Bytes.toBytes("test-sc");
80      private static final byte[] row1 = Bytes.toBytes("test-sc1");
81      private static final byte[] row2 = Bytes.toBytes("test-sc2");
82      private static final byte[] row3 = Bytes.toBytes("test-sc3");
83      static final byte[] family = Bytes.toBytes(TEST_FAMILY);
84      private static final byte[] qualifier = Bytes.toBytes("testdata");
85      private static final byte[] data1 = Bytes.toBytes("testWrite-1");
86  
87  
88      @Test(timeOut = 60_000)
89      public void testShadowCellsBasics(ITestContext context) throws Exception {
90  
91          TransactionManager tm = newTransactionManager(context);
92  
93          TTable table = new TTable(connection, TEST_TABLE);
94  
95          HBaseTransaction t1 = (HBaseTransaction) tm.begin();
96  
97          // Test shadow cells are created properly
98          Put put = new Put(row);
99          put.addColumn(family, qualifier, data1);
100         table.put(t1, put);
101 
102         // Before commit test that only the cell is there
103         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
104                 "Cell should be there");
105         assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
106                 "Shadow cell shouldn't be there");
107 
108         tm.commit(t1);
109 
110         // After commit test that both cell and shadow cell are there
111         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
112                 "Cell should be there");
113         assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
114                 "Shadow cell should be there");
115 
116         // Test that we can make a valid read after adding a shadow cell without hitting the commit table
117         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
118 
119         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
120         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
121         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
122         TransactionManager tm2 = HBaseTransactionManager.builder(hbaseOmidClientConf)
123                                                         .commitTableClient(commitTableClient)
124                                                         .build();
125 
126         Transaction t2 = tm2.begin();
127         Get get = new Get(row);
128         get.addColumn(family, qualifier);
129 
130         Result getResult = table.get(t2, get);
131         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
132         verify(commitTableClient, never()).getCommitTimestamp(anyLong());
133     }
134 
135     @Test(timeOut = 60_000)
136     public void testCrashingAfterCommitDoesNotWriteShadowCells(ITestContext context) throws Exception {
137 
138         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
139 
140         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
141         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
142         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
143         PostCommitActions syncPostCommitter = spy(
144                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
145         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
146                 .postCommitter(syncPostCommitter)
147                 .commitTableClient(commitTableClient)
148                 .commitTableWriter(getCommitTable(context).getWriter())
149                 .build());
150 
151         // The following line emulates a crash after commit that is observed in (*) below
152         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
153 
154         TTable table = new TTable(connection, TEST_TABLE);
155 
156         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
157 
158         // Test shadow cell are created properly
159         Put put = new Put(row);
160         put.addColumn(family, qualifier, data1);
161         table.put(t1, put);
162         try {
163             tm.commit(t1);
164         } catch (Exception e) { // (*) crash
165             // Do nothing
166         }
167 
168         // After commit with the emulated crash, test that only the cell is there
169         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
170                 "Cell should be there");
171         assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
172                 "Shadow cell should not be there");
173 
174         Transaction t2 = tm.begin();
175         Get get = new Get(row);
176         get.addColumn(family, qualifier);
177 
178         Result getResult = table.get(t2, get);
179         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Shadow cell should not be there");
180         verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
181     }
182 
183     @Test(timeOut = 60_000)
184     public void testShadowCellIsHealedAfterCommitCrash(ITestContext context) throws Exception {
185 
186         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
187 
188         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
189         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
190         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
191         PostCommitActions syncPostCommitter = spy(
192                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
193         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
194                 .postCommitter(syncPostCommitter)
195                 .commitTableWriter(getCommitTable(context).getWriter())
196                 .commitTableClient(commitTableClient)
197                 .build());
198 
199         // The following line emulates a crash after commit that is observed in (*) below
200         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
201 
202         TTable table = new TTable(connection, TEST_TABLE);
203 
204         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
205 
206         // Test shadow cell are created properly
207         Put put = new Put(row);
208         put.addColumn(family, qualifier, data1);
209         table.put(t1, put);
210         try {
211             tm.commit(t1);
212         } catch (Exception e) { // (*) Crash
213             // Do nothing
214         }
215 
216         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
217                 "Cell should be there");
218         assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
219                 "Shadow cell should not be there");
220 
221         Transaction t2 = tm.begin();
222         Get get = new Get(row);
223         get.addColumn(family, qualifier);
224 
225         // This get should heal the shadow cell
226         Result getResult = table.get(t2, get);
227         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
228         verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
229 
230         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
231                 "Cell should be there");
232         assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
233                 "Shadow cell should be there after being healed");
234 
235         // As the shadow cell is healed, this get shouldn't have to hit the storage,
236         // so the number of invocations to commitTableClient.getCommitTimestamp()
237         // should remain the same
238         getResult = table.get(t2, get);
239         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
240         verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
241     }
242 
243     @Test(timeOut = 60_000)
244     public void testTransactionNeverCompletesWhenAnExceptionIsThrownUpdatingShadowCells(ITestContext context)
245             throws Exception {
246 
247         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
248 
249         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
250         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
251         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
252         PostCommitActions syncPostCommitter = spy(
253                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
254         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
255                 .postCommitter(syncPostCommitter)
256                 .commitTableClient(commitTableClient)
257                 .commitTableWriter(getCommitTable(context).getWriter())
258                 .build());
259 
260         final TTable table = new TTable(connection, TEST_TABLE);
261 
262         HBaseTransaction tx = (HBaseTransaction) tm.begin();
263 
264         Put put = new Put(row);
265         put.addColumn(family, qualifier, data1);
266         table.put(tx, put);
267 
268         // This line emulates an error accessing the target table by disabling it
269         doAnswer(new Answer<ListenableFuture<Void>>() {
270             @Override
271             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
272                 table.flushCommits();
273                 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
274                 admin.disableTable(TableName.valueOf(table.getTableName()));
275                 return (ListenableFuture<Void>) invocation.callRealMethod();
276             }
277         }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
278 
279         // When committing, an IOException should be thrown in syncPostCommitter.updateShadowCells() and placed in the
280         // future as a TransactionManagerException. However, the exception is never retrieved in the
281         // AbstractTransactionManager as the future is never checked.
282         // This requires to set the HConstants.HBASE_CLIENT_RETRIES_NUMBER in the HBase config to a finite number:
283         // e.g -> hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3); Otherwise it will get stuck in tm.commit();
284 
285         tm.commit(tx); // Tx effectively commits but the post Commit Actions failed when updating the shadow cells
286 
287         // Re-enable table to allow the required checks below
288         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
289         admin.enableTable(TableName.valueOf(table.getTableName()));
290 
291         // 1) check that shadow cell is not created...
292         assertTrue(hasCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
293                 "Cell should be there");
294         assertFalse(hasShadowCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
295                 "Shadow cell should not be there");
296         // 2) and thus, completeTransaction() was never called on the commit table...
297         verify(commitTableClient, times(0)).completeTransaction(anyLong());
298         // 3) ...and commit value still in commit table
299         assertTrue(commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get().isPresent());
300 
301     }
302 
303     @Test(timeOut = 60_000)
304     public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
305         final CountDownLatch readAfterCommit = new CountDownLatch(1);
306         final CountDownLatch postCommitBegin = new CountDownLatch(1);
307         final CountDownLatch postCommitEnd = new CountDownLatch(1);
308 
309         final AtomicBoolean readFailed = new AtomicBoolean(false);
310         PostCommitActions syncPostCommitter =
311                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
312         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
313 
314         doAnswer(new Answer<ListenableFuture<Void>>() {
315             @Override
316             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
317                 LOG.info("Releasing readAfterCommit barrier");
318                 readAfterCommit.countDown();
319                 LOG.info("Waiting postCommitBegin barrier");
320                 postCommitBegin.await();
321                 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
322                 LOG.info("Releasing postCommitEnd barrier");
323                 postCommitEnd.countDown();
324                 return result;
325             }
326         }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
327 
328         // Start transaction on write thread
329         final TTable table = new TTable(connection, TEST_TABLE);
330 
331         final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
332 
333         // Start read thread
334         Thread readThread = new Thread("Read Thread") {
335             @Override
336             public void run() {
337                 LOG.info("Waiting readAfterCommit barrier");
338                 try {
339                     readAfterCommit.await();
340                     Table htable = table.getHTable();
341                     Table healer = table.getHTable();
342 
343                     final SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, healer)));
344                     final TTable table = new TTable(htable ,snapshotFilter);
345                     doAnswer(new Answer<List<KeyValue>>() {
346                         @SuppressWarnings("unchecked")
347                         @Override
348                         public List<KeyValue> answer(InvocationOnMock invocation) throws Throwable {
349                             LOG.info("Release postCommitBegin barrier");
350                             postCommitBegin.countDown();
351                             LOG.info("Waiting postCommitEnd barrier");
352                             postCommitEnd.await();
353                             return (List<KeyValue>) invocation.callRealMethod();
354                         }
355                     }).when(snapshotFilter).filterCellsForSnapshot(Matchers.<List<Cell>>any(),
356                             any(HBaseTransaction.class), anyInt(), Matchers.<Map<String, Long>>any(), Matchers.<Map<String,byte[]>>any());
357 
358                     TransactionManager tm = newTransactionManager(context);
359                     if (hasShadowCell(row,
360                             family,
361                             qualifier,
362                             t1.getStartTimestamp(),
363                             new TTableCellGetterAdapter(table))) {
364                         readFailed.set(true);
365                     }
366 
367                     Transaction t = tm.begin();
368                     Get get = new Get(row);
369                     get.addColumn(family, qualifier);
370 
371                     Result getResult = table.get(t, get);
372                     Cell cell = getResult.getColumnLatestCell(family, qualifier);
373                     if (!Arrays.equals(data1, CellUtil.cloneValue(cell))
374                             || !hasShadowCell(row,
375                             family,
376                             qualifier,
377                             cell.getTimestamp(),
378                             new TTableCellGetterAdapter(table))) {
379                         readFailed.set(true);
380                     } else {
381                         LOG.info("Read succeeded");
382                     }
383                 } catch (Throwable e) {
384                     readFailed.set(true);
385                     LOG.error("Error whilst reading", e);
386                 }
387             }
388         };
389         readThread.start();
390 
391         // Write data
392         Put put = new Put(row);
393         put.addColumn(family, qualifier, data1);
394         table.put(t1, put);
395         tm.commit(t1);
396 
397         readThread.join();
398 
399         assertFalse(readFailed.get(), "Read should have succeeded");
400 
401     }
402 
403     // TODO: After removing the legacy shadow cell suffix, maybe we should mix the assertions in this test with
404     // the ones in the previous tests in a further commit
405 
406     /**
407      * Test that the new client can read shadow cells written by the old client.
408      */
409     @Test(timeOut = 60_000)
410     public void testGetOldShadowCells(ITestContext context) throws Exception {
411 
412         TransactionManager tm = newTransactionManager(context);
413 
414         TTable table = new TTable(connection, TEST_TABLE);
415         Table htable = table.getHTable();
416 
417         // Test shadow cell are created properly
418         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
419         Put put = new Put(row1);
420         put.addColumn(family, qualifier, data1);
421         table.put(t1, put);
422         tm.commit(t1);
423 
424         HBaseTransaction t2 = (HBaseTransaction) tm.begin();
425         put = new Put(row2);
426         put.addColumn(family, qualifier, data1);
427         table.put(t2, put);
428         tm.commit(t2);
429 
430         HBaseTransaction t3 = (HBaseTransaction) tm.begin();
431         put = new Put(row3);
432         put.addColumn(family, qualifier, data1);
433         table.put(t3, put);
434         tm.commit(t3);
435 
436         // ensure that transaction is no longer in commit table
437         // the only place that should have the mapping is the shadow cells
438         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
439         Optional<CommitTable.CommitTimestamp> ct1 = commitTableClient.getCommitTimestamp(t1.getStartTimestamp()).get();
440         Optional<CommitTable.CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(t2.getStartTimestamp()).get();
441         Optional<CommitTable.CommitTimestamp> ct3 = commitTableClient.getCommitTimestamp(t3.getStartTimestamp()).get();
442         assertFalse(ct1.isPresent(), "Shouldn't exist in commit table");
443         assertFalse(ct2.isPresent(), "Shouldn't exist in commit table");
444         assertFalse(ct3.isPresent(), "Shouldn't exist in commit table");
445 
446         // delete new shadow cell
447         Delete del = new Delete(row2);
448         del.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
449         htable.delete(del);
450         table.flushCommits();
451 
452         // verify that we can't read now (since shadow cell is missing)
453         Transaction t4 = tm.begin();
454         Get get = new Get(row2);
455         get.addColumn(family, qualifier);
456 
457         Result getResult = table.get(t4, get);
458         assertTrue(getResult.isEmpty(), "Should have nothing");
459 
460         Transaction t5 = tm.begin();
461         Scan s = new Scan();
462         ResultScanner scanner = table.getScanner(t5, s);
463         Result result1 = scanner.next();
464         Result result2 = scanner.next();
465         Result result3 = scanner.next();
466         scanner.close();
467 
468         assertNull(result3);
469         assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
470         assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
471         assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
472         assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
473 
474         // now add in the previous legacy shadow cell for that row
475         put = new Put(row2);
476         put.addColumn(family,
477                 addLegacyShadowCellSuffix(qualifier),
478                 t2.getStartTimestamp(),
479                 Bytes.toBytes(t2.getCommitTimestamp()));
480         htable.put(put);
481 
482         // we should NOT be able to read that row now, even though
483         // it has a legacy shadow cell
484         Transaction t6 = tm.begin();
485         get = new Get(row2);
486         get.addColumn(family, qualifier);
487 
488         getResult = table.get(t6, get);
489         assertFalse(getResult.containsColumn(family, qualifier), "Should NOT have column");
490 
491         Transaction t7 = tm.begin();
492         s = new Scan();
493         scanner = table.getScanner(t7, s);
494         result1 = scanner.next();
495         result2 = scanner.next();
496         result3 = scanner.next();
497         scanner.close();
498 
499         assertNull(result3, "There should only be 2 rows");
500         assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
501         assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
502         assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
503         assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
504     }
505 
506     // ----------------------------------------------------------------------------------------------------------------
507     // Helper methods
508     // ----------------------------------------------------------------------------------------------------------------
509 
510     private static final byte[] LEGACY_SHADOW_CELL_SUFFIX = ":OMID_CTS".getBytes(Charsets.UTF_8);
511 
512     private static byte[] addLegacyShadowCellSuffix(byte[] qualifier) {
513         return com.google.common.primitives.Bytes.concat(qualifier, LEGACY_SHADOW_CELL_SUFFIX);
514     }
515 
516 }