View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Optional;
21  import com.google.common.util.concurrent.ListenableFuture;
22  import com.google.common.util.concurrent.ListeningExecutorService;
23  import com.google.common.util.concurrent.MoreExecutors;
24  import com.google.common.util.concurrent.SettableFuture;
25  import com.google.common.util.concurrent.ThreadFactoryBuilder;
26  import org.apache.omid.committable.CommitTable;
27  import org.apache.omid.metrics.NullMetricsProvider;
28  import org.apache.hadoop.hbase.client.Get;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.mockito.invocation.InvocationOnMock;
33  import org.mockito.stubbing.Answer;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.testng.ITestContext;
37  import org.testng.annotations.Test;
38  
39  import java.util.concurrent.CountDownLatch;
40  import java.util.concurrent.Executors;
41  
42  import static org.mockito.Matchers.any;
43  import static org.mockito.Mockito.doAnswer;
44  import static org.mockito.Mockito.spy;
45  import static org.mockito.Mockito.times;
46  import static org.mockito.Mockito.verify;
47  import static org.testng.Assert.assertEquals;
48  import static org.testng.Assert.assertFalse;
49  import static org.testng.Assert.assertNotNull;
50  import static org.testng.Assert.assertTrue;
51  
52  @Test(groups = "sharedHBase")
53  public class TestAsynchronousPostCommitter extends OmidTestBase {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(TestAsynchronousPostCommitter.class);
56  
57      private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
58      private static final byte[] nonExistentFamily = Bytes.toBytes("non-existent");
59      private static final byte[] qualifier = Bytes.toBytes("test-qual");
60  
61      byte[] row1 = Bytes.toBytes("test-is-committed1");
62      byte[] row2 = Bytes.toBytes("test-is-committed2");
63  
64      @Test(timeOut = 30_000)
65      public void testPostCommitActionsAreCalledAsynchronously(ITestContext context) throws Exception {
66  
67          CommitTable.Client commitTableClient = getCommitTable(context).getClient();
68  
69          PostCommitActions syncPostCommitter =
70                  spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
71          ListeningExecutorService postCommitExecutor =
72                  MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
73                          new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
74          PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
75  
76          TransactionManager tm = newTransactionManager(context, asyncPostCommitter);
77  
78          final CountDownLatch beforeUpdatingShadowCellsLatch = new CountDownLatch(1);
79          final CountDownLatch afterUpdatingShadowCellsLatch = new CountDownLatch(1);
80          final CountDownLatch beforeRemovingCTEntryLatch = new CountDownLatch(1);
81          final CountDownLatch afterRemovingCTEntryLatch = new CountDownLatch(1);
82  
83          doAnswer(new Answer<ListenableFuture<Void>>() {
84              public ListenableFuture<Void> answer(InvocationOnMock invocation) {
85                  try {
86                      beforeUpdatingShadowCellsLatch.await();
87                      invocation.callRealMethod();
88                      afterUpdatingShadowCellsLatch.countDown();
89                  } catch (Throwable throwable) {
90                      throwable.printStackTrace();
91                  }
92                  return SettableFuture.create();
93              }
94          }).when(syncPostCommitter).updateShadowCells(any(AbstractTransaction.class));
95  
96          doAnswer(new Answer<ListenableFuture<Void>>() {
97              public ListenableFuture<Void> answer(InvocationOnMock invocation) {
98                  try {
99                      beforeRemovingCTEntryLatch.await();
100                     LOG.info("We are here");
101                     invocation.callRealMethod();
102                     afterRemovingCTEntryLatch.countDown();
103                 } catch (Throwable throwable) {
104                     throwable.printStackTrace();
105                 }
106                 return SettableFuture.create();
107             }
108         }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
109 
110         try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
111 
112             // Execute tx with async post commit actions
113             Transaction tx1 = tm.begin();
114 
115             Put put1 = new Put(row1);
116             put1.add(family, qualifier, Bytes.toBytes("hey!"));
117             txTable.put(tx1, put1);
118             Put put2 = new Put(row2);
119             put2.add(family, qualifier, Bytes.toBytes("hou!"));
120             txTable.put(tx1, put2);
121 
122             tm.commit(tx1);
123 
124             long tx1Id = tx1.getTransactionId();
125 
126             // As we have paused the update of shadow cells, the shadow cells shouldn't be there yet
127             assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
128             assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
129 
130             // Commit Table should contain an entry for the transaction
131             Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
132             assertTrue(commitTimestamp.isPresent());
133             assertTrue(commitTimestamp.get().isValid());
134             assertEquals(commitTimestamp.get().getValue(), ((AbstractTransaction) tx1).getCommitTimestamp());
135 
136             // Read from row1 and row2 in a different Tx and check that result is the data written by tx1 despite the
137             // post commit actions have not been executed yet (the shadow cells healing process should make its work)
138             Transaction tx2 = tm.begin();
139             Get get1 = new Get(row1);
140             Result result = txTable.get(tx2, get1);
141             byte[] value =  result.getValue(family, qualifier);
142             assertNotNull(value);
143             assertEquals("hey!", Bytes.toString(value));
144 
145             Get get2 = new Get(row2);
146             result = txTable.get(tx2, get2);
147             value = result.getValue(family, qualifier);
148             assertNotNull(value);
149             assertEquals("hou!", Bytes.toString(value));
150 
151             // Then, we continue with the update of shadow cells and we wait till completed
152             beforeUpdatingShadowCellsLatch.countDown();
153             afterUpdatingShadowCellsLatch.await();
154 
155             // Now we can check that the shadow cells are there...
156             verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
157             assertTrue(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
158             assertTrue(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
159             // ...and the transaction entry is still in the Commit Table
160             commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
161             assertTrue(commitTimestamp.isPresent());
162             assertTrue(commitTimestamp.get().isValid());
163             assertEquals(commitTimestamp.get().getValue(), ((AbstractTransaction) tx1).getCommitTimestamp());
164 
165             // Finally, we continue till the Commit Table cleaning process is done...
166             beforeRemovingCTEntryLatch.countDown();
167             afterRemovingCTEntryLatch.await();
168 
169             // ...so now, the Commit Table should NOT contain the entry for the transaction anymore
170             verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
171             commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
172             assertFalse(commitTimestamp.isPresent());
173 
174             // Final checks
175             verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
176             verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
177 
178         }
179 
180     }
181 
182     @Test(timeOut = 30_000)
183     public void testNoAsyncPostActionsAreCalled(ITestContext context) throws Exception {
184 
185         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
186 
187         PostCommitActions syncPostCommitter =
188                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
189         ListeningExecutorService postCommitExecutor =
190                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
191                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
192         PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
193 
194         TransactionManager tm = newTransactionManager(context, asyncPostCommitter);
195 
196         final CountDownLatch updateShadowCellsCalledLatch = new CountDownLatch(1);
197         final CountDownLatch removeCommitTableEntryCalledLatch = new CountDownLatch(1);
198 
199         // Simulate shadow cells are not updated and commit table is not clean
200         doAnswer(new Answer<Void>() {
201             public Void answer(InvocationOnMock invocation) {
202                 // Do not invoke real method simulating a fail of the shadow cells update
203                 updateShadowCellsCalledLatch.countDown();
204                 return null;
205             }
206         }).when(syncPostCommitter).updateShadowCells(any(AbstractTransaction.class));
207 
208         doAnswer(new Answer<Void>() {
209             public Void answer(InvocationOnMock invocation) {
210                 // Do not invoke real method simulating a fail of the async clean of commit table entry
211                 removeCommitTableEntryCalledLatch.countDown();
212                 return null;
213             }
214         }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
215 
216 
217         try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
218 
219             // Execute tx with async post commit actions
220             Transaction tx1 = tm.begin();
221 
222             Put put1 = new Put(row1);
223             put1.add(family, qualifier, Bytes.toBytes("hey!"));
224             txTable.put(tx1, put1);
225             Put put2 = new Put(row2);
226             put2.add(family, qualifier, Bytes.toBytes("hou!"));
227             txTable.put(tx1, put2);
228 
229             tm.commit(tx1);
230 
231             long tx1Id = tx1.getTransactionId();
232 
233             // The shadow cells shouldn't be there...
234             assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
235             assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
236             // ... and the should NOT have been cleaned
237             Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
238             assertTrue(commitTimestamp.isPresent());
239             assertTrue(commitTimestamp.get().isValid());
240 
241             updateShadowCellsCalledLatch.await();
242 
243             // Not even after waiting for the method call on the shadow cells update...
244             assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
245             assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
246 
247             removeCommitTableEntryCalledLatch.await();
248             // ... and after waiting for the method call for cleaning the commit table entry
249             commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
250             assertTrue(commitTimestamp.isPresent());
251             assertTrue(commitTimestamp.get().isValid());
252 
253             // Final checks
254             verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
255             verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
256 
257         }
258 
259     }
260 
261     @Test(timeOut = 30_000)
262     public void testOnlyShadowCellsUpdateIsExecuted(ITestContext context) throws Exception {
263 
264         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
265 
266         PostCommitActions syncPostCommitter =
267                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
268         ListeningExecutorService postCommitExecutor =
269                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
270                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
271         PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
272 
273         TransactionManager tm = newTransactionManager(context, asyncPostCommitter);
274 
275         final CountDownLatch removeCommitTableEntryCalledLatch = new CountDownLatch(1);
276 
277         doAnswer(new Answer<Void>() {
278             public Void answer(InvocationOnMock invocation) {
279                 // Do not invoke real method simulating a fail of the async clean of commit table entry
280                 removeCommitTableEntryCalledLatch.countDown();
281                 return null;
282             }
283         }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
284 
285 
286         try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
287 
288             // Execute tx with async post commit actions
289             Transaction tx1 = tm.begin();
290 
291             Put put1 = new Put(row1);
292             put1.add(family, qualifier, Bytes.toBytes("hey!"));
293             txTable.put(tx1, put1);
294             Put put2 = new Put(row2);
295             put2.add(family, qualifier, Bytes.toBytes("hou!"));
296             txTable.put(tx1, put2);
297 
298             tm.commit(tx1);
299 
300             long tx1Id = tx1.getTransactionId();
301 
302             // We continue when the unsuccessful call of the method for cleaning commit table has been invoked
303             removeCommitTableEntryCalledLatch.await();
304 
305             // We check that the shadow cells are there (because the update of the shadow cells should precede
306             // the cleaning of the commit table entry) ...
307             assertTrue(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
308             assertTrue(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
309 
310             // ... and the commit table entry has NOT been cleaned
311             Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
312             assertTrue(commitTimestamp.isPresent());
313             assertTrue(commitTimestamp.get().isValid());
314 
315             // Final checks
316             verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
317             verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
318 
319         }
320 
321     }
322 
323 }