1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
21 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
22 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
23 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.doThrow;
27 import static org.mockito.Mockito.spy;
28 import static org.testng.Assert.assertEquals;
29 import static org.testng.Assert.assertFalse;
30 import static org.testng.Assert.assertTrue;
31
32 import java.util.Map;
33
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.client.Put;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.omid.committable.CommitTable;
39 import org.apache.omid.committable.CommitTable.CommitTimestamp;
40 import org.apache.omid.metrics.NullMetricsProvider;
41 import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
42 import org.testng.ITestContext;
43 import org.testng.annotations.Test;
44
45 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
46 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
47 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
48
49 @Test(groups = "sharedHBase")
50 public class TestHBaseTransactionClient extends OmidTestBase {
51
52 private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
53 private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
54 private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
55 private static final byte[] qualifier = Bytes.toBytes("testdata");
56 private static final byte[] data1 = Bytes.toBytes("testWrite-1");
57
58 @Test(timeOut = 30_000)
59 public void testIsCommitted(ITestContext context) throws Exception {
60 TransactionManager tm = newTransactionManager(context);
61 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
62 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
63 ((AbstractTransactionManager)tm).getCommitTableClient());
64 TTable table = spy(new TTable(htable, snapshotFilter, false));
65
66 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
67
68 Put put = new Put(row1);
69 put.addColumn(family, qualifier, data1);
70 table.put(t1, put);
71 tm.commit(t1);
72
73 HBaseTransaction t2 = (HBaseTransaction) tm.begin();
74 put = new Put(row2);
75 put.addColumn(family, qualifier, data1);
76 table.put(t2, put);
77 table.flushCommits();
78
79 HBaseTransaction t3 = (HBaseTransaction) tm.begin();
80 put = new Put(row2);
81 put.addColumn(family, qualifier, data1);
82 table.put(t3, put);
83 tm.commit(t3);
84
85 HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
86 HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
87 HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
88
89 assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
90 assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
91 assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
92 }
93
94 @Test(timeOut = 30_000)
95 public void testCrashAfterCommit(ITestContext context) throws Exception {
96 PostCommitActions syncPostCommitter =
97 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
98 AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
99
100 doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
101
102 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
103 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
104 tm.getCommitTableClient());
105 TTable table = spy(new TTable(htable, snapshotFilter, false));
106
107 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
108
109
110 Put put = new Put(row1);
111 put.addColumn(family, qualifier, data1);
112 table.put(t1, put);
113 try {
114 tm.commit(t1);
115 } catch (Exception e) {
116
117 }
118
119 assertTrue(CellUtils.hasCell(row1, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
120 "Cell should be there");
121 assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
122 "Shadow cell should not be there");
123
124 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
125
126 HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context);
127 assertTrue(snapshotFilter.isCommitted(hBaseCellId, 0, false), "row1 should be committed");
128 }
129
130 @Test(timeOut = 30_000)
131 public void testReadCommitTimestampFromCommitTable(ITestContext context) throws Exception {
132
133
134 final long NON_EXISTING_CELL_TS = 1000L;
135
136 PostCommitActions syncPostCommitter =
137 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
138 AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
139
140 doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
141
142
143 Optional<CommitTimestamp> optionalCT = tm.commitTableClient.getCommitTimestamp(NON_EXISTING_CELL_TS).get();
144 assertFalse(optionalCT.isPresent());
145
146 try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
147
148
149
150 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
151 Put put = new Put(row1);
152 put.addColumn(family, qualifier, data1);
153 table.put(tx1, put);
154
155 assertTrue(tm.commitTableClient.tryInvalidateTransaction(tx1.getStartTimestamp()).get());
156 optionalCT = tm.commitTableClient.getCommitTimestamp(tx1.getStartTimestamp()).get();
157 assertTrue(optionalCT.isPresent());
158 CommitTimestamp ct = optionalCT.get();
159 assertFalse(ct.isValid());
160 assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
161 assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
162
163
164
165 HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
166 Put otherPut = new Put(row1);
167 otherPut.addColumn(family, qualifier, data1);
168 table.put(tx2, otherPut);
169 try {
170 tm.commit(tx2);
171 } catch (Exception e) {
172
173 }
174
175 optionalCT = tm.commitTableClient.getCommitTimestamp(tx2.getStartTimestamp()).get();
176 assertTrue(optionalCT.isPresent());
177 ct = optionalCT.get();
178 assertTrue(ct.isValid());
179 assertEquals(ct.getValue(), tx2.getCommitTimestamp());
180 assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
181 }
182 }
183
184 @Test(timeOut = 30_000)
185 public void testReadCommitTimestampFromShadowCell(ITestContext context) throws Exception {
186
187 final long NON_EXISTING_CELL_TS = 1L;
188
189 HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
190
191 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
192 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
193 tm.getCommitTableClient());
194
195 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
196
197
198 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, NON_EXISTING_CELL_TS);
199
200 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
201 Maps.<Long, Long>newHashMap());
202 Optional<CommitTimestamp> optionalCT = snapshotFilter
203 .readCommitTimestampFromShadowCell(NON_EXISTING_CELL_TS, ctLocator);
204 assertFalse(optionalCT.isPresent());
205
206
207 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
208 Put put = new Put(row1);
209 put.addColumn(family, qualifier, data1);
210 table.put(tx1, put);
211 tm.commit(tx1);
212
213 optionalCT = snapshotFilter.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator);
214 assertTrue(optionalCT.isPresent());
215 CommitTimestamp ct = optionalCT.get();
216 assertTrue(ct.isValid());
217 assertEquals(ct.getValue(), tx1.getCommitTimestamp());
218 assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
219
220 }
221
222 }
223
224
225 @Test(timeOut = 30_000)
226 public void testCellCommitTimestampIsLocatedInCache(ITestContext context) throws Exception {
227
228 final long CELL_ST = 1L;
229 final long CELL_CT = 2L;
230
231 HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
232
233
234 Table htable = hBaseUtils.getConnection().getTable(TableName.valueOf(TEST_TABLE));
235 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
236 tm.getCommitTableClient());
237 TTable table = new TTable(htable, snapshotFilter, false);
238
239 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_ST);
240 Map<Long, Long> fakeCache = Maps.newHashMap();
241 fakeCache.put(CELL_ST, CELL_CT);
242
243
244 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, fakeCache);
245 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator,
246 false);
247 assertTrue(ct.isValid());
248 assertEquals(ct.getValue(), CELL_CT);
249 assertTrue(ct.getLocation().compareTo(CACHE) == 0);
250
251 }
252
253
254
255
256 @Test(timeOut = 30_000)
257 public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {
258
259 PostCommitActions syncPostCommitter =
260 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
261 AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
262
263 doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
264
265 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
266 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
267 tm.getCommitTableClient());
268
269 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
270
271
272 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
273 Put put = new Put(row1);
274 put.addColumn(family, qualifier, data1);
275 table.put(tx1, put);
276 try {
277 tm.commit(tx1);
278 } catch (Exception e) {
279
280 }
281
282
283 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
284 tx1.getStartTimestamp());
285 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
286 Maps.<Long, Long>newHashMap());
287 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
288 ctLocator, false);
289 assertTrue(ct.isValid());
290 long expectedCommitTS = tx1.getStartTimestamp() + CommitTable.MAX_CHECKPOINTS_PER_TXN;
291 assertEquals(ct.getValue(), expectedCommitTS);
292 assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
293 }
294
295 }
296
297
298 @Test(timeOut = 30_000)
299 public void testCellCommitTimestampIsLocatedInShadowCells(ITestContext context) throws Exception {
300
301 HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
302
303 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
304 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
305 tm.getCommitTableClient());
306
307 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
308
309 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
310 Put put = new Put(row1);
311 put.addColumn(family, qualifier, data1);
312 table.put(tx1, put);
313 tm.commit(tx1);
314
315
316
317 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
318 tx1.getStartTimestamp());
319 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
320 Maps.<Long, Long>newHashMap());
321 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
322 ctLocator, false);
323 assertTrue(ct.isValid());
324 assertEquals(ct.getValue(), tx1.getCommitTimestamp());
325 assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
326 }
327
328 }
329
330
331 @Test(timeOut = 30_000)
332 public void testCellFromTransactionInPreviousEpochGetsInvalidComitTimestamp(ITestContext context) throws Exception {
333
334 final long CURRENT_EPOCH_FAKE = 1000L * CommitTable.MAX_CHECKPOINTS_PER_TXN;
335
336 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
337 AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
338
339
340 SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
341 f.set(Optional.<CommitTimestamp>absent());
342 doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
343
344 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
345 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
346 tm.getCommitTableClient());
347
348 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
349
350
351 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
352 Put put = new Put(row1);
353 put.addColumn(family, qualifier, data1);
354 table.put(tx1, put);
355
356
357
358 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
359 tx1.getStartTimestamp());
360 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
361 Maps.<Long, Long>newHashMap());
362
363 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE,
364 ctLocator, false);
365 assertFalse(ct.isValid());
366 assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
367 assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
368 }
369 }
370
371
372 @Test(timeOut = 30_000)
373 public void testCellCommitTimestampIsLocatedInCommitTableAfterNotBeingInvalidated(ITestContext context) throws Exception {
374
375 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
376 PostCommitActions syncPostCommitter =
377 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
378 AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, syncPostCommitter));
379
380
381 doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
382
383
384 SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
385 f.set(Optional.<CommitTimestamp>absent());
386 doReturn(f).doCallRealMethod().when(commitTableClient).getCommitTimestamp(any(Long.class));
387
388 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
389 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
390 tm.getCommitTableClient());
391
392 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
393
394
395
396 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
397 Put put = new Put(row1);
398 put.addColumn(family, qualifier, data1);
399 table.put(tx1, put);
400 try {
401 tm.commit(tx1);
402 } catch (Exception e) {
403
404 }
405
406
407 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
408 tx1.getStartTimestamp());
409 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
410 Maps.<Long, Long>newHashMap());
411 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
412 ctLocator, false);
413 assertTrue(ct.isValid());
414 assertEquals(ct.getValue(), tx1.getCommitTimestamp());
415 assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
416 }
417
418 }
419
420
421 @Test(timeOut = 30_000)
422 public void testCellCommitTimestampIsLocatedInShadowCellsAfterNotBeingInvalidated(ITestContext context) throws Exception {
423
424 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
425 AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
426
427
428 SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
429 f.set(Optional.<CommitTimestamp>absent());
430 doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
431
432 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
433 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
434 tm.getCommitTableClient());
435
436 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
437
438
439 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
440 Put put = new Put(row1);
441 put.addColumn(family, qualifier, data1);
442 table.put(tx1, put);
443 tm.commit(tx1);
444
445
446
447 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
448 tx1.getStartTimestamp());
449 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
450 Maps.<Long, Long>newHashMap());
451 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
452 ctLocator,false);
453 assertTrue(ct.isValid());
454 assertEquals(ct.getValue(), tx1.getCommitTimestamp());
455 assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
456 }
457
458 }
459
460
461 @Test(timeOut = 30_000)
462 public void testCTLocatorReturnsAValidCTWhenNotPresent(ITestContext context) throws Exception {
463
464 final long CELL_TS = 1L;
465
466 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
467 AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
468
469 SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
470 f.set(Optional.<CommitTimestamp>absent());
471 doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
472
473 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
474 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
475 tm.getCommitTableClient());
476
477 try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
478 HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_TS);
479 CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
480 Maps.<Long, Long>newHashMap());
481 CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(),
482 ctLocator, false);
483 assertTrue(ct.isValid());
484 assertEquals(ct.getValue(), -1L);
485 assertTrue(ct.getLocation().compareTo(NOT_PRESENT) == 0);
486 }
487
488 }
489 }