View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Mockito.doAnswer;
22  import static org.mockito.Mockito.doReturn;
23  import static org.mockito.Mockito.doThrow;
24  import static org.mockito.Mockito.spy;
25  import static org.testng.Assert.assertEquals;
26  import static org.testng.Assert.assertFalse;
27  import static org.testng.Assert.assertNull;
28  import static org.testng.Assert.assertTrue;
29  import static org.testng.Assert.fail;
30  
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Random;
35  import java.util.Set;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.Coprocessor;
43  import org.apache.hadoop.hbase.HBaseTestingUtility;
44  import org.apache.hadoop.hbase.HColumnDescriptor;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.MiniHBaseCluster;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.Admin;
49  import org.apache.hadoop.hbase.client.Connection;
50  import org.apache.hadoop.hbase.client.ConnectionFactory;
51  import org.apache.hadoop.hbase.client.Delete;
52  import org.apache.hadoop.hbase.client.Get;
53  import org.apache.hadoop.hbase.client.Put;
54  import org.apache.hadoop.hbase.client.Result;
55  import org.apache.hadoop.hbase.client.ResultScanner;
56  import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
57  import org.apache.hadoop.hbase.client.Row;
58  import org.apache.hadoop.hbase.client.Scan;
59  import org.apache.hadoop.hbase.client.Table;
60  import org.apache.hadoop.hbase.util.Bytes;
61  import org.apache.omid.HBaseShims;
62  import org.apache.omid.TestUtils;
63  import org.apache.omid.committable.CommitTable;
64  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
65  import org.apache.omid.metrics.NullMetricsProvider;
66  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
67  import org.apache.omid.tso.TSOServer;
68  import org.apache.omid.tso.TSOServerConfig;
69  import org.mockito.invocation.InvocationOnMock;
70  import org.mockito.stubbing.Answer;
71  import org.slf4j.Logger;
72  import org.slf4j.LoggerFactory;
73  import org.testng.annotations.AfterClass;
74  import org.testng.annotations.BeforeClass;
75  import org.testng.annotations.BeforeMethod;
76  import org.testng.annotations.Test;
77  
78  import com.google.common.util.concurrent.SettableFuture;
79  import com.google.inject.Guice;
80  import com.google.inject.Injector;
81  
82  public class TestCompaction {
83  
84      private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);
85  
86      private static final String TEST_FAMILY = "test-fam";
87      private static final String TEST_QUALIFIER = "test-qual";
88  
89      private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
90      private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
91      private final byte[] data = Bytes.toBytes("testWrite-1");
92  
93      private static final int MAX_VERSIONS = 3;
94  
95      private Random randomGenerator;
96      private AbstractTransactionManager tm;
97  
98      private Injector injector;
99  
100     private Admin admin;
101     private Configuration hbaseConf;
102     private HBaseTestingUtility hbaseTestUtil;
103     private MiniHBaseCluster hbaseCluster;
104 
105     private TSOServer tso;
106 
107 
108     private CommitTable commitTable;
109     private PostCommitActions syncPostCommitter;
110     private static Connection connection;
111 
112     @BeforeClass
113     public void setupTestCompation() throws Exception {
114         TSOServerConfig tsoConfig = new TSOServerConfig();
115         tsoConfig.setPort(1234);
116         tsoConfig.setConflictMapSize(1);
117         tsoConfig.setWaitStrategy("LOW_CPU");
118         injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
119         hbaseConf = injector.getInstance(Configuration.class);
120         HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
121         HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
122 
123         // settings required for #testDuplicateDeletes()
124         hbaseConf.setInt("hbase.hstore.compaction.min", 2);
125         hbaseConf.setInt("hbase.hstore.compaction.max", 2);
126         setupHBase();
127         connection = ConnectionFactory.createConnection(hbaseConf);
128         admin = connection.getAdmin();
129         createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
130         setupTSO();
131 
132         commitTable = injector.getInstance(CommitTable.class);
133     }
134 
135     private void setupHBase() throws Exception {
136         LOG.info("--------------------------------------------------------------------------------------------------");
137         LOG.info("Setting up HBase");
138         LOG.info("--------------------------------------------------------------------------------------------------");
139         hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
140         LOG.info("--------------------------------------------------------------------------------------------------");
141         LOG.info("Creating HBase MiniCluster");
142         LOG.info("--------------------------------------------------------------------------------------------------");
143         hbaseCluster = hbaseTestUtil.startMiniCluster(1);
144     }
145 
146     private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
147                                            HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
148         createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
149 
150         createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
151     }
152 
153     private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
154         if (!admin.tableExists(TableName.valueOf(tableName))) {
155             LOG.info("Creating {} table...", tableName);
156             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
157 
158             for (byte[] family : families) {
159                 HColumnDescriptor datafam = new HColumnDescriptor(family);
160                 datafam.setMaxVersions(MAX_VERSIONS);
161                 desc.addFamily(datafam);
162             }
163 
164             desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
165             admin.createTable(desc);
166             for (byte[] family : families) {
167                 CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
168             }
169         }
170 
171     }
172 
173     private void setupTSO() throws IOException, InterruptedException {
174         tso = injector.getInstance(TSOServer.class);
175         tso.startAndWait();
176         TestUtils.waitForSocketListening("localhost", 1234, 100);
177         Thread.currentThread().setName("UnitTest(s) thread");
178     }
179 
180     @AfterClass
181     public void cleanupTestCompation() throws Exception {
182         teardownTSO();
183         hbaseCluster.shutdown();
184     }
185 
186     private void teardownTSO() throws IOException, InterruptedException {
187         tso.stopAndWait();
188         TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
189     }
190 
191     @BeforeMethod
192     public void setupTestCompactionIndividualTest() throws Exception {
193         randomGenerator = new Random(0xfeedcafeL);
194         tm = spy((AbstractTransactionManager) newTransactionManager());
195     }
196 
197     private TransactionManager newTransactionManager() throws Exception {
198         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
199         hbaseOmidClientConf.setConnectionString("localhost:1234");
200         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
201         CommitTable.Client commitTableClient = commitTable.getClient();
202         syncPostCommitter =
203                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
204         return HBaseTransactionManager.builder(hbaseOmidClientConf)
205                 .postCommitter(syncPostCommitter)
206                 .commitTableClient(commitTableClient)
207                 .build();
208     }
209 
210     @Test(timeOut = 60_000)
211     public void testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction() throws Throwable {
212         String TEST_TABLE = "testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction";
213         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
214         TTable txTable = new TTable(connection, TEST_TABLE);
215 
216         final int ROWS_TO_ADD = 5;
217 
218         long fakeAssignedLowWatermark = 0L;
219         for (int i = 0; i < ROWS_TO_ADD; ++i) {
220             long rowId = randomGenerator.nextLong();
221             Transaction tx = tm.begin();
222             if (i == (ROWS_TO_ADD / 2)) {
223                 fakeAssignedLowWatermark = tx.getTransactionId();
224                 LOG.info("AssignedLowWatermark " + fakeAssignedLowWatermark);
225             }
226             Put put = new Put(Bytes.toBytes(rowId));
227             put.addColumn(fam, qual, data);
228             txTable.put(tx, put);
229             tm.commit(tx);
230         }
231 
232         LOG.info("Flushing table {}", TEST_TABLE);
233         admin.flush(TableName.valueOf(TEST_TABLE));
234 
235         // Return a LWM that triggers compaction & stays between 1 and the max start timestamp assigned to previous TXs
236         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
237         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
238                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
239         CommitTable commitTable = injector.getInstance(CommitTable.class);
240         CommitTable.Client commitTableClient = spy(commitTable.getClient());
241         SettableFuture<Long> f = SettableFuture.create();
242         f.set(fakeAssignedLowWatermark);
243         doReturn(f).when(commitTableClient).readLowWatermark();
244         omidCompactor.commitTableClientQueue.add(commitTableClient);
245         LOG.info("Compacting table {}", TEST_TABLE);
246         admin.majorCompact(TableName.valueOf(TEST_TABLE));
247 
248         LOG.info("Sleeping for 3 secs");
249         Thread.sleep(3000);
250         LOG.info("Waking up after 3 secs");
251 
252         // No rows should have been discarded after compacting
253         assertEquals(rowCount(TEST_TABLE, fam), ROWS_TO_ADD, "Rows in table after compacting should be " + ROWS_TO_ADD);
254     }
255 
256     @Test(timeOut = 60_000)
257     public void testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction() throws Exception {
258         String TEST_TABLE = "testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction";
259         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
260         TTable txTable = new TTable(connection, TEST_TABLE);
261 
262         // The following line emulates a crash after commit that is observed in (*) below
263         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
264 
265         HBaseTransaction problematicTx = (HBaseTransaction) tm.begin();
266 
267         long row = randomGenerator.nextLong();
268 
269         // Test shadow cell are created properly
270         Put put = new Put(Bytes.toBytes(row));
271         put.addColumn(fam, qual, data);
272         txTable.put(problematicTx, put);
273         try {
274             tm.commit(problematicTx);
275         } catch (Exception e) { // (*) Crash
276             // Do nothing
277         }
278 
279         assertTrue(CellUtils.hasCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
280                                      new TTableCellGetterAdapter(txTable)),
281                    "Cell should be there");
282         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
283                                             new TTableCellGetterAdapter(txTable)),
284                     "Shadow cell should not be there");
285 
286         // Return a LWM that triggers compaction and has all the possible start timestamps below it
287         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
288         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
289                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
290         CommitTable commitTable = injector.getInstance(CommitTable.class);
291         CommitTable.Client commitTableClient = spy(commitTable.getClient());
292         SettableFuture<Long> f = SettableFuture.create();
293         f.set(Long.MAX_VALUE);
294         doReturn(f).when(commitTableClient).readLowWatermark();
295         omidCompactor.commitTableClientQueue.add(commitTableClient);
296 
297         LOG.info("Flushing table {}", TEST_TABLE);
298         admin.flush(TableName.valueOf(TEST_TABLE));
299 
300         LOG.info("Compacting table {}", TEST_TABLE);
301         admin.majorCompact(TableName.valueOf(TEST_TABLE));
302 
303         LOG.info("Sleeping for 3 secs");
304         Thread.sleep(3000);
305         LOG.info("Waking up after 3 secs");
306 
307         assertTrue(CellUtils.hasCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
308                                      new TTableCellGetterAdapter(txTable)),
309                    "Cell should be there");
310         assertTrue(CellUtils.hasShadowCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
311                                            new TTableCellGetterAdapter(txTable)),
312                    "Shadow cell should not be there");
313     }
314 
315     @Test(timeOut = 60_000)
316     public void testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction()
317             throws Throwable {
318         String
319                 TEST_TABLE =
320                 "testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction";
321         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
322         TTable txTable = new TTable(connection, TEST_TABLE);
323 
324         // The KV in this transaction should be discarded
325         HBaseTransaction neverendingTxBelowLowWatermark = (HBaseTransaction) tm.begin();
326         long rowId = randomGenerator.nextLong();
327         Put put = new Put(Bytes.toBytes(rowId));
328         put.addColumn(fam, qual, data);
329         txTable.put(neverendingTxBelowLowWatermark, put);
330         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
331                                      new TTableCellGetterAdapter(txTable)),
332                    "Cell should be there");
333         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
334                                             new TTableCellGetterAdapter(txTable)),
335                     "Shadow cell should not be there");
336 
337         // The KV in this transaction should be added without the shadow cells
338         HBaseTransaction neverendingTxAboveLowWatermark = (HBaseTransaction) tm.begin();
339         long anotherRowId = randomGenerator.nextLong();
340         put = new Put(Bytes.toBytes(anotherRowId));
341         put.addColumn(fam, qual, data);
342         txTable.put(neverendingTxAboveLowWatermark, put);
343         assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
344                                      new TTableCellGetterAdapter(txTable)),
345                    "Cell should be there");
346         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
347                                             new TTableCellGetterAdapter(txTable)),
348                     "Shadow cell should not be there");
349 
350         assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table before flushing should be 2");
351         LOG.info("Flushing table {}", TEST_TABLE);
352         admin.flush(TableName.valueOf(TEST_TABLE));
353         assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table after flushing should be 2");
354 
355         // Return a LWM that triggers compaction and stays between both ST of TXs, so assign 1st TX's start timestamp
356         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
357         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
358                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
359         CommitTable commitTable = injector.getInstance(CommitTable.class);
360         CommitTable.Client commitTableClient = spy(commitTable.getClient());
361         SettableFuture<Long> f = SettableFuture.create();
362         f.set(neverendingTxBelowLowWatermark.getStartTimestamp());
363         doReturn(f).when(commitTableClient).readLowWatermark();
364         omidCompactor.commitTableClientQueue.add(commitTableClient);
365         LOG.info("Compacting table {}", TEST_TABLE);
366         admin.majorCompact(TableName.valueOf(TEST_TABLE));
367 
368         LOG.info("Sleeping for 3 secs");
369         Thread.sleep(3000);
370         LOG.info("Waking up after 3 secs");
371 
372         // One row should have been discarded after compacting
373         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
374         // The row from the TX below the LWM should not be there (nor its Shadow Cell)
375         assertFalse(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
376                                       new TTableCellGetterAdapter(txTable)),
377                     "Cell should not be there");
378         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
379                                             new TTableCellGetterAdapter(txTable)),
380                     "Shadow cell should not be there");
381         // The row from the TX above the LWM should be there without the Shadow Cell
382         assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
383                                      new TTableCellGetterAdapter(txTable)),
384                    "Cell should be there");
385         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
386                                             new TTableCellGetterAdapter(txTable)),
387                     "Shadow cell should not be there");
388 
389     }
390 
391     @Test(timeOut = 60_000)
392     public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
393         String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReached";
394         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
395         TTable txTable = new TTable(connection, TEST_TABLE);
396 
397         // The KV in this transaction should be discarded but in the end should remain there because
398         // the commit table won't be accessed (simulating an error on access)
399         HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
400         long rowId = randomGenerator.nextLong();
401         Put put = new Put(Bytes.toBytes(rowId));
402         put.addColumn(fam, qual, data);
403         txTable.put(neverendingTx, put);
404         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
405                                      new TTableCellGetterAdapter(txTable)),
406                    "Cell should be there");
407         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
408                                             new TTableCellGetterAdapter(txTable)),
409                     "Shadow cell should not be there");
410 
411         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
412         LOG.info("Flushing table {}", TEST_TABLE);
413         admin.flush(TableName.valueOf(TEST_TABLE));
414         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
415 
416         // Break access to CommitTable functionality in Compactor
417         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
418         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
419                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
420         CommitTable commitTable = injector.getInstance(CommitTable.class);
421         CommitTable.Client commitTableClient = spy(commitTable.getClient());
422         SettableFuture<Long> f = SettableFuture.create();
423         f.setException(new IOException("Unable to read"));
424         doReturn(f).when(commitTableClient).readLowWatermark();
425         omidCompactor.commitTableClientQueue.add(commitTableClient);
426 
427         LOG.info("Compacting table {}", TEST_TABLE);
428         admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
429 
430         LOG.info("Sleeping for 3 secs");
431         Thread.sleep(3000);
432         LOG.info("Waking up after 3 secs");
433 
434         // All rows should be there after the failed compaction
435         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
436         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
437                                      new TTableCellGetterAdapter(txTable)),
438                    "Cell should be there");
439         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
440                                             new TTableCellGetterAdapter(txTable)),
441                     "Shadow cell should not be there");
442     }
443 
444     @Test(timeOut = 60_000)
445     public void testOriginalTableParametersAreAvoidedAlsoWhenCompacting() throws Throwable {
446         String TEST_TABLE = "testOriginalTableParametersAreAvoidedAlsoWhenCompacting";
447         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
448         TTable txTable = new TTable(connection, TEST_TABLE);
449 
450         long rowId = randomGenerator.nextLong();
451         for (int versionCount = 0; versionCount <= (2 * MAX_VERSIONS); versionCount++) {
452             Transaction tx = tm.begin();
453             Put put = new Put(Bytes.toBytes(rowId));
454             put.addColumn(fam, qual, Bytes.toBytes("testWrite-" + versionCount));
455             txTable.put(tx, put);
456             tm.commit(tx);
457         }
458 
459         Transaction tx = tm.begin();
460         Get get = new Get(Bytes.toBytes(rowId));
461         get.setMaxVersions(2 * MAX_VERSIONS);
462         assertEquals(get.getMaxVersions(), (2 * MAX_VERSIONS), "Max versions should be set to " + (2 * MAX_VERSIONS));
463         get.addColumn(fam, qual);
464         Result result = txTable.get(tx, get);
465         tm.commit(tx);
466         List<Cell> column = result.getColumnCells(fam, qual);
467         assertEquals(column.size(), 1, "There should be only one version in the result");
468 
469         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table before flushing");
470         LOG.info("Flushing table {}", TEST_TABLE);
471         admin.flush(TableName.valueOf(TEST_TABLE));
472         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after flushing");
473 
474         // Return a LWM that triggers compaction
475         compactEverything(TEST_TABLE);
476 
477         // One row should have been discarded after compacting
478         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
479 
480         tx = tm.begin();
481         get = new Get(Bytes.toBytes(rowId));
482         get.setMaxVersions(2 * MAX_VERSIONS);
483         assertEquals(get.getMaxVersions(), (2 * MAX_VERSIONS), "Max versions should be set to " + (2 * MAX_VERSIONS));
484         get.addColumn(fam, qual);
485         result = txTable.get(tx, get);
486         tm.commit(tx);
487         column = result.getColumnCells(fam, qual);
488         assertEquals(column.size(), 1, "There should be only one version in the result");
489         assertEquals(Bytes.toString(CellUtil.cloneValue(column.get(0))), "testWrite-" + (2 * MAX_VERSIONS),
490                      "Values don't match");
491     }
492 
493     // manually flush the regions on the region server.
494     // flushing like this prevents compaction running
495     // directly after the flush, which we want to avoid.
496     private void manualFlush(String tableName) throws Throwable {
497         LOG.info("Manually flushing all regions and waiting 2 secs");
498         HBaseShims.flushAllOnlineRegions(hbaseTestUtil.getHBaseCluster().getRegionServer(0),
499                                          TableName.valueOf(tableName));
500         TimeUnit.SECONDS.sleep(2);
501     }
502 
503     @Test(timeOut = 60_000)
504     public void testOldCellsAreDiscardedAfterCompaction() throws Exception {
505         String TEST_TABLE = "testOldCellsAreDiscardedAfterCompaction";
506         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
507         TTable txTable = new TTable(connection, TEST_TABLE);
508 
509         byte[] rowId = Bytes.toBytes("row");
510 
511         // Create 3 transactions modifying the same cell in a particular row
512         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
513         Put put1 = new Put(rowId);
514         put1.addColumn(fam, qual, Bytes.toBytes("testValue 1"));
515         txTable.put(tx1, put1);
516         tm.commit(tx1);
517 
518         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
519         Put put2 = new Put(rowId);
520         put2.addColumn(fam, qual, Bytes.toBytes("testValue 2"));
521         txTable.put(tx2, put2);
522         tm.commit(tx2);
523 
524         HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
525         Put put3 = new Put(rowId);
526         put3.addColumn(fam, qual, Bytes.toBytes("testValue 3"));
527         txTable.put(tx3, put3);
528         tm.commit(tx3);
529 
530         // Before compaction, the three timestamped values for the cell should be there
531         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
532         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
533                    "Put cell of Tx1 should be there");
534         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
535                    "Put shadow cell of Tx1 should be there");
536         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
537                    "Put cell of Tx2 cell should be there");
538         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
539                    "Put shadow cell of Tx2 should be there");
540         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
541                    "Put cell of Tx3 cell should be there");
542         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
543                    "Put shadow cell of Tx3 should be there");
544 
545         // Compact
546         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
547         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
548 
549         // After compaction, only the last value for the cell should have survived
550         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
551                     "Put cell of Tx1 should not be there");
552         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
553                     "Put shadow cell of Tx1 should not be there");
554         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
555                     "Put cell of Tx2 should not be there");
556         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
557                     "Put shadow cell of Tx2 should not be there");
558         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
559                    "Put cell of Tx3 cell should be there");
560         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
561                    "Put shadow cell of Tx3 should be there");
562 
563         // A new transaction after compaction should read the last value written
564         HBaseTransaction newTx1 = (HBaseTransaction) tm.begin();
565         Get newGet1 = new Get(rowId);
566         newGet1.addColumn(fam, qual);
567         Result result = txTable.get(newTx1, newGet1);
568         assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
569         // Write a new value
570         Put newPut1 = new Put(rowId);
571         newPut1.addColumn(fam, qual, Bytes.toBytes("new testValue 1"));
572         txTable.put(newTx1, newPut1);
573 
574         // Start a second new transaction
575         HBaseTransaction newTx2 = (HBaseTransaction) tm.begin();
576         // Commit first of the new tx
577         tm.commit(newTx1);
578 
579         // The second transaction should still read the previous value
580         Get newGet2 = new Get(rowId);
581         newGet2.addColumn(fam, qual);
582         result = txTable.get(newTx2, newGet2);
583         assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
584         tm.commit(newTx2);
585 
586         // Only two values -the new written by newTx1 and the last value
587         // for the cell after compaction- should have survived
588         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
589                     "Put cell of Tx1 should not be there");
590         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
591                     "Put shadow cell of Tx1 should not be there");
592         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
593                     "Put cell of Tx2 should not be there");
594         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
595                     "Put shadow cell of Tx2 should not be there");
596         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
597                    "Put cell of Tx3 cell should be there");
598         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
599                    "Put shadow cell of Tx3 should be there");
600         assertTrue(CellUtils.hasCell(rowId, fam, qual, newTx1.getStartTimestamp(), getter),
601                    "Put cell of NewTx1 cell should be there");
602         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, newTx1.getStartTimestamp(), getter),
603                    "Put shadow cell of NewTx1 should be there");
604     }
605 
606     /**
607      * Tests a case where a temporary failure to flush causes the compactor to crash
608      */
609     @Test(timeOut = 60_000)
610     public void testDuplicateDeletes() throws Throwable {
611         String TEST_TABLE = "testDuplicateDeletes";
612         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
613         TTable txTable = new TTable(connection, TEST_TABLE);
614 
615         // jump through hoops to trigger a minor compaction.
616         // a minor compaction will only run if there are enough
617         // files to be compacted, but that is less than the number
618         // of total files, in which case it will run a major
619         // compaction. The issue this is testing only shows up
620         // with minor compaction, as only Deletes can be duplicate
621         // and major compactions filter them out.
622         byte[] firstRow = "FirstRow".getBytes();
623         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
624         Put put0 = new Put(firstRow);
625         put0.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
626         txTable.put(tx0, put0);
627         tm.commit(tx0);
628 
629         // create the first hfile
630         manualFlush(TEST_TABLE);
631 
632         // write a row, it won't be committed
633         byte[] rowToBeCompactedAway = "compactMe".getBytes();
634         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
635         Put put1 = new Put(rowToBeCompactedAway);
636         put1.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
637         txTable.put(tx1, put1);
638         txTable.flushCommits();
639 
640         // write a row to trigger the double delete problem
641         byte[] row = "iCauseErrors".getBytes();
642         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
643         Put put2 = new Put(row);
644         put2.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
645         txTable.put(tx2, put2);
646         tm.commit(tx2);
647 
648         HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
649         Put put3 = new Put(row);
650         put3.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
651         txTable.put(tx3, put3);
652         txTable.flushCommits();
653 
654         // cause a failure on HBaseTM#preCommit();
655         Set<HBaseCellId> writeSet = tx3.getWriteSet();
656         assertEquals(1, writeSet.size());
657         List<HBaseCellId> newWriteSet = new ArrayList<>();
658         final AtomicBoolean flushFailing = new AtomicBoolean(true);
659         for (HBaseCellId id : writeSet) {
660             TTable failableHTable = spy(id.getTable());
661             doAnswer(new Answer<Void>() {
662                 @Override
663                 public Void answer(InvocationOnMock invocation)
664                         throws Throwable {
665                     if (flushFailing.get()) {
666                         throw new RetriesExhaustedWithDetailsException(new ArrayList<Throwable>(),
667                                                                        new ArrayList<Row>(), new ArrayList<String>());
668                     } else {
669                         invocation.callRealMethod();
670                     }
671                     return null;
672                 }
673             }).when(failableHTable).flushCommits();
674 
675             newWriteSet.add(new HBaseCellId(failableHTable,
676                                             id.getRow(), id.getFamily(),
677                                             id.getQualifier(), id.getTimestamp()));
678         }
679         writeSet.clear();
680         writeSet.addAll(newWriteSet);
681 
682         try {
683             tm.commit(tx3);
684             fail("Shouldn't succeed");
685         } catch (TransactionException tme) {
686             flushFailing.set(false);
687             tm.rollback(tx3);
688         }
689 
690         // create second hfile,
691         // it should contain multiple deletes
692         manualFlush(TEST_TABLE);
693 
694         // create loads of files
695         byte[] anotherRow = "someotherrow".getBytes();
696         HBaseTransaction tx4 = (HBaseTransaction) tm.begin();
697         Put put4 = new Put(anotherRow);
698         put4.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
699         txTable.put(tx4, put4);
700         tm.commit(tx4);
701 
702         // create third hfile
703         manualFlush(TEST_TABLE);
704 
705         // trigger minor compaction and give it time to run
706         setCompactorLWM(tx4.getStartTimestamp(), TEST_TABLE);
707         admin.compact(TableName.valueOf(TEST_TABLE));
708         Thread.sleep(3000);
709 
710         // check if the cell that should be compacted, is compacted
711         assertFalse(CellUtils.hasCell(rowToBeCompactedAway, fam, qual, tx1.getStartTimestamp(),
712                                       new TTableCellGetterAdapter(txTable)),
713                     "Cell should not be be there");
714     }
715 
716     @Test(timeOut = 60_000)
717     public void testNonOmidCFIsUntouched() throws Throwable {
718         String TEST_TABLE = "testNonOmidCFIsUntouched";
719         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
720         TTable txTable = new TTable(connection, TEST_TABLE);
721 
722         admin.disableTable(TableName.valueOf(TEST_TABLE));
723         byte[] nonOmidCF = Bytes.toBytes("nonOmidCF");
724         byte[] nonOmidQual = Bytes.toBytes("nonOmidCol");
725         HColumnDescriptor nonomidfam = new HColumnDescriptor(nonOmidCF);
726         nonomidfam.setMaxVersions(MAX_VERSIONS);
727         admin.addColumn(TableName.valueOf(TEST_TABLE), nonomidfam);
728         admin.enableTable(TableName.valueOf(TEST_TABLE));
729 
730         byte[] rowId = Bytes.toBytes("testRow");
731         Transaction tx = tm.begin();
732         Put put = new Put(rowId);
733         put.addColumn(fam, qual, Bytes.toBytes("testValue"));
734         txTable.put(tx, put);
735 
736         Put nonTxPut = new Put(rowId);
737         nonTxPut.addColumn(nonOmidCF, nonOmidQual, Bytes.toBytes("nonTxVal"));
738         txTable.getHTable().put(nonTxPut);
739         txTable.flushCommits(); // to make sure it left the client
740 
741         Get g = new Get(rowId);
742         Result result = txTable.getHTable().get(g);
743         assertEquals(result.getColumnCells(nonOmidCF, nonOmidQual).size(), 1, "Should be there, precompact");
744         assertEquals(result.getColumnCells(fam, qual).size(), 1, "Should be there, precompact");
745 
746         compactEverything(TEST_TABLE);
747 
748         result = txTable.getHTable().get(g);
749         assertEquals(result.getColumnCells(nonOmidCF, nonOmidQual).size(), 1, "Should be there, postcompact");
750         assertEquals(result.getColumnCells(fam, qual).size(), 0, "Should not be there, postcompact");
751     }
752 
753     // ----------------------------------------------------------------------------------------------------------------
754     // Tests on tombstones and non-transactional Deletes
755     // ----------------------------------------------------------------------------------------------------------------
756 
757     /**
758      * Test that when a major compaction runs, cells that were deleted non-transactionally dissapear
759      */
760     @Test(timeOut = 60_000)
761     public void testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs() throws Throwable {
762         String TEST_TABLE = "testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs";
763         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
764         TTable txTable = new TTable(connection, TEST_TABLE);
765 
766         Table table = txTable.getHTable();
767 
768         // Write first a value transactionally
769         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
770         byte[] rowId = Bytes.toBytes("row1");
771         Put p0 = new Put(rowId);
772         p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
773         txTable.put(tx0, p0);
774         tm.commit(tx0);
775 
776         // Then perform a non-transactional Delete
777         Delete d = new Delete(rowId);
778         d.addColumn(fam, qual);
779         table.delete(d);
780 
781         // Trigger a major compaction
782         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
783         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
784 
785         // Then perform a non-tx (raw) scan...
786         Scan scan = new Scan();
787         scan.setRaw(true);
788         ResultScanner scannerResults = table.getScanner(scan);
789 
790         // ...and test the deleted cell is not there anymore
791         assertNull(scannerResults.next(), "There should be no results in scan results");
792 
793         table.close();
794 
795     }
796 
797     /**
798      * Test that when a minor compaction runs, cells that were deleted non-transactionally are preserved. This is to
799      * allow users still access the cells when doing "improper" operations on a transactional table
800      */
801     @Test(timeOut = 60_000)
802     public void testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs() throws Throwable {
803         String TEST_TABLE = "testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs";
804         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
805         TTable txTable = new TTable(connection, TEST_TABLE);
806 
807         Table table = txTable.getHTable();
808 
809         // Configure the environment to create a minor compaction
810 
811         // Write first a value transactionally
812         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
813         byte[] rowId = Bytes.toBytes("row1");
814         Put p0 = new Put(rowId);
815         p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
816         txTable.put(tx0, p0);
817         tm.commit(tx0);
818 
819         // create the first hfile
820         manualFlush(TEST_TABLE);
821 
822         // Write another value transactionally
823         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
824         Put p1 = new Put(rowId);
825         p1.addColumn(fam, qual, Bytes.toBytes("testValue-1"));
826         txTable.put(tx1, p1);
827         tm.commit(tx1);
828 
829         // create the second hfile
830         manualFlush(TEST_TABLE);
831 
832         // Write yet another value transactionally
833         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
834         Put p2 = new Put(rowId);
835         p2.addColumn(fam, qual, Bytes.toBytes("testValue-2"));
836         txTable.put(tx2, p2);
837         tm.commit(tx2);
838 
839         // create a third hfile
840         manualFlush(TEST_TABLE);
841 
842         // Then perform a non-transactional Delete
843         Delete d = new Delete(rowId);
844         d.addColumn(fam, qual);
845         table.delete(d);
846 
847         // create the fourth hfile
848         manualFlush(TEST_TABLE);
849 
850         // Trigger the minor compaction
851         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
852         setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
853         admin.compact(TableName.valueOf(TEST_TABLE));
854         Thread.sleep(5000);
855 
856         // Then perform a non-tx (raw) scan...
857         Scan scan = new Scan();
858         scan.setRaw(true);
859         ResultScanner scannerResults = table.getScanner(scan);
860 
861         // ...and test the deleted cell is still there
862         int count = 0;
863         Result scanResult;
864         List<Cell> listOfCellsScanned = new ArrayList<>();
865         while ((scanResult = scannerResults.next()) != null) {
866             listOfCellsScanned = scanResult.listCells(); // equivalent to rawCells()
867             count++;
868         }
869         assertEquals(count, 1, "There should be only one result in scan results");
870         assertEquals(listOfCellsScanned.size(), 3, "There should be 3 cell entries in scan results (2 puts, 1 del)");
871         boolean wasDeletedCellFound = false;
872         int numberOfDeletedCellsFound = 0;
873         for (Cell cell : listOfCellsScanned) {
874             if (CellUtil.isDelete(cell)) {
875                 wasDeletedCellFound = true;
876                 numberOfDeletedCellsFound++;
877             }
878         }
879         assertTrue(wasDeletedCellFound, "We should have found a non-transactionally deleted cell");
880         assertEquals(numberOfDeletedCellsFound, 1, "There should be only only one deleted cell");
881 
882         table.close();
883     }
884 
885     /**
886      * Test that when a minor compaction runs, tombstones are not cleaned up
887      */
888     @Test(timeOut = 60_000)
889     public void testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs() throws Throwable {
890         String TEST_TABLE = "testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs";
891         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
892         TTable txTable = new TTable(connection, TEST_TABLE);
893 
894         // Configure the environment to create a minor compaction
895 
896         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
897         byte[] rowId = Bytes.toBytes("case1");
898         Put p = new Put(rowId);
899         p.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
900         txTable.put(tx0, p);
901         tm.commit(tx0);
902 
903         // create the first hfile
904         manualFlush(TEST_TABLE);
905 
906         // Create the tombstone
907         HBaseTransaction deleteTx = (HBaseTransaction) tm.begin();
908         Delete d = new Delete(rowId);
909         d.addColumn(fam, qual);
910         txTable.delete(deleteTx, d);
911         tm.commit(deleteTx);
912 
913         // create the second hfile
914         manualFlush(TEST_TABLE);
915 
916         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
917         Put p1 = new Put(rowId);
918         p1.addColumn(fam, qual, Bytes.toBytes("testValue-11"));
919         txTable.put(tx1, p1);
920         tm.commit(tx1);
921 
922         // create the third hfile
923         manualFlush(TEST_TABLE);
924 
925         HBaseTransaction lastTx = (HBaseTransaction) tm.begin();
926         Put p2 = new Put(rowId);
927         p2.addColumn(fam, qual, Bytes.toBytes("testValue-222"));
928         txTable.put(lastTx, p2);
929         tm.commit(lastTx);
930 
931         // Trigger the minor compaction
932         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
933         setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
934         admin.compact(TableName.valueOf(TEST_TABLE));
935         Thread.sleep(5000);
936 
937         // Checks on results after compaction
938         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
939         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx0.getStartTimestamp(), getter), "Put cell should be there");
940         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx0.getStartTimestamp(), getter),
941                     "Put shadow cell should be there");
942         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter), "Put cell should be there");
943         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
944                    "Put shadow cell should be there");
945         assertTrue(CellUtils.hasCell(rowId, fam, qual, deleteTx.getStartTimestamp(), getter),
946                    "Delete cell should be there");
947         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, deleteTx.getStartTimestamp(), getter),
948                    "Delete shadow cell should be there");
949         assertTrue(CellUtils.hasCell(rowId, fam, qual, lastTx.getStartTimestamp(), getter),
950                    "Put cell should be there");
951         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, lastTx.getStartTimestamp(), getter),
952                    "Put shadow cell should be there");
953     }
954 
955 
956     /**
957      * Test that when compaction runs, tombstones are cleaned up case1: 1 put (ts < lwm) then tombstone (ts > lwm)
958      */
959     @Test(timeOut = 60_000)
960     public void testTombstonesAreCleanedUpCase1() throws Exception {
961         String TEST_TABLE = "testTombstonesAreCleanedUpCase1";
962         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
963         TTable txTable = new TTable(connection, TEST_TABLE);
964 
965         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
966         byte[] rowId = Bytes.toBytes("case1");
967         Put p = new Put(rowId);
968         p.addColumn(fam, qual, Bytes.toBytes("testValue"));
969         txTable.put(tx1, p);
970         tm.commit(tx1);
971 
972         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
973         setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
974 
975         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
976         Delete d = new Delete(rowId);
977         d.addColumn(fam, qual);
978         txTable.delete(tx2, d);
979         tm.commit(tx2);
980 
981         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
982         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
983                    "Put cell should be there");
984         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
985                    "Put shadow cell should be there");
986         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
987                    "Delete cell should be there");
988         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
989                    "Delete shadow cell should be there");
990     }
991 
992     /**
993      * Test that when compaction runs, tombstones are cleaned up case2: 1 put (ts < lwm) then tombstone (ts < lwm)
994      */
995     @Test(timeOut = 60_000)
996     public void testTombstonesAreCleanedUpCase2() throws Exception {
997         String TEST_TABLE = "testTombstonesAreCleanedUpCase2";
998         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
999         TTable txTable = new TTable(connection, TEST_TABLE);
1000 
1001         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1002         byte[] rowId = Bytes.toBytes("case2");
1003         Put p = new Put(rowId);
1004         p.addColumn(fam, qual, Bytes.toBytes("testValue"));
1005         txTable.put(tx1, p);
1006         tm.commit(tx1);
1007 
1008         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1009         Delete d = new Delete(rowId);
1010         d.addColumn(fam, qual);
1011         txTable.delete(tx2, d);
1012         tm.commit(tx2);
1013 
1014         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1015         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1016 
1017         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1018         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1019                     "Put cell shouldn't be there");
1020         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1021                     "Put shadow cell shouldn't be there");
1022         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1023                     "Delete cell shouldn't be there");
1024         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1025                     "Delete shadow cell shouldn't be there");
1026     }
1027 
1028     /**
1029      * Test that when compaction runs, tombstones are cleaned up case3: 1 put (ts < lwm) then tombstone (ts < lwm) not
1030      * committed
1031      */
1032     @Test(timeOut = 60_000)
1033     public void testTombstonesAreCleanedUpCase3() throws Exception {
1034         String TEST_TABLE = "testTombstonesAreCleanedUpCase3";
1035         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1036         TTable txTable = new TTable(connection, TEST_TABLE);
1037 
1038         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1039         byte[] rowId = Bytes.toBytes("case3");
1040         Put p = new Put(rowId);
1041         p.addColumn(fam, qual, Bytes.toBytes("testValue"));
1042         txTable.put(tx1, p);
1043         tm.commit(tx1);
1044 
1045         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1046         Delete d = new Delete(rowId);
1047         d.addColumn(fam, qual);
1048         txTable.delete(tx2, d);
1049 
1050         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1051         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1052 
1053         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1054         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1055                    "Put cell should be there");
1056         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1057                    "Put shadow cell shouldn't be there");
1058         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1059                     "Delete cell shouldn't be there");
1060         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1061                     "Delete shadow cell shouldn't be there");
1062     }
1063 
1064     /**
1065      * Test that when compaction runs, tombstones are cleaned up case4: 1 put (ts < lwm) then tombstone (ts > lwm) not
1066      * committed
1067      */
1068     @Test(timeOut = 60_000)
1069     public void testTombstonesAreCleanedUpCase4() throws Exception {
1070         String TEST_TABLE = "testTombstonesAreCleanedUpCase4";
1071         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1072         TTable txTable = new TTable(connection, TEST_TABLE);
1073 
1074         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1075         byte[] rowId = Bytes.toBytes("case4");
1076         Put p = new Put(rowId);
1077         p.addColumn(fam, qual, Bytes.toBytes("testValue"));
1078         txTable.put(tx1, p);
1079         tm.commit(tx1);
1080 
1081         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1082 
1083         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1084         Delete d = new Delete(rowId);
1085         d.addColumn(fam, qual);
1086         txTable.delete(tx2, d);
1087         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1088 
1089         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1090         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1091                    "Put cell should be there");
1092         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1093                    "Put shadow cell shouldn't be there");
1094         assertTrue(CellUtils.hasCell(rowId, fam, qual,tx2.getStartTimestamp(), getter),
1095                    "Delete cell should be there");
1096         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1097                     "Delete shadow cell shouldn't be there");
1098     }
1099 
1100     /**
1101      * Test that when compaction runs, tombstones are cleaned up case5: tombstone (ts < lwm)
1102      */
1103     @Test(timeOut = 60_000)
1104     public void testTombstonesAreCleanedUpCase5() throws Exception {
1105         String TEST_TABLE = "testTombstonesAreCleanedUpCase5";
1106         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1107         TTable txTable = new TTable(connection, TEST_TABLE);
1108 
1109         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1110         byte[] rowId = Bytes.toBytes("case5");
1111         Delete d = new Delete(rowId);
1112         d.addColumn(fam, qual);
1113         txTable.delete(tx1, d);
1114         tm.commit(tx1);
1115 
1116         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1117         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1118 
1119         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1120         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1121                     "Delete cell shouldn't be there");
1122         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1123                     "Delete shadow cell shouldn't be there");
1124     }
1125 
1126     /**
1127      * Test that when compaction runs, tombstones are cleaned up case6: tombstone (ts < lwm), then put (ts < lwm)
1128      */
1129     @Test(timeOut = 60_000)
1130     public void testTombstonesAreCleanedUpCase6() throws Exception {
1131         String TEST_TABLE = "testTombstonesAreCleanedUpCase6";
1132         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1133         TTable txTable = new TTable(connection, TEST_TABLE);
1134         byte[] rowId = Bytes.toBytes("case6");
1135 
1136         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1137         Delete d = new Delete(rowId);
1138         d.addColumn(fam, qual);
1139         txTable.delete(tx1, d);
1140         tm.commit(tx1);
1141 
1142         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1143         Put p = new Put(rowId);
1144         p.addColumn(fam, qual, Bytes.toBytes("testValue"));
1145         txTable.put(tx2, p);
1146         tm.commit(tx2);
1147 
1148         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1149         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1150 
1151         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1152         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1153                     "Delete cell shouldn't be there");
1154         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1155                     "Delete shadow cell shouldn't be there");
1156         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1157                    "Put cell should be there");
1158         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1159                    "Put shadow cell shouldn't be there");
1160     }
1161 
1162     private void setCompactorLWM(long lwm, String tableName) throws Exception {
1163         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
1164                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
1165         CommitTable commitTable = injector.getInstance(CommitTable.class);
1166         CommitTable.Client commitTableClient = spy(commitTable.getClient());
1167         SettableFuture<Long> f = SettableFuture.create();
1168         f.set(lwm);
1169         doReturn(f).when(commitTableClient).readLowWatermark();
1170         omidCompactor.commitTableClientQueue.add(commitTableClient);
1171     }
1172 
1173     private void compactEverything(String tableName) throws Exception {
1174         compactWithLWM(Long.MAX_VALUE, tableName);
1175     }
1176 
1177     private void compactWithLWM(long lwm, String tableName) throws Exception {
1178         admin.flush(TableName.valueOf(tableName));
1179 
1180         LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
1181         setCompactorLWM(lwm, tableName);
1182         LOG.info("Compacting table {}", tableName);
1183         admin.majorCompact(TableName.valueOf(tableName));
1184 
1185         LOG.info("Sleeping for 3 secs");
1186         Thread.sleep(3000);
1187         LOG.info("Waking up after 3 secs");
1188     }
1189 
1190     private static long rowCount(String tableName, byte[] family) throws Throwable {
1191         Scan scan = new Scan();
1192         scan.addFamily(family);
1193         Table table = connection.getTable(TableName.valueOf(tableName));
1194         try (ResultScanner scanner = table.getScanner(scan)) {
1195             int count = 0;
1196             while (scanner.next() != null) {
1197                 count++;
1198             }
1199             return count;
1200         }
1201     }
1202 }