View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Charsets;
21  import com.google.common.base.Optional;
22  import com.google.common.util.concurrent.ListenableFuture;
23  import org.apache.omid.committable.CommitTable;
24  
25  import org.apache.omid.metrics.NullMetricsProvider;
26  
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.client.Delete;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HBaseAdmin;
33  import org.apache.hadoop.hbase.client.HTableInterface;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.mockito.Matchers;
40  import org.mockito.invocation.InvocationOnMock;
41  import org.mockito.stubbing.Answer;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  import org.testng.ITestContext;
45  import org.testng.annotations.Test;
46  
47  import java.util.Arrays;
48  import java.util.List;
49  import java.util.concurrent.CountDownLatch;
50  import java.util.concurrent.atomic.AtomicBoolean;
51  
52  import static org.apache.omid.transaction.CellUtils.hasCell;
53  import static org.apache.omid.transaction.CellUtils.hasShadowCell;
54  import static org.mockito.Matchers.any;
55  import static org.mockito.Matchers.anyInt;
56  import static org.mockito.Matchers.anyLong;
57  import static org.mockito.Mockito.doAnswer;
58  import static org.mockito.Mockito.doThrow;
59  import static org.mockito.Mockito.never;
60  import static org.mockito.Mockito.spy;
61  import static org.mockito.Mockito.times;
62  import static org.mockito.Mockito.verify;
63  import static org.testng.Assert.assertFalse;
64  import static org.testng.Assert.assertNull;
65  import static org.testng.Assert.assertTrue;
66  
67  @Test(groups = "sharedHBase")
68  public class TestShadowCells extends OmidTestBase {
69  
70      private static final Logger LOG = LoggerFactory.getLogger(TestShadowCells.class);
71  
        // Host and port that are joined as "host:port" into the connection string of the client
        // configurations built explicitly by the tests below.
72      private static final String TSO_SERVER_HOST = "localhost";
73      private static final int TSO_SERVER_PORT = 1234;
74  
        // Name of the table opened through TTable and of the column family written by the tests.
75      private static final String TEST_TABLE = "test";
76      private static final String TEST_FAMILY = "data";
77  
        // Row keys: "row" is used by the single-row tests, row1..row3 by testGetOldShadowCells.
        // NOTE(review): row and family are package-private, presumably so other tests in the package
        // can reuse them - confirm before narrowing their visibility.
78      static final byte[] row = Bytes.toBytes("test-sc");
79      private static final byte[] row1 = Bytes.toBytes("test-sc1");
80      private static final byte[] row2 = Bytes.toBytes("test-sc2");
81      private static final byte[] row3 = Bytes.toBytes("test-sc3");
82      static final byte[] family = Bytes.toBytes(TEST_FAMILY);
83      private static final byte[] qualifier = Bytes.toBytes("testdata");
        // Value written into the test column by every test in this class.
84      private static final byte[] data1 = Bytes.toBytes("testWrite-1");
85  
86  
87      @Test(timeOut = 60_000)
88      public void testShadowCellsBasics(ITestContext context) throws Exception {
            // Checks the shadow cell life cycle for a single committed write: only the data cell exists
            // before the commit, both the data cell and its shadow cell exist afterwards, and a later
            // transactional read returns the value without any getCommitTimestamp() call on the spied
            // commit table client.
89  
90          TransactionManager tm = newTransactionManager(context);
91  
92          TTable table = new TTable(hbaseConf, TEST_TABLE);
93  
94          HBaseTransaction t1 = (HBaseTransaction) tm.begin();
95  
96          // Write one cell inside t1 (not committed yet)
97          Put put = new Put(row);
98          put.add(family, qualifier, data1);
99          table.put(t1, put);
100 
101         // Before commit test that only the cell is there
102         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
103                 "Cell should be there");
104         assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
105                 "Shadow cell shouldn't be there");
106 
107         tm.commit(t1);
108 
109         // After commit test that both cell and shadow cell are there
110         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
111                 "Cell should be there");
112         assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
113                 "Shadow cell should be there");
114 
115         // Test that we can make a valid read after adding a shadow cell without hitting the commit table
            // The commit table client is wrapped in a spy so that its invocations can be verified below.
116         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
117 
            // Second transaction manager, built explicitly so that it uses the spied commit table client
