1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.omid.transaction.CellUtils.hasCell;
21 import static org.apache.omid.transaction.CellUtils.hasShadowCell;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyInt;
24 import static org.mockito.Matchers.anyLong;
25 import static org.mockito.Mockito.doAnswer;
26 import static org.mockito.Mockito.doThrow;
27 import static org.mockito.Mockito.never;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.testng.Assert.assertFalse;
32 import static org.testng.Assert.assertNull;
33 import static org.testng.Assert.assertTrue;
34
35 import java.util.Arrays;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import org.apache.hadoop.hbase.Cell;
42 import org.apache.hadoop.hbase.CellUtil;
43 import org.apache.hadoop.hbase.KeyValue;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.client.Delete;
46 import org.apache.hadoop.hbase.client.Get;
47 import org.apache.hadoop.hbase.client.HBaseAdmin;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.Result;
50 import org.apache.hadoop.hbase.client.ResultScanner;
51 import org.apache.hadoop.hbase.client.Scan;
52 import org.apache.hadoop.hbase.client.Table;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.apache.omid.committable.CommitTable;
55 import org.apache.omid.metrics.NullMetricsProvider;
56 import org.mockito.Matchers;
57 import org.mockito.invocation.InvocationOnMock;
58 import org.mockito.stubbing.Answer;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import org.testng.ITestContext;
62 import org.testng.annotations.Test;
63
64 import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
65 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
66 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
67
68 @Test(groups = "sharedHBase")
69 public class TestShadowCells extends OmidTestBase {
70
71 private static final Logger LOG = LoggerFactory.getLogger(TestShadowCells.class);
72
73 private static final String TSO_SERVER_HOST = "localhost";
74 private static final int TSO_SERVER_PORT = 1234;
75
76 private static final String TEST_TABLE = "test";
77 private static final String TEST_FAMILY = "data";
78
79 static final byte[] row = Bytes.toBytes("test-sc");
80 private static final byte[] row1 = Bytes.toBytes("test-sc1");
81 private static final byte[] row2 = Bytes.toBytes("test-sc2");
82 private static final byte[] row3 = Bytes.toBytes("test-sc3");
83 static final byte[] family = Bytes.toBytes(TEST_FAMILY);
84 private static final byte[] qualifier = Bytes.toBytes("testdata");
85 private static final byte[] data1 = Bytes.toBytes("testWrite-1");
86
87
88 @Test(timeOut = 60_000)
89 public void testShadowCellsBasics(ITestContext context) throws Exception {
90
91 TransactionManager tm = newTransactionManager(context);
92
93 TTable table = new TTable(connection, TEST_TABLE);
94
95 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
96
97
98 Put put = new Put(row);
99 put.addColumn(family, qualifier, data1);
100 table.put(t1, put);
101
102
103 assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
104 "Cell should be there");
105 assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
106 "Shadow cell shouldn't be there");
107
108 tm.commit(t1);
109
110
111 assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
112 "Cell should be there");
113 assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
114 "Shadow cell should be there");
115
116
117 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
118
119 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
120 hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
121 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
122 TransactionManager tm2 = HBaseTransactionManager.builder(hbaseOmidClientConf)
123 .commitTableClient(commitTableClient)
124 .build();
125
126 Transaction t2 = tm2.begin();
127 Get get = new Get(row);
128 get.addColumn(family, qualifier);
129
130 Result getResult = table.get(t2, get);
131 assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
132 verify(commitTableClient, never()).getCommitTimestamp(anyLong());
133 }
134
135 @Test(timeOut = 60_000)
136 public void testCrashingAfterCommitDoesNotWriteShadowCells(ITestContext context) throws Exception {
137
138 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
139
140 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
141 hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
142 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
143 PostCommitActions syncPostCommitter = spy(
144 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
145 AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
146 .postCommitter(syncPostCommitter)
147 .commitTableClient(commitTableClient)
148 .commitTableWriter(getCommitTable(context).getWriter())
149 .build());
150
151
152 doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
153
154 TTable table = new TTable(connection, TEST_TABLE);
155
156 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
157
158
159 Put put = new Put(row);
160 put.addColumn(family, qualifier, data1);
161 table.put(t1, put);
162 try {
163 tm.commit(t1);
164 } catch (Exception e) {
165
166 }
167
168
169 assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
170 "Cell should be there");
171 assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
172 "Shadow cell should not be there");
173
174 Transaction t2 = tm.begin();
175 Get get = new Get(row);
176 get.addColumn(family, qualifier);
177
178 Result getResult = table.get(t2, get);
179 assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Shadow cell should not be there");
180 verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
181 }
182
183 @Test(timeOut = 60_000)
184 public void testShadowCellIsHealedAfterCommitCrash(ITestContext context) throws Exception {
185
186 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
187
188 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
189 hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
190 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
191 PostCommitActions syncPostCommitter = spy(
192 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
193 AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
194 .postCommitter(syncPostCommitter)
195 .commitTableWriter(getCommitTable(context).getWriter())
196 .commitTableClient(commitTableClient)
197 .build());
198
199
200 doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
201
202 TTable table = new TTable(connection, TEST_TABLE);
203
204 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
205
206
207 Put put = new Put(row);
208 put.addColumn(family, qualifier, data1);
209 table.put(t1, put);
210 try {
211 tm.commit(t1);
212 } catch (Exception e) {
213
214 }
215
216 assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
217 "Cell should be there");
218 assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
219 "Shadow cell should not be there");
220
221 Transaction t2 = tm.begin();
222 Get get = new Get(row);
223 get.addColumn(family, qualifier);
224
225
226 Result getResult = table.get(t2, get);
227 assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
228 verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
229
230 assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
231 "Cell should be there");
232 assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
233 "Shadow cell should be there after being healed");
234
235
236
237
238 getResult = table.get(t2, get);
239 assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
240 verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
241 }
242
243 @Test(timeOut = 60_000)
244 public void testTransactionNeverCompletesWhenAnExceptionIsThrownUpdatingShadowCells(ITestContext context)
245 throws Exception {
246
247 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
248
249 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
250 hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
251 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
252 PostCommitActions syncPostCommitter = spy(
253 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
254 AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
255 .postCommitter(syncPostCommitter)
256 .commitTableClient(commitTableClient)
257 .commitTableWriter(getCommitTable(context).getWriter())
258 .build());
259
260 final TTable table = new TTable(connection, TEST_TABLE);
261
262 HBaseTransaction tx = (HBaseTransaction) tm.begin();
263
264 Put put = new Put(row);
265 put.addColumn(family, qualifier, data1);
266 table.put(tx, put);
267
268
269 doAnswer(new Answer<ListenableFuture<Void>>() {
270 @Override
271 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
272 table.flushCommits();
273 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
274 admin.disableTable(TableName.valueOf(table.getTableName()));
275 return (ListenableFuture<Void>) invocation.callRealMethod();
276 }
277 }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
278
279
280
281
282
283
284
285 tm.commit(tx);
286
287
288 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
289 admin.enableTable(TableName.valueOf(table.getTableName()));
290
291
292 assertTrue(hasCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
293 "Cell should be there");
294 assertFalse(hasShadowCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
295 "Shadow cell should not be there");
296
297 verify(commitTableClient, times(0)).deleteCommitEntry(anyLong());
298
299 assertTrue(commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get().isPresent());
300
301 }
302
303 @Test(timeOut = 60_000)
304 public void testTransactionPostCommitUpdateSCBatch(ITestContext context)
305 throws Exception {
306
307 TransactionManager tm = newTransactionManager(context);
308
309 TTable table = new TTable(connection, TEST_TABLE);
310
311 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
312
313
314 Put put = new Put(row);
315 for (int i = 0; i < HBaseSyncPostCommitter.MAX_BATCH_SIZE*2 + 2; ++i) {
316 put.addColumn(family, Bytes.toBytes(String.valueOf("X") + i), data1);
317 }
318 table.put(t1, put);
319
320 tm.commit(t1);
321
322
323 for (int i = 0; i < 1002; ++i) {
324 assertTrue(hasShadowCell(row, family, Bytes.toBytes(String.valueOf("X") + i), t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
325 "Shadow cell should be there");
326 }
327 }
328
329
330 @Test(timeOut = 60_000)
331 public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
332 final CountDownLatch readAfterCommit = new CountDownLatch(1);
333 final CountDownLatch postCommitBegin = new CountDownLatch(1);
334 final CountDownLatch postCommitEnd = new CountDownLatch(1);
335
336 final AtomicBoolean readFailed = new AtomicBoolean(false);
337 PostCommitActions syncPostCommitter =
338 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
339 AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
340
341 doAnswer(new Answer<ListenableFuture<Void>>() {
342 @Override
343 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
344 LOG.info("Releasing readAfterCommit barrier");
345 readAfterCommit.countDown();
346 LOG.info("Waiting postCommitBegin barrier");
347 postCommitBegin.await();
348 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
349 LOG.info("Releasing postCommitEnd barrier");
350 postCommitEnd.countDown();
351 return result;
352 }
353 }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
354
355
356 final TTable table = new TTable(connection, TEST_TABLE);
357
358 final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
359
360
361 Thread readThread = new Thread("Read Thread") {
362 @Override
363 public void run() {
364 LOG.info("Waiting readAfterCommit barrier");
365 try {
366 readAfterCommit.await();
367 Table htable = table.getHTable();
368 Table healer = table.getHTable();
369
370 final SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, healer)));
371 final TTable table = new TTable(htable ,snapshotFilter);
372 doAnswer(new Answer<List<KeyValue>>() {
373 @SuppressWarnings("unchecked")
374 @Override
375 public List<KeyValue> answer(InvocationOnMock invocation) throws Throwable {
376 LOG.info("Release postCommitBegin barrier");
377 postCommitBegin.countDown();
378 LOG.info("Waiting postCommitEnd barrier");
379 postCommitEnd.await();
380 return (List<KeyValue>) invocation.callRealMethod();
381 }
382 }).when(snapshotFilter).filterCellsForSnapshot(Matchers.<List<Cell>>any(),
383 any(HBaseTransaction.class), anyInt(), Matchers.<Map<String, Long>>any(), Matchers.<Map<String,byte[]>>any());
384
385 TransactionManager tm = newTransactionManager(context);
386 if (hasShadowCell(row,
387 family,
388 qualifier,
389 t1.getStartTimestamp(),
390 new TTableCellGetterAdapter(table))) {
391 readFailed.set(true);
392 }
393
394 Transaction t = tm.begin();
395 Get get = new Get(row);
396 get.addColumn(family, qualifier);
397
398 Result getResult = table.get(t, get);
399 Cell cell = getResult.getColumnLatestCell(family, qualifier);
400 if (!Arrays.equals(data1, CellUtil.cloneValue(cell))
401 || !hasShadowCell(row,
402 family,
403 qualifier,
404 cell.getTimestamp(),
405 new TTableCellGetterAdapter(table))) {
406 readFailed.set(true);
407 } else {
408 LOG.info("Read succeeded");
409 }
410 } catch (Throwable e) {
411 readFailed.set(true);
412 LOG.error("Error whilst reading", e);
413 }
414 }
415 };
416 readThread.start();
417
418
419 Put put = new Put(row);
420 put.addColumn(family, qualifier, data1);
421 table.put(t1, put);
422 tm.commit(t1);
423
424 readThread.join();
425
426 assertFalse(readFailed.get(), "Read should have succeeded");
427
428 }
429
430
431
432
433
434
435
436 @Test(timeOut = 60_000)
437 public void testGetOldShadowCells(ITestContext context) throws Exception {
438
439 TransactionManager tm = newTransactionManager(context);
440
441 TTable table = new TTable(connection, TEST_TABLE);
442 Table htable = table.getHTable();
443
444
445 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
446 Put put = new Put(row1);
447 put.addColumn(family, qualifier, data1);
448 table.put(t1, put);
449 tm.commit(t1);
450
451 HBaseTransaction t2 = (HBaseTransaction) tm.begin();
452 put = new Put(row2);
453 put.addColumn(family, qualifier, data1);
454 table.put(t2, put);
455 tm.commit(t2);
456
457 HBaseTransaction t3 = (HBaseTransaction) tm.begin();
458 put = new Put(row3);
459 put.addColumn(family, qualifier, data1);
460 table.put(t3, put);
461 tm.commit(t3);
462
463
464
465 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
466 Optional<CommitTable.CommitTimestamp> ct1 = commitTableClient.getCommitTimestamp(t1.getStartTimestamp()).get();
467 Optional<CommitTable.CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(t2.getStartTimestamp()).get();
468 Optional<CommitTable.CommitTimestamp> ct3 = commitTableClient.getCommitTimestamp(t3.getStartTimestamp()).get();
469 assertFalse(ct1.isPresent(), "Shouldn't exist in commit table");
470 assertFalse(ct2.isPresent(), "Shouldn't exist in commit table");
471 assertFalse(ct3.isPresent(), "Shouldn't exist in commit table");
472
473
474 Delete del = new Delete(row2);
475 del.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
476 htable.delete(del);
477 table.flushCommits();
478
479
480 Transaction t4 = tm.begin();
481 Get get = new Get(row2);
482 get.addColumn(family, qualifier);
483
484 Result getResult = table.get(t4, get);
485 assertTrue(getResult.isEmpty(), "Should have nothing");
486
487 Transaction t5 = tm.begin();
488 Scan s = new Scan();
489 ResultScanner scanner = table.getScanner(t5, s);
490 Result result1 = scanner.next();
491 Result result2 = scanner.next();
492 Result result3 = scanner.next();
493 scanner.close();
494
495 assertNull(result3);
496 assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
497 assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
498 assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
499 assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
500
501
502 put = new Put(row2);
503 put.addColumn(family,
504 addLegacyShadowCellSuffix(qualifier),
505 t2.getStartTimestamp(),
506 Bytes.toBytes(t2.getCommitTimestamp()));
507 htable.put(put);
508
509
510
511 Transaction t6 = tm.begin();
512 get = new Get(row2);
513 get.addColumn(family, qualifier);
514
515 getResult = table.get(t6, get);
516 assertFalse(getResult.containsColumn(family, qualifier), "Should NOT have column");
517
518 Transaction t7 = tm.begin();
519 s = new Scan();
520 scanner = table.getScanner(t7, s);
521 result1 = scanner.next();
522 result2 = scanner.next();
523 result3 = scanner.next();
524 scanner.close();
525
526 assertNull(result3, "There should only be 2 rows");
527 assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
528 assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
529 assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
530 assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
531 }
532
533
534
535
536
537 private static final byte[] LEGACY_SHADOW_CELL_SUFFIX = ":OMID_CTS".getBytes(Charsets.UTF_8);
538
539 private static byte[] addLegacyShadowCellSuffix(byte[] qualifier) {
540 return com.google.common.primitives.Bytes.concat(qualifier, LEGACY_SHADOW_CELL_SUFFIX);
541 }
542
543 }