1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Mockito.doAnswer;
22 import static org.mockito.Mockito.spy;
23 import static org.mockito.Mockito.times;
24 import static org.mockito.Mockito.verify;
25 import static org.testng.Assert.assertEquals;
26 import static org.testng.Assert.assertFalse;
27 import static org.testng.Assert.assertNotNull;
28 import static org.testng.Assert.assertTrue;
29
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.Executors;
32
33 import org.apache.hadoop.hbase.client.Get;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.omid.committable.CommitTable;
38 import org.apache.omid.metrics.NullMetricsProvider;
39 import org.mockito.invocation.InvocationOnMock;
40 import org.mockito.stubbing.Answer;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import org.testng.ITestContext;
44 import org.testng.annotations.Test;
45
46 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
47 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
48 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
49 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
50 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
51 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
52
53 @Test(groups = "sharedHBase")
54 public class TestAsynchronousPostCommitter extends OmidTestBase {
55
56 private static final Logger LOG = LoggerFactory.getLogger(TestAsynchronousPostCommitter.class);
57
58 private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
59 private static final byte[] qualifier = Bytes.toBytes("test-qual");
60
61 byte[] row1 = Bytes.toBytes("test-is-committed1");
62 byte[] row2 = Bytes.toBytes("test-is-committed2");
63
64 @Test(timeOut = 30_000)
65 public void testPostCommitActionsAreCalledAsynchronously(ITestContext context) throws Exception {
66
67 CommitTable.Client commitTableClient = getCommitTable(context).getClient();
68
69 PostCommitActions syncPostCommitter =
70 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
71 ListeningExecutorService postCommitExecutor =
72 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
73 new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
74 PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
75
76 TransactionManager tm = newTransactionManager(context, asyncPostCommitter);
77
78 final CountDownLatch beforeUpdatingShadowCellsLatch = new CountDownLatch(1);
79 final CountDownLatch afterUpdatingShadowCellsLatch = new CountDownLatch(1);
80 final CountDownLatch beforeRemovingCTEntryLatch = new CountDownLatch(1);
81 final CountDownLatch afterRemovingCTEntryLatch = new CountDownLatch(1);
82
83 doAnswer(new Answer<ListenableFuture<Void>>() {
84 public ListenableFuture<Void> answer(InvocationOnMock invocation) {
85 try {
86 beforeUpdatingShadowCellsLatch.await();
87 invocation.callRealMethod();
88 afterUpdatingShadowCellsLatch.countDown();
89 } catch (Throwable throwable) {
90 throwable.printStackTrace();
91 }
92 return SettableFuture.create();
93 }
94 }).when(syncPostCommitter).updateShadowCells(any(AbstractTransaction.class));
95
96 doAnswer(new Answer<ListenableFuture<Void>>() {
97 public ListenableFuture<Void> answer(InvocationOnMock invocation) {
98 try {
99 beforeRemovingCTEntryLatch.await();
100 LOG.info("We are here");
101 invocation.callRealMethod();
102 afterRemovingCTEntryLatch.countDown();
103 } catch (Throwable throwable) {
104 throwable.printStackTrace();
105 }
106 return SettableFuture.create();
107 }
108 }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
109
110 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
111
112
113 Transaction tx1 = tm.begin();
114
115 Put put1 = new Put(row1);
116 put1.addColumn(family, qualifier, Bytes.toBytes("hey!"));
117 txTable.put(tx1, put1);
118 Put put2 = new Put(row2);
119 put2.addColumn(family, qualifier, Bytes.toBytes("hou!"));
120 txTable.put(tx1, put2);
121
122 tm.commit(tx1);
123
124 long tx1Id = tx1.getTransactionId();
125
126
127 assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
128 assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
129
130
131 Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
132 assertTrue(commitTimestamp.isPresent());
133 assertTrue(commitTimestamp.get().isValid());
134 assertEquals(commitTimestamp.get().getValue(), ((AbstractTransaction) tx1).getCommitTimestamp());
135
136
137
138 Transaction tx2 = tm.begin();
139 Get get1 = new Get(row1);
140 Result result = txTable.get(tx2, get1);
141 byte[] value = result.getValue(family, qualifier);
142 assertNotNull(value);
143 assertEquals("hey!", Bytes.toString(value));
144
145 Get get2 = new Get(row2);
146 result = txTable.get(tx2, get2);
147 value = result.getValue(family, qualifier);
148 assertNotNull(value);
149 assertEquals("hou!", Bytes.toString(value));
150
151
152 beforeUpdatingShadowCellsLatch.countDown();
153 afterUpdatingShadowCellsLatch.await();
154
155
156 verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
157 assertTrue(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
158 assertTrue(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
159
160 commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
161 assertTrue(commitTimestamp.isPresent());
162 assertTrue(commitTimestamp.get().isValid());
163 assertEquals(commitTimestamp.get().getValue(), ((AbstractTransaction) tx1).getCommitTimestamp());
164
165
166 beforeRemovingCTEntryLatch.countDown();
167 afterRemovingCTEntryLatch.await();
168
169
170 verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
171 commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
172 assertFalse(commitTimestamp.isPresent());
173
174
175 verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
176 verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
177
178 }
179
180 }
181
182 @Test(timeOut = 30_000)
183 public void testNoAsyncPostActionsAreCalled(ITestContext context) throws Exception {
184
185 CommitTable.Client commitTableClient = getCommitTable(context).getClient();
186
187 PostCommitActions syncPostCommitter =
188 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
189 ListeningExecutorService postCommitExecutor =
190 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
191 new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
192 PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
193
194 TransactionManager tm = newTransactionManager(context, asyncPostCommitter);
195
196 final CountDownLatch updateShadowCellsCalledLatch = new CountDownLatch(1);
197 final CountDownLatch removeCommitTableEntryCalledLatch = new CountDownLatch(1);
198
199
200 doAnswer(new Answer<Void>() {
201 public Void answer(InvocationOnMock invocation) {
202
203 updateShadowCellsCalledLatch.countDown();
204 return null;
205 }
206 }).when(syncPostCommitter).updateShadowCells(any(AbstractTransaction.class));
207
208 doAnswer(new Answer<Void>() {
209 public Void answer(InvocationOnMock invocation) {
210
211 removeCommitTableEntryCalledLatch.countDown();
212 return null;
213 }
214 }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
215
216
217 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
218
219
220 Transaction tx1 = tm.begin();
221
222 Put put1 = new Put(row1);
223 put1.addColumn(family, qualifier, Bytes.toBytes("hey!"));
224 txTable.put(tx1, put1);
225 Put put2 = new Put(row2);
226 put2.addColumn(family, qualifier, Bytes.toBytes("hou!"));
227 txTable.put(tx1, put2);
228
229 tm.commit(tx1);
230
231 long tx1Id = tx1.getTransactionId();
232
233
234 assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
235 assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
236
237 Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
238 assertTrue(commitTimestamp.isPresent());
239 assertTrue(commitTimestamp.get().isValid());
240
241 updateShadowCellsCalledLatch.await();
242
243
244 assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
245 assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
246
247 removeCommitTableEntryCalledLatch.await();
248
249 commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
250 assertTrue(commitTimestamp.isPresent());
251 assertTrue(commitTimestamp.get().isValid());
252
253
254 verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
255 verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
256
257 }
258
259 }
260
261 @Test(timeOut = 30_000)
262 public void testOnlyShadowCellsUpdateIsExecuted(ITestContext context) throws Exception {
263
264 CommitTable.Client commitTableClient = getCommitTable(context).getClient();
265
266 PostCommitActions syncPostCommitter =
267 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
268 ListeningExecutorService postCommitExecutor =
269 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
270 new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
271 PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
272
273 TransactionManager tm = newTransactionManager(context, asyncPostCommitter);
274
275 final CountDownLatch removeCommitTableEntryCalledLatch = new CountDownLatch(1);
276
277 doAnswer(new Answer<Void>() {
278 public Void answer(InvocationOnMock invocation) {
279
280 removeCommitTableEntryCalledLatch.countDown();
281 return null;
282 }
283 }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
284
285
286 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
287
288
289 Transaction tx1 = tm.begin();
290
291 Put put1 = new Put(row1);
292 put1.addColumn(family, qualifier, Bytes.toBytes("hey!"));
293 txTable.put(tx1, put1);
294 Put put2 = new Put(row2);
295 put2.addColumn(family, qualifier, Bytes.toBytes("hou!"));
296 txTable.put(tx1, put2);
297
298 tm.commit(tx1);
299
300 long tx1Id = tx1.getTransactionId();
301
302
303 removeCommitTableEntryCalledLatch.await();
304
305
306
307 assertTrue(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
308 assertTrue(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable)));
309
310
311 Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get();
312 assertTrue(commitTimestamp.isPresent());
313 assertTrue(commitTimestamp.get().isValid());
314
315
316 verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class));
317 verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class));
318
319 }
320
321 }
322
323 }