118         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
119         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
120         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
121         TransactionManager tm2 = HBaseTransactionManager.builder(hbaseOmidClientConf)
122                                                         .commitTableClient(commitTableClient)
123                                                         .build();
124 
125         Transaction t2 = tm2.begin();
126         Get get = new Get(row);
127         get.addColumn(family, qualifier);
128 
129         Result getResult = table.get(t2, get);
130         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
            // The read must not have asked the commit table for any commit timestamp
131         verify(commitTableClient, never()).getCommitTimestamp(anyLong());
132     }
133 
134     @Test(timeOut = 60_000)
135     public void testCrashingAfterCommitDoesNotWriteShadowCells(ITestContext context) throws Exception {
            // Emulates a client that crashes in the post-commit phase: updateShadowCells() is stubbed to
            // throw, so after tm.commit() the data cell exists without a shadow cell. A later read must
            // still return the value and needs exactly one getCommitTimestamp() lookup on the spied
            // commit table client to do so.
136 
137         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
138 
139         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
140         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
141         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
            // Spy on the post-commit actions so that updateShadowCells() can be forced to fail
142         PostCommitActions syncPostCommitter = spy(
143                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
144         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
145                 .postCommitter(syncPostCommitter)
146                 .commitTableClient(commitTableClient)
147                 .build());
148 
149         // The following line emulates a crash after commit that is observed in (*) below
150         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
151 
152         TTable table = new TTable(hbaseConf, TEST_TABLE);
153 
154         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
155 
156         // Write a cell and commit; the stubbed post-commit step fails instead of writing the shadow cell
157         Put put = new Put(row);
158         put.add(family, qualifier, data1);
159         table.put(t1, put);
160         try {
161             tm.commit(t1);
162         } catch (Exception ignored) { // (*) crash
163             // Deliberately ignored: this is the emulated crash, the assertions below check its effects
164         }
165 
166         // After commit with the emulated crash, test that only the cell is there
167         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
168                 "Cell should be there");
169         assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
170                 "Shadow cell should not be there");
171 
172         Transaction t2 = tm.begin();
173         Get get = new Get(row);
174         get.addColumn(family, qualifier);
175 
            // The value must still be readable even though no shadow cell was written
176         Result getResult = table.get(t2, get);
177         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
            // Exactly one commit timestamp lookup is expected on the commit table client for this read
178         verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
179     }
180 
181     @Test(timeOut = 60_000)
182     public void testShadowCellIsHealedAfterCommitCrash(ITestContext context) throws Exception {
            // Emulates a crash in the post-commit phase (updateShadowCells() is stubbed to throw) and then
            // checks that the first transactional read of the cell re-creates ("heals") the missing shadow
            // cell, so that a second read does not add any further getCommitTimestamp() invocation on the
            // spied commit table client.
183 
184         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
185 
186         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
187         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
188         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
            // Spy on the post-commit actions so that updateShadowCells() can be forced to fail
189         PostCommitActions syncPostCommitter = spy(
190                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
191         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
192                 .postCommitter(syncPostCommitter)
193                 .commitTableClient(commitTableClient)
194                 .build());
195 
196         // The following line emulates a crash after commit that is observed in (*) below
197         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
198 
199         TTable table = new TTable(hbaseConf, TEST_TABLE);
200 
201         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
202 
203         // Write a cell and commit; the stubbed post-commit step fails instead of writing the shadow cell
204         Put put = new Put(row);
205         put.add(family, qualifier, data1);
206         table.put(t1, put);
207         try {
208             tm.commit(t1);
209         } catch (Exception e) { // (*) Crash
210             // Do nothing: this is the emulated crash, its effects are asserted below
211         }
212 
            // Right after the emulated crash only the data cell exists
213         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
214                 "Cell should be there");
215         assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
216                 "Shadow cell should not be there");
217 
218         Transaction t2 = tm.begin();
219         Get get = new Get(row);
220         get.addColumn(family, qualifier);
221 
222         // This get should heal the shadow cell
223         Result getResult = table.get(t2, get);
224         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
            // Exactly one commit timestamp lookup is expected on the commit table client for this first read
