View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.util.concurrent.SettableFuture;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellUtil;
26  import org.apache.hadoop.hbase.HBaseTestingUtility;
27  import org.apache.hadoop.hbase.HColumnDescriptor;
28  import org.apache.hadoop.hbase.HTableDescriptor;
29  import org.apache.hadoop.hbase.MiniHBaseCluster;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.client.Delete;
32  import org.apache.hadoop.hbase.client.Get;
33  import org.apache.hadoop.hbase.client.HBaseAdmin;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.client.HTableInterface;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Result;
38  import org.apache.hadoop.hbase.client.ResultScanner;
39  import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
40  import org.apache.hadoop.hbase.client.Row;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
43  import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.omid.HBaseShims;
46  import org.apache.omid.TestUtils;
47  import org.apache.omid.committable.CommitTable;
48  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
49  import org.apache.omid.metrics.NullMetricsProvider;
50  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
51  import org.apache.omid.tso.TSOServer;
52  import org.apache.omid.tso.TSOServerConfig;
53  import org.mockito.invocation.InvocationOnMock;
54  import org.mockito.stubbing.Answer;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  import org.testng.annotations.AfterClass;
58  import org.testng.annotations.BeforeClass;
59  import org.testng.annotations.BeforeMethod;
60  import org.testng.annotations.Test;
61  
62  import java.io.IOException;
63  import java.util.ArrayList;
64  import java.util.List;
65  import java.util.Random;
66  import java.util.Set;
67  import java.util.concurrent.TimeUnit;
68  import java.util.concurrent.atomic.AtomicBoolean;
69  
70  import static org.mockito.Matchers.any;
71  import static org.mockito.Mockito.doAnswer;
72  import static org.mockito.Mockito.doReturn;
73  import static org.mockito.Mockito.doThrow;
74  import static org.mockito.Mockito.spy;
75  import static org.testng.Assert.assertEquals;
76  import static org.testng.Assert.assertFalse;
77  import static org.testng.Assert.assertNull;
78  import static org.testng.Assert.assertTrue;
79  import static org.testng.Assert.fail;
80  
81  public class TestCompaction {
82  
83      private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);
84  
85      private static final String TEST_FAMILY = "test-fam";
86      private static final String TEST_QUALIFIER = "test-qual";
87  
88      private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
89      private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
90      private final byte[] data = Bytes.toBytes("testWrite-1");
91  
92      private static final int MAX_VERSIONS = 3;
93  
94      private Random randomGenerator;
95      private AbstractTransactionManager tm;
96  
97      private Injector injector;
98  
99      private HBaseAdmin admin;
100     private Configuration hbaseConf;
101     private HBaseTestingUtility hbaseTestUtil;
102     private MiniHBaseCluster hbaseCluster;
103 
104     private TSOServer tso;
105 
106     private AggregationClient aggregationClient;
107     private CommitTable commitTable;
108     private PostCommitActions syncPostCommitter;
109 
110     @BeforeClass
111     public void setupTestCompation() throws Exception {
112         TSOServerConfig tsoConfig = new TSOServerConfig();
113         tsoConfig.setPort(1234);
114         tsoConfig.setMaxItems(1);
115         injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
116         hbaseConf = injector.getInstance(Configuration.class);
117         HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
118         HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
119 
120         // settings required for #testDuplicateDeletes()
121         hbaseConf.setInt("hbase.hstore.compaction.min", 2);
122         hbaseConf.setInt("hbase.hstore.compaction.max", 2);
123 
124         setupHBase();
125         aggregationClient = new AggregationClient(hbaseConf);
126         admin = new HBaseAdmin(hbaseConf);
127         createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
128         setupTSO();
129 
130         commitTable = injector.getInstance(CommitTable.class);
131     }
132 
133     private void setupHBase() throws Exception {
134         LOG.info("--------------------------------------------------------------------------------------------------");
135         LOG.info("Setting up HBase");
136         LOG.info("--------------------------------------------------------------------------------------------------");
137         hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
138         LOG.info("--------------------------------------------------------------------------------------------------");
139         LOG.info("Creating HBase MiniCluster");
140         LOG.info("--------------------------------------------------------------------------------------------------");
141         hbaseCluster = hbaseTestUtil.startMiniCluster(1);
142     }
143 
144     private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
145                                            HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
146         createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
147 
148         createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
149     }
150 
151     private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
152         if (!admin.tableExists(tableName)) {
153             LOG.info("Creating {} table...", tableName);
154             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
155 
156             for (byte[] family : families) {
157                 HColumnDescriptor datafam = new HColumnDescriptor(family);
158                 datafam.setMaxVersions(MAX_VERSIONS);
159                 desc.addFamily(datafam);
160             }
161 
162             desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
163             admin.createTable(desc);
164             for (byte[] family : families) {
165                 CompactorUtil.enableOmidCompaction(hbaseConf, TableName.valueOf(tableName), family);
166             }
167         }
168 
169     }
170 
171     private void setupTSO() throws IOException, InterruptedException {
172         tso = injector.getInstance(TSOServer.class);
173         tso.startAndWait();
174         TestUtils.waitForSocketListening("localhost", 1234, 100);
175         Thread.currentThread().setName("UnitTest(s) thread");
176     }
177 
178     @AfterClass
179     public void cleanupTestCompation() throws Exception {
180         teardownTSO();
181         hbaseCluster.shutdown();
182     }
183 
184     private void teardownTSO() throws IOException, InterruptedException {
185         tso.stopAndWait();
186         TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
187     }
188 
189     @BeforeMethod
190     public void setupTestCompactionIndividualTest() throws Exception {
191         randomGenerator = new Random(0xfeedcafeL);
192         tm = spy((AbstractTransactionManager) newTransactionManager());
193     }
194 
195     private TransactionManager newTransactionManager() throws Exception {
196         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
197         hbaseOmidClientConf.setConnectionString("localhost:1234");
198         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
199         CommitTable.Client commitTableClient = commitTable.getClient();
200         syncPostCommitter =
201                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
202         return HBaseTransactionManager.builder(hbaseOmidClientConf)
203                 .postCommitter(syncPostCommitter)
204                 .commitTableClient(commitTableClient)
205                 .build();
206     }
207 
208     @Test
209     public void testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction() throws Throwable {
210         String TEST_TABLE = "testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction";
211         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
212         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
213 
214         final int ROWS_TO_ADD = 5;
215 
216         long fakeAssignedLowWatermark = 0L;
217         for (int i = 0; i < ROWS_TO_ADD; ++i) {
218             long rowId = randomGenerator.nextLong();
219             Transaction tx = tm.begin();
220             if (i == (ROWS_TO_ADD / 2)) {
221                 fakeAssignedLowWatermark = tx.getTransactionId();
222                 LOG.info("AssignedLowWatermark " + fakeAssignedLowWatermark);
223             }
224             Put put = new Put(Bytes.toBytes(rowId));
225             put.add(fam, qual, data);
226             txTable.put(tx, put);
227             tm.commit(tx);
228         }
229 
230         LOG.info("Flushing table {}", TEST_TABLE);
231         admin.flush(TEST_TABLE);
232 
233         // Return a LWM that triggers compaction & stays between 1 and the max start timestamp assigned to previous TXs
234         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
235         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
236                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
237         CommitTable commitTable = injector.getInstance(CommitTable.class);
238         CommitTable.Client commitTableClient = spy(commitTable.getClient());
239         SettableFuture<Long> f = SettableFuture.create();
240         f.set(fakeAssignedLowWatermark);
241         doReturn(f).when(commitTableClient).readLowWatermark();
242         omidCompactor.commitTableClientQueue.add(commitTableClient);
243         LOG.info("Compacting table {}", TEST_TABLE);
244         admin.majorCompact(TEST_TABLE);
245 
246         LOG.info("Sleeping for 3 secs");
247         Thread.sleep(3000);
248         LOG.info("Waking up after 3 secs");
249 
250         // No rows should have been discarded after compacting
251         assertEquals(rowCount(TEST_TABLE, fam), ROWS_TO_ADD, "Rows in table after compacting should be " + ROWS_TO_ADD);
252     }
253 
254     @Test
255     public void testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction() throws Exception {
256         String TEST_TABLE = "testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction";
257         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
258         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
259 
260         // The following line emulates a crash after commit that is observed in (*) below
261         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
262 
263         HBaseTransaction problematicTx = (HBaseTransaction) tm.begin();
264 
265         long row = randomGenerator.nextLong();
266 
267         // Test shadow cell are created properly
268         Put put = new Put(Bytes.toBytes(row));
269         put.add(fam, qual, data);
270         txTable.put(problematicTx, put);
271         try {
272             tm.commit(problematicTx);
273         } catch (Exception e) { // (*) Crash
274             // Do nothing
275         }
276 
277         assertTrue(CellUtils.hasCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
278                                      new TTableCellGetterAdapter(txTable)),
279                    "Cell should be there");
280         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
281                                             new TTableCellGetterAdapter(txTable)),
282                     "Shadow cell should not be there");
283 
284         // Return a LWM that triggers compaction and has all the possible start timestamps below it
285         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
286         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
287                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
288         CommitTable commitTable = injector.getInstance(CommitTable.class);
289         CommitTable.Client commitTableClient = spy(commitTable.getClient());
290         SettableFuture<Long> f = SettableFuture.create();
291         f.set(Long.MAX_VALUE);
292         doReturn(f).when(commitTableClient).readLowWatermark();
293         omidCompactor.commitTableClientQueue.add(commitTableClient);
294 
295         LOG.info("Flushing table {}", TEST_TABLE);
296         admin.flush(TEST_TABLE);
297 
298         LOG.info("Compacting table {}", TEST_TABLE);
299         admin.majorCompact(TEST_TABLE);
300 
301         LOG.info("Sleeping for 3 secs");
302         Thread.sleep(3000);
303         LOG.info("Waking up after 3 secs");
304 
305         assertTrue(CellUtils.hasCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
306                                      new TTableCellGetterAdapter(txTable)),
307                    "Cell should be there");
308         assertTrue(CellUtils.hasShadowCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
309                                            new TTableCellGetterAdapter(txTable)),
310                    "Shadow cell should not be there");
311     }
312 
313     @Test
314     public void testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction()
315             throws Throwable {
316         String
317                 TEST_TABLE =
318                 "testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction";
319         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
320         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
321 
322         // The KV in this transaction should be discarded
323         HBaseTransaction neverendingTxBelowLowWatermark = (HBaseTransaction) tm.begin();
324         long rowId = randomGenerator.nextLong();
325         Put put = new Put(Bytes.toBytes(rowId));
326         put.add(fam, qual, data);
327         txTable.put(neverendingTxBelowLowWatermark, put);
328         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
329                                      new TTableCellGetterAdapter(txTable)),
330                    "Cell should be there");
331         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
332                                             new TTableCellGetterAdapter(txTable)),
333                     "Shadow cell should not be there");
334 
335         // The KV in this transaction should be added without the shadow cells
336         HBaseTransaction neverendingTxAboveLowWatermark = (HBaseTransaction) tm.begin();
337         long anotherRowId = randomGenerator.nextLong();
338         put = new Put(Bytes.toBytes(anotherRowId));
339         put.add(fam, qual, data);
340         txTable.put(neverendingTxAboveLowWatermark, put);
341         assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
342                                      new TTableCellGetterAdapter(txTable)),
343                    "Cell should be there");
344         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
345                                             new TTableCellGetterAdapter(txTable)),
346                     "Shadow cell should not be there");
347 
348         assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table before flushing should be 2");
349         LOG.info("Flushing table {}", TEST_TABLE);
350         admin.flush(TEST_TABLE);
351         assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table after flushing should be 2");
352 
353         // Return a LWM that triggers compaction and stays between both ST of TXs, so assign 1st TX's start timestamp
354         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
355         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
356                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
357         CommitTable commitTable = injector.getInstance(CommitTable.class);
358         CommitTable.Client commitTableClient = spy(commitTable.getClient());
359         SettableFuture<Long> f = SettableFuture.create();
360         f.set(neverendingTxBelowLowWatermark.getStartTimestamp());
361         doReturn(f).when(commitTableClient).readLowWatermark();
362         omidCompactor.commitTableClientQueue.add(commitTableClient);
363         LOG.info("Compacting table {}", TEST_TABLE);
364         admin.majorCompact(TEST_TABLE);
365 
366         LOG.info("Sleeping for 3 secs");
367         Thread.sleep(3000);
368         LOG.info("Waking up after 3 secs");
369 
370         // One row should have been discarded after compacting
371         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
372         // The row from the TX below the LWM should not be there (nor its Shadow Cell)
373         assertFalse(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
374                                       new TTableCellGetterAdapter(txTable)),
375                     "Cell should not be there");
376         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
377                                             new TTableCellGetterAdapter(txTable)),
378                     "Shadow cell should not be there");
379         // The row from the TX above the LWM should be there without the Shadow Cell
380         assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
381                                      new TTableCellGetterAdapter(txTable)),
382                    "Cell should be there");
383         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
384                                             new TTableCellGetterAdapter(txTable)),
385                     "Shadow cell should not be there");
386 
387     }
388 
389     @Test
390     public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
391         String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReached";
392         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
393         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
394 
395         // The KV in this transaction should be discarded but in the end should remain there because
396         // the commit table won't be accessed (simulating an error on access)
397         HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
398         long rowId = randomGenerator.nextLong();
399         Put put = new Put(Bytes.toBytes(rowId));
400         put.add(fam, qual, data);
401         txTable.put(neverendingTx, put);
402         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
403                                      new TTableCellGetterAdapter(txTable)),
404                    "Cell should be there");
405         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
406                                             new TTableCellGetterAdapter(txTable)),
407                     "Shadow cell should not be there");
408 
409         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
410         LOG.info("Flushing table {}", TEST_TABLE);
411         admin.flush(TEST_TABLE);
412         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
413 
414         // Break access to CommitTable functionality in Compactor
415         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
416         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
417                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
418         CommitTable commitTable = injector.getInstance(CommitTable.class);
419         CommitTable.Client commitTableClient = spy(commitTable.getClient());
420         SettableFuture<Long> f = SettableFuture.create();
421         f.setException(new IOException("Unable to read"));
422         doReturn(f).when(commitTableClient).readLowWatermark();
423         omidCompactor.commitTableClientQueue.add(commitTableClient);
424 
425         LOG.info("Compacting table {}", TEST_TABLE);
426         admin.majorCompact(TEST_TABLE); // Should trigger the error when accessing CommitTable funct.
427 
428         LOG.info("Sleeping for 3 secs");
429         Thread.sleep(3000);
430         LOG.info("Waking up after 3 secs");
431 
432         // All rows should be there after the failed compaction
433         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
434         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
435                                      new TTableCellGetterAdapter(txTable)),
436                    "Cell should be there");
437         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
438                                             new TTableCellGetterAdapter(txTable)),
439                     "Shadow cell should not be there");
440     }
441 
442     @Test
443     public void testOriginalTableParametersAreAvoidedAlsoWhenCompacting() throws Throwable {
444         String TEST_TABLE = "testOriginalTableParametersAreAvoidedAlsoWhenCompacting";
445         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
446         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
447 
448         long rowId = randomGenerator.nextLong();
449         for (int versionCount = 0; versionCount <= (2 * MAX_VERSIONS); versionCount++) {
450             Transaction tx = tm.begin();
451             Put put = new Put(Bytes.toBytes(rowId));
452             put.add(fam, qual, Bytes.toBytes("testWrite-" + versionCount));
453             txTable.put(tx, put);
454             tm.commit(tx);
455         }
456 
457         Transaction tx = tm.begin();
458         Get get = new Get(Bytes.toBytes(rowId));
459         get.setMaxVersions(2 * MAX_VERSIONS);
460         assertEquals(get.getMaxVersions(), (2 * MAX_VERSIONS), "Max versions should be set to " + (2 * MAX_VERSIONS));
461         get.addColumn(fam, qual);
462         Result result = txTable.get(tx, get);
463         tm.commit(tx);
464         List<Cell> column = result.getColumnCells(fam, qual);
465         assertEquals(column.size(), 1, "There should be only one version in the result");
466 
467         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table before flushing");
468         LOG.info("Flushing table {}", TEST_TABLE);
469         admin.flush(TEST_TABLE);
470         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after flushing");
471 
472         // Return a LWM that triggers compaction
473         compactEverything(TEST_TABLE);
474 
475         // One row should have been discarded after compacting
476         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
477 
478         tx = tm.begin();
479         get = new Get(Bytes.toBytes(rowId));
480         get.setMaxVersions(2 * MAX_VERSIONS);
481         assertEquals(get.getMaxVersions(), (2 * MAX_VERSIONS), "Max versions should be set to " + (2 * MAX_VERSIONS));
482         get.addColumn(fam, qual);
483         result = txTable.get(tx, get);
484         tm.commit(tx);
485         column = result.getColumnCells(fam, qual);
486         assertEquals(column.size(), 1, "There should be only one version in the result");
487         assertEquals(Bytes.toString(CellUtil.cloneValue(column.get(0))), "testWrite-" + (2 * MAX_VERSIONS),
488                      "Values don't match");
489     }
490 
491     // manually flush the regions on the region server.
492     // flushing like this prevents compaction running
493     // directly after the flush, which we want to avoid.
494     private void manualFlush(String tableName) throws Throwable {
495         LOG.info("Manually flushing all regions and waiting 2 secs");
496         HBaseShims.flushAllOnlineRegions(hbaseTestUtil.getHBaseCluster().getRegionServer(0),
497                                          TableName.valueOf(tableName));
498         TimeUnit.SECONDS.sleep(2);
499     }
500 
501     @Test
502     public void testOldCellsAreDiscardedAfterCompaction() throws Exception {
503         String TEST_TABLE = "testOldCellsAreDiscardedAfterCompaction";
504         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
505         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
506 
507         byte[] rowId = Bytes.toBytes("row");
508 
509         // Create 3 transactions modifying the same cell in a particular row
510         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
511         Put put1 = new Put(rowId);
512         put1.add(fam, qual, Bytes.toBytes("testValue 1"));
513         txTable.put(tx1, put1);
514         tm.commit(tx1);
515 
516         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
517         Put put2 = new Put(rowId);
518         put2.add(fam, qual, Bytes.toBytes("testValue 2"));
519         txTable.put(tx2, put2);
520         tm.commit(tx2);
521 
522         HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
523         Put put3 = new Put(rowId);
524         put3.add(fam, qual, Bytes.toBytes("testValue 3"));
525         txTable.put(tx3, put3);
526         tm.commit(tx3);
527 
528         // Before compaction, the three timestamped values for the cell should be there
529         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
530         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
531                    "Put cell of Tx1 should be there");
532         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
533                    "Put shadow cell of Tx1 should be there");
534         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
535                    "Put cell of Tx2 cell should be there");
536         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
537                    "Put shadow cell of Tx2 should be there");
538         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
539                    "Put cell of Tx3 cell should be there");
540         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
541                    "Put shadow cell of Tx3 should be there");
542 
543         // Compact
544         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
545         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
546 
547         // After compaction, only the last value for the cell should have survived
548         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
549                     "Put cell of Tx1 should not be there");
550         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
551                     "Put shadow cell of Tx1 should not be there");
552         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
553                     "Put cell of Tx2 should not be there");
554         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
555                     "Put shadow cell of Tx2 should not be there");
556         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
557                    "Put cell of Tx3 cell should be there");
558         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
559                    "Put shadow cell of Tx3 should be there");
560 
561         // A new transaction after compaction should read the last value written
562         HBaseTransaction newTx1 = (HBaseTransaction) tm.begin();
563         Get newGet1 = new Get(rowId);
564         newGet1.addColumn(fam, qual);
565         Result result = txTable.get(newTx1, newGet1);
566         assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
567         // Write a new value
568         Put newPut1 = new Put(rowId);
569         newPut1.add(fam, qual, Bytes.toBytes("new testValue 1"));
570         txTable.put(newTx1, newPut1);
571 
572         // Start a second new transaction
573         HBaseTransaction newTx2 = (HBaseTransaction) tm.begin();
574         // Commit first of the new tx
575         tm.commit(newTx1);
576 
577         // The second transaction should still read the previous value
578         Get newGet2 = new Get(rowId);
579         newGet2.addColumn(fam, qual);
580         result = txTable.get(newTx2, newGet2);
581         assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
582         tm.commit(newTx2);
583 
584         // Only two values -the new written by newTx1 and the last value
585         // for the cell after compaction- should have survived
586         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
587                     "Put cell of Tx1 should not be there");
588         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
589                     "Put shadow cell of Tx1 should not be there");
590         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
591                     "Put cell of Tx2 should not be there");
592         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
593                     "Put shadow cell of Tx2 should not be there");
594         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
595                    "Put cell of Tx3 cell should be there");
596         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
597                    "Put shadow cell of Tx3 should be there");
598         assertTrue(CellUtils.hasCell(rowId, fam, qual, newTx1.getStartTimestamp(), getter),
599                    "Put cell of NewTx1 cell should be there");
600         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, newTx1.getStartTimestamp(), getter),
601                    "Put shadow cell of NewTx1 should be there");
602     }
603 
604     /**
605      * Tests a case where a temporary failure to flush causes the compactor to crash
606      */
607     @Test
608     public void testDuplicateDeletes() throws Throwable {
609         String TEST_TABLE = "testDuplicateDeletes";
610         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
611         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
612 
613         // jump through hoops to trigger a minor compaction.
614         // a minor compaction will only run if there are enough
615         // files to be compacted, but that is less than the number
616         // of total files, in which case it will run a major
617         // compaction. The issue this is testing only shows up
618         // with minor compaction, as only Deletes can be duplicate
619         // and major compactions filter them out.
620         byte[] firstRow = "FirstRow".getBytes();
621         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
622         Put put0 = new Put(firstRow);
623         put0.add(fam, qual, Bytes.toBytes("testWrite-1"));
624         txTable.put(tx0, put0);
625         tm.commit(tx0);
626 
627         // create the first hfile
628         manualFlush(TEST_TABLE);
629 
630         // write a row, it won't be committed
631         byte[] rowToBeCompactedAway = "compactMe".getBytes();
632         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
633         Put put1 = new Put(rowToBeCompactedAway);
634         put1.add(fam, qual, Bytes.toBytes("testWrite-1"));
635         txTable.put(tx1, put1);
636         txTable.flushCommits();
637 
638         // write a row to trigger the double delete problem
639         byte[] row = "iCauseErrors".getBytes();
640         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
641         Put put2 = new Put(row);
642         put2.add(fam, qual, Bytes.toBytes("testWrite-1"));
643         txTable.put(tx2, put2);
644         tm.commit(tx2);
645 
646         HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
647         Put put3 = new Put(row);
648         put3.add(fam, qual, Bytes.toBytes("testWrite-1"));
649         txTable.put(tx3, put3);
650         txTable.flushCommits();
651 
652         // cause a failure on HBaseTM#preCommit();
653         Set<HBaseCellId> writeSet = tx3.getWriteSet();
654         assertEquals(1, writeSet.size());
655         List<HBaseCellId> newWriteSet = new ArrayList<>();
656         final AtomicBoolean flushFailing = new AtomicBoolean(true);
657         for (HBaseCellId id : writeSet) {
658             HTableInterface failableHTable = spy(id.getTable());
659             doAnswer(new Answer<Void>() {
660                 @Override
661                 public Void answer(InvocationOnMock invocation)
662                         throws Throwable {
663                     if (flushFailing.get()) {
664                         throw new RetriesExhaustedWithDetailsException(new ArrayList<Throwable>(),
665                                                                        new ArrayList<Row>(), new ArrayList<String>());
666                     } else {
667                         invocation.callRealMethod();
668                     }
669                     return null;
670                 }
671             }).when(failableHTable).flushCommits();
672 
673             newWriteSet.add(new HBaseCellId(failableHTable,
674                                             id.getRow(), id.getFamily(),
675                                             id.getQualifier(), id.getTimestamp()));
676         }
677         writeSet.clear();
678         writeSet.addAll(newWriteSet);
679 
680         try {
681             tm.commit(tx3);
682             fail("Shouldn't succeed");
683         } catch (TransactionException tme) {
684             flushFailing.set(false);
685             tm.rollback(tx3);
686         }
687 
688         // create second hfile,
689         // it should contain multiple deletes
690         manualFlush(TEST_TABLE);
691 
692         // create loads of files
693         byte[] anotherRow = "someotherrow".getBytes();
694         HBaseTransaction tx4 = (HBaseTransaction) tm.begin();
695         Put put4 = new Put(anotherRow);
696         put4.add(fam, qual, Bytes.toBytes("testWrite-1"));
697         txTable.put(tx4, put4);
698         tm.commit(tx4);
699 
700         // create third hfile
701         manualFlush(TEST_TABLE);
702 
703         // trigger minor compaction and give it time to run
704         setCompactorLWM(tx4.getStartTimestamp(), TEST_TABLE);
705         admin.compact(TEST_TABLE);
706         Thread.sleep(3000);
707 
708         // check if the cell that should be compacted, is compacted
709         assertFalse(CellUtils.hasCell(rowToBeCompactedAway, fam, qual, tx1.getStartTimestamp(),
710                                       new TTableCellGetterAdapter(txTable)),
711                     "Cell should not be be there");
712     }
713 
714     @Test(timeOut = 60000)
715     public void testNonOmidCFIsUntouched() throws Throwable {
716         String TEST_TABLE = "testNonOmidCFIsUntouched";
717         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
718         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
719 
720         admin.disableTable(TEST_TABLE);
721         byte[] nonOmidCF = Bytes.toBytes("nonOmidCF");
722         byte[] nonOmidQual = Bytes.toBytes("nonOmidCol");
723         HColumnDescriptor nonomidfam = new HColumnDescriptor(nonOmidCF);
724         nonomidfam.setMaxVersions(MAX_VERSIONS);
725         admin.addColumn(TEST_TABLE, nonomidfam);
726         admin.enableTable(TEST_TABLE);
727 
728         byte[] rowId = Bytes.toBytes("testRow");
729         Transaction tx = tm.begin();
730         Put put = new Put(rowId);
731         put.add(fam, qual, Bytes.toBytes("testValue"));
732         txTable.put(tx, put);
733 
734         Put nonTxPut = new Put(rowId);
735         nonTxPut.add(nonOmidCF, nonOmidQual, Bytes.toBytes("nonTxVal"));
736         txTable.getHTable().put(nonTxPut);
737         txTable.flushCommits(); // to make sure it left the client
738 
739         Get g = new Get(rowId);
740         Result result = txTable.getHTable().get(g);
741         assertEquals(result.getColumnCells(nonOmidCF, nonOmidQual).size(), 1, "Should be there, precompact");
742         assertEquals(result.getColumnCells(fam, qual).size(), 1, "Should be there, precompact");
743 
744         compactEverything(TEST_TABLE);
745 
746         result = txTable.getHTable().get(g);
747         assertEquals(result.getColumnCells(nonOmidCF, nonOmidQual).size(), 1, "Should be there, postcompact");
748         assertEquals(result.getColumnCells(fam, qual).size(), 0, "Should not be there, postcompact");
749     }
750 
751     // ----------------------------------------------------------------------------------------------------------------
752     // Tests on tombstones and non-transactional Deletes
753     // ----------------------------------------------------------------------------------------------------------------
754 
755     /**
756      * Test that when a major compaction runs, cells that were deleted non-transactionally dissapear
757      */
758     @Test(timeOut = 60_000)
759     public void testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs() throws Throwable {
760         String TEST_TABLE = "testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs";
761         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
762         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
763 
764         HTable table = new HTable(hbaseConf, TEST_TABLE);
765 
766         // Write first a value transactionally
767         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
768         byte[] rowId = Bytes.toBytes("row1");
769         Put p0 = new Put(rowId);
770         p0.add(fam, qual, Bytes.toBytes("testValue-0"));
771         txTable.put(tx0, p0);
772         tm.commit(tx0);
773 
774         // Then perform a non-transactional Delete
775         Delete d = new Delete(rowId);
776         d.deleteColumn(fam, qual);
777         table.delete(d);
778 
779         // Trigger a major compaction
780         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
781         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
782 
783         // Then perform a non-tx (raw) scan...
784         Scan scan = new Scan();
785         scan.setRaw(true);
786         ResultScanner scannerResults = table.getScanner(scan);
787 
788         // ...and test the deleted cell is not there anymore
789         assertNull(scannerResults.next(), "There should be no results in scan results");
790 
791         table.close();
792 
793     }
794 
795     /**
796      * Test that when a minor compaction runs, cells that were deleted non-transactionally are preserved. This is to
797      * allow users still access the cells when doing "improper" operations on a transactional table
798      */
799     @Test(timeOut = 60_000)
800     public void testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs() throws Throwable {
801         String TEST_TABLE = "testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs";
802         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
803         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
804 
805         HTable table = new HTable(hbaseConf, TEST_TABLE);
806 
807         // Configure the environment to create a minor compaction
808 
809         // Write first a value transactionally
810         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
811         byte[] rowId = Bytes.toBytes("row1");
812         Put p0 = new Put(rowId);
813         p0.add(fam, qual, Bytes.toBytes("testValue-0"));
814         txTable.put(tx0, p0);
815         tm.commit(tx0);
816 
817         // create the first hfile
818         manualFlush(TEST_TABLE);
819 
820         // Write another value transactionally
821         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
822         Put p1 = new Put(rowId);
823         p1.add(fam, qual, Bytes.toBytes("testValue-1"));
824         txTable.put(tx1, p1);
825         tm.commit(tx1);
826 
827         // create the second hfile
828         manualFlush(TEST_TABLE);
829 
830         // Write yet another value transactionally
831         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
832         Put p2 = new Put(rowId);
833         p2.add(fam, qual, Bytes.toBytes("testValue-2"));
834         txTable.put(tx2, p2);
835         tm.commit(tx2);
836 
837         // create a third hfile
838         manualFlush(TEST_TABLE);
839 
840         // Then perform a non-transactional Delete
841         Delete d = new Delete(rowId);
842         d.deleteColumn(fam, qual);
843         table.delete(d);
844 
845         // create the fourth hfile
846         manualFlush(TEST_TABLE);
847 
848         // Trigger the minor compaction
849         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
850         setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
851         admin.compact(TEST_TABLE);
852         Thread.sleep(5000);
853 
854         // Then perform a non-tx (raw) scan...
855         Scan scan = new Scan();
856         scan.setRaw(true);
857         ResultScanner scannerResults = table.getScanner(scan);
858 
859         // ...and test the deleted cell is still there
860         int count = 0;
861         Result scanResult;
862         List<Cell> listOfCellsScanned = new ArrayList<>();
863         while ((scanResult = scannerResults.next()) != null) {
864             listOfCellsScanned = scanResult.listCells(); // equivalent to rawCells()
865             count++;
866         }
867         assertEquals(count, 1, "There should be only one result in scan results");
868         assertEquals(listOfCellsScanned.size(), 3, "There should be 3 cell entries in scan results (2 puts, 1 del)");
869         boolean wasDeletedCellFound = false;
870         int numberOfDeletedCellsFound = 0;
871         for (Cell cell : listOfCellsScanned) {
872             if (CellUtil.isDelete(cell)) {
873                 wasDeletedCellFound = true;
874                 numberOfDeletedCellsFound++;
875             }
876         }
877         assertTrue(wasDeletedCellFound, "We should have found a non-transactionally deleted cell");
878         assertEquals(numberOfDeletedCellsFound, 1, "There should be only only one deleted cell");
879 
880         table.close();
881     }
882 
883     /**
884      * Test that when a minor compaction runs, tombstones are not cleaned up
885      */
886     @Test(timeOut = 60_000)
887     public void testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs() throws Throwable {
888         String TEST_TABLE = "testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs";
889         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
890         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
891 
892         // Configure the environment to create a minor compaction
893 
894         HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
895         byte[] rowId = Bytes.toBytes("case1");
896         Put p = new Put(rowId);
897         p.add(fam, qual, Bytes.toBytes("testValue-0"));
898         txTable.put(tx0, p);
899         tm.commit(tx0);
900 
901         // create the first hfile
902         manualFlush(TEST_TABLE);
903 
904         // Create the tombstone
905         HBaseTransaction deleteTx = (HBaseTransaction) tm.begin();
906         Delete d = new Delete(rowId);
907         d.deleteColumn(fam, qual);
908         txTable.delete(deleteTx, d);
909         tm.commit(deleteTx);
910 
911         // create the second hfile
912         manualFlush(TEST_TABLE);
913 
914         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
915         Put p1 = new Put(rowId);
916         p1.add(fam, qual, Bytes.toBytes("testValue-11"));
917         txTable.put(tx1, p1);
918         tm.commit(tx1);
919 
920         // create the third hfile
921         manualFlush(TEST_TABLE);
922 
923         HBaseTransaction lastTx = (HBaseTransaction) tm.begin();
924         Put p2 = new Put(rowId);
925         p2.add(fam, qual, Bytes.toBytes("testValue-222"));
926         txTable.put(lastTx, p2);
927         tm.commit(lastTx);
928 
929         // Trigger the minor compaction
930         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
931         setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
932         admin.compact(TEST_TABLE);
933         Thread.sleep(5000);
934 
935         // Checks on results after compaction
936         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
937         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx0.getStartTimestamp(), getter), "Put cell should be there");
938         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx0.getStartTimestamp(), getter),
939                     "Put shadow cell should be there");
940         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter), "Put cell should be there");
941         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
942                    "Put shadow cell should be there");
943         assertTrue(CellUtils.hasCell(rowId, fam, qual, deleteTx.getStartTimestamp(), getter),
944                    "Delete cell should be there");
945         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, deleteTx.getStartTimestamp(), getter),
946                    "Delete shadow cell should be there");
947         assertTrue(CellUtils.hasCell(rowId, fam, qual, lastTx.getStartTimestamp(), getter),
948                    "Put cell should be there");
949         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, lastTx.getStartTimestamp(), getter),
950                    "Put shadow cell should be there");
951     }
952 
953 
954     /**
955      * Test that when compaction runs, tombstones are cleaned up case1: 1 put (ts < lwm) then tombstone (ts > lwm)
956      */
957     @Test(timeOut = 60_000)
958     public void testTombstonesAreCleanedUpCase1() throws Exception {
959         String TEST_TABLE = "testTombstonesAreCleanedUpCase1";
960         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
961         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
962 
963         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
964         byte[] rowId = Bytes.toBytes("case1");
965         Put p = new Put(rowId);
966         p.add(fam, qual, Bytes.toBytes("testValue"));
967         txTable.put(tx1, p);
968         tm.commit(tx1);
969 
970         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
971         setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
972 
973         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
974         Delete d = new Delete(rowId);
975         d.deleteColumn(fam, qual);
976         txTable.delete(tx2, d);
977         tm.commit(tx2);
978 
979         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
980         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
981                    "Put cell should be there");
982         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
983                    "Put shadow cell should be there");
984         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
985                    "Delete cell should be there");
986         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
987                    "Delete shadow cell should be there");
988     }
989 
990     /**
991      * Test that when compaction runs, tombstones are cleaned up case2: 1 put (ts < lwm) then tombstone (ts < lwm)
992      */
993     @Test(timeOut = 60_000)
994     public void testTombstonesAreCleanedUpCase2() throws Exception {
995         String TEST_TABLE = "testTombstonesAreCleanedUpCase2";
996         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
997         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
998 
999         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1000         byte[] rowId = Bytes.toBytes("case2");
1001         Put p = new Put(rowId);
1002         p.add(fam, qual, Bytes.toBytes("testValue"));
1003         txTable.put(tx1, p);
1004         tm.commit(tx1);
1005 
1006         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1007         Delete d = new Delete(rowId);
1008         d.deleteColumn(fam, qual);
1009         txTable.delete(tx2, d);
1010         tm.commit(tx2);
1011 
1012         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1013         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1014 
1015         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1016         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1017                     "Put cell shouldn't be there");
1018         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1019                     "Put shadow cell shouldn't be there");
1020         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1021                     "Delete cell shouldn't be there");
1022         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1023                     "Delete shadow cell shouldn't be there");
1024     }
1025 
1026     /**
1027      * Test that when compaction runs, tombstones are cleaned up case3: 1 put (ts < lwm) then tombstone (ts < lwm) not
1028      * committed
1029      */
1030     @Test(timeOut = 60_000)
1031     public void testTombstonesAreCleanedUpCase3() throws Exception {
1032         String TEST_TABLE = "testTombstonesAreCleanedUpCase3";
1033         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1034         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
1035 
1036         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1037         byte[] rowId = Bytes.toBytes("case3");
1038         Put p = new Put(rowId);
1039         p.add(fam, qual, Bytes.toBytes("testValue"));
1040         txTable.put(tx1, p);
1041         tm.commit(tx1);
1042 
1043         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1044         Delete d = new Delete(rowId);
1045         d.deleteColumn(fam, qual);
1046         txTable.delete(tx2, d);
1047 
1048         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1049         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1050 
1051         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1052         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1053                    "Put cell should be there");
1054         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1055                    "Put shadow cell shouldn't be there");
1056         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1057                     "Delete cell shouldn't be there");
1058         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1059                     "Delete shadow cell shouldn't be there");
1060     }
1061 
1062     /**
1063      * Test that when compaction runs, tombstones are cleaned up case4: 1 put (ts < lwm) then tombstone (ts > lwm) not
1064      * committed
1065      */
1066     @Test(timeOut = 60_000)
1067     public void testTombstonesAreCleanedUpCase4() throws Exception {
1068         String TEST_TABLE = "testTombstonesAreCleanedUpCase4";
1069         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1070         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
1071 
1072         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1073         byte[] rowId = Bytes.toBytes("case4");
1074         Put p = new Put(rowId);
1075         p.add(fam, qual, Bytes.toBytes("testValue"));
1076         txTable.put(tx1, p);
1077         tm.commit(tx1);
1078 
1079         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1080 
1081         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1082         Delete d = new Delete(rowId);
1083         d.deleteColumn(fam, qual);
1084         txTable.delete(tx2, d);
1085         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1086 
1087         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1088         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1089                    "Put cell should be there");
1090         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1091                    "Put shadow cell shouldn't be there");
1092         assertTrue(CellUtils.hasCell(rowId, fam, qual,tx2.getStartTimestamp(), getter),
1093                    "Delete cell should be there");
1094         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1095                     "Delete shadow cell shouldn't be there");
1096     }
1097 
1098     /**
1099      * Test that when compaction runs, tombstones are cleaned up case5: tombstone (ts < lwm)
1100      */
1101     @Test(timeOut = 60_000)
1102     public void testTombstonesAreCleanedUpCase5() throws Exception {
1103         String TEST_TABLE = "testTombstonesAreCleanedUpCase5";
1104         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1105         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
1106 
1107         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1108         byte[] rowId = Bytes.toBytes("case5");
1109         Delete d = new Delete(rowId);
1110         d.deleteColumn(fam, qual);
1111         txTable.delete(tx1, d);
1112         tm.commit(tx1);
1113 
1114         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1115         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1116 
1117         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1118         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1119                     "Delete cell shouldn't be there");
1120         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1121                     "Delete shadow cell shouldn't be there");
1122     }
1123 
1124     /**
1125      * Test that when compaction runs, tombstones are cleaned up case6: tombstone (ts < lwm), then put (ts < lwm)
1126      */
1127     @Test(timeOut = 60_000)
1128     public void testTombstonesAreCleanedUpCase6() throws Exception {
1129         String TEST_TABLE = "testTombstonesAreCleanedUpCase6";
1130         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
1131         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
1132         byte[] rowId = Bytes.toBytes("case6");
1133 
1134         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
1135         Delete d = new Delete(rowId);
1136         d.deleteColumn(fam, qual);
1137         txTable.delete(tx1, d);
1138         tm.commit(tx1);
1139 
1140         HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
1141         Put p = new Put(rowId);
1142         p.add(fam, qual, Bytes.toBytes("testValue"));
1143         txTable.put(tx2, p);
1144         tm.commit(tx2);
1145 
1146         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
1147         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
1148 
1149         TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
1150         assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1151                     "Delete cell shouldn't be there");
1152         assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
1153                     "Delete shadow cell shouldn't be there");
1154         assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1155                    "Put cell should be there");
1156         assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
1157                    "Put shadow cell shouldn't be there");
1158     }
1159 
1160     private void setCompactorLWM(long lwm, String tableName) throws Exception {
1161         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
1162                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
1163         CommitTable commitTable = injector.getInstance(CommitTable.class);
1164         CommitTable.Client commitTableClient = spy(commitTable.getClient());
1165         SettableFuture<Long> f = SettableFuture.create();
1166         f.set(lwm);
1167         doReturn(f).when(commitTableClient).readLowWatermark();
1168         omidCompactor.commitTableClientQueue.add(commitTableClient);
1169     }
1170 
1171     private void compactEverything(String tableName) throws Exception {
1172         compactWithLWM(Long.MAX_VALUE, tableName);
1173     }
1174 
1175     private void compactWithLWM(long lwm, String tableName) throws Exception {
1176         admin.flush(tableName);
1177 
1178         LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
1179         setCompactorLWM(lwm, tableName);
1180         LOG.info("Compacting table {}", tableName);
1181         admin.majorCompact(tableName);
1182 
1183         LOG.info("Sleeping for 3 secs");
1184         Thread.sleep(3000);
1185         LOG.info("Waking up after 3 secs");
1186     }
1187 
1188     private long rowCount(String tableName, byte[] family) throws Throwable {
1189         Scan scan = new Scan();
1190         scan.addFamily(family);
1191         return aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
1192     }
1193 
1194 }