225         verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
226 
227         assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
228                 "Cell should be there");
229         assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
230                 "Shadow cell should be there after being healed");
231 
232         // As the shadow cell is healed, this get shouldn't have to hit the commit table,
233         // so the number of invocations to commitTableClient.getCommitTimestamp()
234         // should remain the same (still 1 in total)
235         getResult = table.get(t2, get);
236         assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
237         verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
238     }
239 
240     @Test(timeOut = 60_000)
241     public void testTransactionNeverCompletesWhenAnExceptionIsThrownUpdatingShadowCells(ITestContext context)
242             throws Exception {
            // Makes the real updateShadowCells() run against a disabled table and then checks three things:
            // the data cell exists without a shadow cell, completeTransaction() was never invoked on the
            // spied commit table client, and the commit timestamp of the transaction is still present in
            // the commit table.
243 
244         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
245 
246         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
247         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
248         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
            // Spy on the post-commit actions so that updateShadowCells() can be intercepted below
249         PostCommitActions syncPostCommitter = spy(
250                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
251         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
252                 .postCommitter(syncPostCommitter)
253                 .commitTableClient(commitTableClient)
254                 .build());
255 
            // final because the table is referenced from the anonymous Answer below
256         final TTable table = new TTable(hbaseConf, TEST_TABLE);
257 
258         HBaseTransaction tx = (HBaseTransaction) tm.begin();
259 
260         Put put = new Put(row);
261         put.add(family, qualifier, data1);
262         table.put(tx, put);
263 
264         // This line emulates an error accessing the target table by disabling it
            // The pending writes are flushed first, then the table is disabled, and only then is the real
            // updateShadowCells() invoked.
265         doAnswer(new Answer<ListenableFuture<Void>>() {
266             @Override
267             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
268                 table.flushCommits();
269                 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
270                 admin.disableTable(table.getTableName());
271                 return (ListenableFuture<Void>) invocation.callRealMethod();
272             }
273         }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
274 
275         // When committing, an IOException should be thrown in syncPostCommitter.updateShadowCells() and placed in the
276         // future as a TransactionManagerException. However, the exception is never retrieved in the
277         // AbstractTransactionManager as the future is never checked.
278         // This requires to set the HConstants.HBASE_CLIENT_RETRIES_NUMBER in the HBase config to a finite number:
279         // e.g -> hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3); Otherwise it will get stuck in tm.commit();
280 
281         tm.commit(tx); // Tx effectively commits but the post Commit Actions failed when updating the shadow cells
282 
283         // Re-enable table to allow the required checks below
284         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
285         admin.enableTable(table.getTableName());
286 
287         // 1) check that shadow cell is not created...
288         assertTrue(hasCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
289                 "Cell should be there");
290         assertFalse(hasShadowCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
291                 "Shadow cell should not be there");
292         // 2) and thus, completeTransaction() was never called on the commit table...
293         verify(commitTableClient, times(0)).completeTransaction(anyLong());
294         // 3) ...and commit value still in commit table
295         assertTrue(commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get().isPresent());
296 
297     }
298 
299     @Test(timeOut = 60_000)
300     public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
            // Forces a specific interleaving between a committing writer (this thread) and a reader thread
            // by means of three latches:
            //  - readAfterCommit: released when the stubbed updateShadowCells() is entered; the reader
            //    thread waits on it before doing anything else.
            //  - postCommitBegin: released by the reader inside the stubbed filterCellsForSnapshot(); the
            //    stubbed updateShadowCells() waits on it before running the real method.
            //  - postCommitEnd: released once the real updateShadowCells() has returned; the reader waits
            //    on it before running the real filterCellsForSnapshot().
            // Any unexpected observation or exception in the reader sets readFailed, asserted at the end.
301         final CountDownLatch readAfterCommit = new CountDownLatch(1);
302         final CountDownLatch postCommitBegin = new CountDownLatch(1);
303         final CountDownLatch postCommitEnd = new CountDownLatch(1);
304 
305         final AtomicBoolean readFailed = new AtomicBoolean(false);
306         PostCommitActions syncPostCommitter =
307                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
308         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
309 
            // Writer side: hold back the real shadow cell update until the reader has started filtering
310         doAnswer(new Answer<ListenableFuture<Void>>() {
311             @Override
312             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
313                 LOG.info("Releasing readAfterCommit barrier");
314                 readAfterCommit.countDown();
315                 LOG.info("Waiting postCommitBegin barrier");
316                 postCommitBegin.await();
317                 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
318                 LOG.info("Releasing postCommitEnd barrier");
319                 postCommitEnd.countDown();
320                 return result;
321             }
322         }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
323 
324         // Start transaction on write thread
325         TTable table = new TTable(hbaseConf, TEST_TABLE);
326 
327         final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
328 
329         // Start read thread
330         Thread readThread = new Thread("Read Thread") {
331             @Override
332             public void run() {
333                 LOG.info("Waiting readAfterCommit barrier");
334                 try {
335                     readAfterCommit.await();
                        // Reader-local spied table (shadows the outer "table" variable)
336                     final TTable table = spy(new TTable(hbaseConf, TEST_TABLE));
                        // Reader side: let the writer proceed and wait until its real shadow cell update
                        // has returned before doing the real snapshot filtering.
                        // NOTE(review): filterCellsForSnapshot() is presumably reached through
                        // table.get() below - confirm against TTable.
337                     doAnswer(new Answer<List<KeyValue>>() {
338                         @SuppressWarnings("unchecked")
339                         @Override
340                         public List<KeyValue> answer(InvocationOnMock invocation) throws Throwable {
341                             LOG.info("Release postCommitBegin barrier");
342                             postCommitBegin.countDown();
343                             LOG.info("Waiting postCommitEnd barrier");
344                             postCommitEnd.await();
345                             return (List<KeyValue>) invocation.callRealMethod();
346                         }
347                     }).when(table).filterCellsForSnapshot(Matchers.<List<Cell>>any(),
348                             any(HBaseTransaction.class), anyInt());
349 
350                     TransactionManager tm = newTransactionManager(context);
                        // At this point the shadow cell of t1 is not expected to exist yet
351                     if (hasShadowCell(row,
352                             family,
353                             qualifier,
354                             t1.getStartTimestamp(),
355                             new TTableCellGetterAdapter(table))) {
356                         readFailed.set(true);
357                     }
358 
359                     Transaction t = tm.begin();
360                     Get get = new Get(row);
361                     get.addColumn(family, qualifier);
362 
363                     Result getResult = table.get(t, get);
364                     Cell cell = getResult.getColumnLatestCell(family, qualifier);
                        // The read fails unless it returns data1 and the shadow cell now exists at the
                        // timestamp of the returned cell
365                     if (!Arrays.equals(data1, CellUtil.cloneValue(cell))
366                             || !hasShadowCell(row,
367                             family,
368                             qualifier,
369                             cell.getTimestamp(),
370                             new TTableCellGetterAdapter(table))) {
371                         readFailed.set(true);
372                     } else {
373                         LOG.info("Read succeeded");
374                     }
375                 } catch (Throwable e) {
                        // Any exception in the reader (including a null cell above) counts as a failed read
376                     readFailed.set(true);
377                     LOG.error("Error whilst reading", e);
378                 }
379             }
380         };
381         readThread.start();
382 
383         // Write data
384         Put put = new Put(row);
385         put.add(family, qualifier, data1);
386         table.put(t1, put);
387         tm.commit(t1);
388 
            // Wait for the reader to finish before checking its outcome
389         readThread.join();
390 
391         assertFalse(readFailed.get(), "Read should have succeeded");
392 
393     }
394 
395     // TODO: After removing the legacy shadow cell suffix, maybe we should mix the assertions in this test with
396     // the ones in the previous tests in a further commit
397 
398     /**
399      * Test that a shadow cell carrying the legacy suffix is not honored by the client.
         * <p>
         * Three rows are written and committed, and their entries are asserted to be absent from the
         * commit table. The current-format shadow cell of the second row is then deleted, which makes
         * that row invisible to gets and scans. Finally a shadow cell with the legacy suffix is put back
         * for that row and the test asserts that the row remains invisible.
400      */
401     @Test
402     public void testGetOldShadowCells(ITestContext context) throws Exception {
403 
404         TransactionManager tm = newTransactionManager(context);
405 
406         TTable table = new TTable(hbaseConf, TEST_TABLE);
            // Underlying HBase table handle, used below to delete and put shadow cells directly
407         HTableInterface htable = table.getHTable();
408 
409         // Commit three single-cell transactions, one per row
410         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
411         Put put = new Put(row1);
412         put.add(family, qualifier, data1);
413         table.put(t1, put);
414         tm.commit(t1);
415 
416         HBaseTransaction t2 = (HBaseTransaction) tm.begin();
417         put = new Put(row2);
418         put.add(family, qualifier, data1);
419         table.put(t2, put);
420         tm.commit(t2);
421 
422         HBaseTransaction t3 = (HBaseTransaction) tm.begin();
423         put = new Put(row3);
424         put.add(family, qualifier, data1);
425         table.put(t3, put);
426         tm.commit(t3);
427 
428         // ensure that transaction is no longer in commit table
429         // the only place that should have the mapping is the shadow cells
430         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
431         Optional<CommitTable.CommitTimestamp> ct1 = commitTableClient.getCommitTimestamp(t1.getStartTimestamp()).get();
432         Optional<CommitTable.CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(t2.getStartTimestamp()).get();
433         Optional<CommitTable.CommitTimestamp> ct3 = commitTableClient.getCommitTimestamp(t3.getStartTimestamp()).get();
434         assertFalse(ct1.isPresent(), "Shouldn't exist in commit table");
435         assertFalse(ct2.isPresent(), "Shouldn't exist in commit table");
436         assertFalse(ct3.isPresent(), "Shouldn't exist in commit table");
437 
438         // delete new shadow cell
            // (the one of row2, addressed through the current shadow cell suffix)
439         Delete del = new Delete(row2);
440         del.deleteColumn(family, CellUtils.addShadowCellSuffix(qualifier));
441         htable.delete(del);
442         htable.flushCommits();
443 
444         // verify that we can't read now (since shadow cell is missing)
445         Transaction t4 = tm.begin();
446         Get get = new Get(row2);
447         get.addColumn(family, qualifier);
448 
449         Result getResult = table.get(t4, get);
450         assertTrue(getResult.isEmpty(), "Should have nothing");
451 
            // A full scan must now return only row1 and row3
452         Transaction t5 = tm.begin();
453         Scan s = new Scan();
454         ResultScanner scanner = table.getScanner(t5, s);
455         Result result1 = scanner.next();
456         Result result2 = scanner.next();
457         Result result3 = scanner.next();
458         scanner.close();
459 
460         assertNull(result3);
461         assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
462         assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
463         assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
464         assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
465 
466         // now add in the previous legacy shadow cell for that row
            // (written at t2's start timestamp, with t2's commit timestamp as value)
467         put = new Put(row2);
468         put.add(family,
469                 addLegacyShadowCellSuffix(qualifier),
470                 t2.getStartTimestamp(),
471                 Bytes.toBytes(t2.getCommitTimestamp()));
472         htable.put(put);
473 
474         // we should NOT be able to read that row now, even though
475         // it has a legacy shadow cell
476         Transaction t6 = tm.begin();
477         get = new Get(row2);
478         get.addColumn(family, qualifier);
479 
480         getResult = table.get(t6, get);
481         assertFalse(getResult.containsColumn(family, qualifier), "Should NOT have column");
482 
            // The scan result is unchanged: still only row1 and row3
483         Transaction t7 = tm.begin();
484         s = new Scan();
485         scanner = table.getScanner(t7, s);
486         result1 = scanner.next();
487         result2 = scanner.next();
488         result3 = scanner.next();
489         scanner.close();
490 
491         assertNull(result3, "There should only be 2 rows");
492         assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
493         assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
494         assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
495         assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
496     }
497 
498     // ----------------------------------------------------------------------------------------------------------------
499     // Helper methods
500     // ----------------------------------------------------------------------------------------------------------------
501 
502     private static final byte[] LEGACY_SHADOW_CELL_SUFFIX = ":OMID_CTS".getBytes(Charsets.UTF_8);
503 
        // Returns a new array holding the given qualifier followed by the legacy shadow cell suffix
        // bytes (":OMID_CTS" encoded as UTF-8). Used by testGetOldShadowCells to write a legacy-style
        // shadow cell by hand.
504     private static byte[] addLegacyShadowCellSuffix(byte[] qualifier) {
505         return com.google.common.primitives.Bytes.concat(qualifier, LEGACY_SHADOW_CELL_SUFFIX);
506     }
507 
508 }