View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable.hbase;
19  
20  import static org.testng.Assert.assertEquals;
21  import static org.testng.Assert.assertFalse;
22  import static org.testng.Assert.assertTrue;
23  
24  import java.util.concurrent.Future;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.HBaseConfiguration;
28  import org.apache.hadoop.hbase.HBaseTestingUtility;
29  import org.apache.hadoop.hbase.HColumnDescriptor;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.MiniHBaseCluster;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.client.Admin;
34  import org.apache.hadoop.hbase.client.Connection;
35  import org.apache.hadoop.hbase.client.ConnectionFactory;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.omid.committable.CommitTable;
40  import org.apache.omid.committable.CommitTable.Client;
41  import org.apache.omid.committable.CommitTable.CommitTimestamp;
42  import org.apache.omid.committable.CommitTable.Writer;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  import org.testng.annotations.AfterClass;
46  import org.testng.annotations.AfterMethod;
47  import org.testng.annotations.BeforeClass;
48  import org.testng.annotations.BeforeMethod;
49  import org.testng.annotations.Test;
50  
51  import com.google.common.base.Optional;
52  import com.google.common.util.concurrent.ListenableFuture;
53  
54  public class TestHBaseCommitTable {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(TestHBaseCommitTable.class);
57  
58      private static final String TEST_TABLE = "TEST";
59  
60      private static final TableName TABLE_NAME = TableName.valueOf(TEST_TABLE);
61  
62      private static HBaseTestingUtility testutil;
63      private static MiniHBaseCluster hbasecluster;
64      protected static Configuration hbaseConf;
65      protected static Connection connection;
66      private byte[] commitTableFamily;
67      private byte[] lowWatermarkFamily;
68  
69  
70      @BeforeClass
71      public void setUpClass() throws Exception {
72          // HBase setup
73          hbaseConf = HBaseConfiguration.create();
74          hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
75          DefaultHBaseCommitTableStorageModule module = new DefaultHBaseCommitTableStorageModule();
76          commitTableFamily = module.getFamilyName().getBytes();
77          lowWatermarkFamily = module.getLowWatermarkFamily().getBytes();
78          LOG.info("Create hbase");
79          testutil = new HBaseTestingUtility(hbaseConf);
80          hbasecluster = testutil.startMiniCluster(1);
81          connection = ConnectionFactory.createConnection(hbaseConf);
82      }
83  
84      @AfterClass
85      public void tearDownClass() throws Exception {
86          if (hbasecluster != null) {
87              testutil.shutdownMiniCluster();
88          }
89      }
90  
91      @BeforeMethod
92      public void setUp() throws Exception {
93          Admin admin = testutil.getHBaseAdmin();
94  
95          if (!admin.tableExists(TableName.valueOf(TEST_TABLE))) {
96              HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
97  
98              HColumnDescriptor datafam = new HColumnDescriptor(commitTableFamily);
99              datafam.setMaxVersions(Integer.MAX_VALUE);
100             desc.addFamily(datafam);
101 
102             HColumnDescriptor lowWatermarkFam = new HColumnDescriptor(lowWatermarkFamily);
103             lowWatermarkFam.setMaxVersions(Integer.MAX_VALUE);
104             desc.addFamily(lowWatermarkFam);
105 
106             // Move to HBaseSims for 2.0 support
107             // For 2.0, use TableDescriptorBuilder to build TableDescriptor
108             admin.createTable(desc);
109         }
110 
111         if (admin.isTableDisabled(TableName.valueOf(TEST_TABLE))) {
112             admin.enableTable(TableName.valueOf(TEST_TABLE));
113         }
114         HTableDescriptor[] tables = admin.listTables();
115         for (HTableDescriptor t : tables) {
116             LOG.info(t.getNameAsString());
117         }
118     }
119 
120     @AfterMethod
121     public void tearDown() {
122         try {
123             LOG.info("tearing Down");
124             Admin admin = testutil.getHBaseAdmin();
125             admin.disableTable(TableName.valueOf(TEST_TABLE));
126             admin.deleteTable(TableName.valueOf(TEST_TABLE));
127 
128         } catch (Exception e) {
129             LOG.error("Error tearing down", e);
130         }
131     }
132 
133     @Test(timeOut = 30_000)
134     public void testBasicBehaviour() throws Throwable {
135         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
136         config.setTableName(TEST_TABLE);
137         HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
138 
139         Writer writer = commitTable.getWriter();
140         Client client = commitTable.getClient();
141 
142         // Test that the first time the table is empty
143         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
144 
145         // Test the successful creation of 1000 txs in the table
146         for (int i = 0; i < 1000; i+=CommitTable.MAX_CHECKPOINTS_PER_TXN) {
147             writer.addCommittedTransaction(i, i + 1);
148         }
149         writer.flush();
150         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000/CommitTable.MAX_CHECKPOINTS_PER_TXN, "Rows should be 1000!");
151 
152         // Test the we get the right commit timestamps for each previously inserted tx
153         for (long i = 0; i < 1000; i++) {
154             Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(i).get();
155             assertTrue(commitTimestamp.isPresent());
156             assertTrue(commitTimestamp.get().isValid());
157             long ct = commitTimestamp.get().getValue();
158             long expected = i - (i % CommitTable.MAX_CHECKPOINTS_PER_TXN) + 1;
159             assertEquals(ct, expected, "Commit timestamp should be " + expected);
160         }
161         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000/CommitTable.MAX_CHECKPOINTS_PER_TXN, "Rows should be 1000!");
162 
163         // Test the successful deletion of the 1000 txs
164         Future<Void> f;
165         for (long i = 0; i < 1000; i+=CommitTable.MAX_CHECKPOINTS_PER_TXN) {
166             f = client.deleteCommitEntry(i);
167             f.get();
168         }
169         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
170 
171         // Test we don't get a commit timestamp for a non-existent transaction id in the table
172         Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(0).get();
173         assertFalse(commitTimestamp.isPresent(), "Commit timestamp should not be present");
174 
175         // Test that the first time, the low watermark family in table is empty
176         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 0, "Rows should be 0!");
177 
178         // Test the unsuccessful read of the low watermark the first time
179         ListenableFuture<Long> lowWatermarkFuture = client.readLowWatermark();
180         assertEquals(lowWatermarkFuture.get(), Long.valueOf(0), "Low watermark should be 0");
181 
182         // Test the successful update of the low watermark
183         for (int lowWatermark = 0; lowWatermark < 1000; lowWatermark++) {
184             writer.updateLowWatermark(lowWatermark);
185         }
186         writer.flush();
187         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
188 
189         // Test the successful read of the low watermark
190         lowWatermarkFuture = client.readLowWatermark();
191         long lowWatermark = lowWatermarkFuture.get();
192         assertEquals(lowWatermark, 999, "Low watermark should be 999");
193         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
194 
195     }
196 
197 
198     @Test(timeOut = 30_000)
199     public void testCheckpoints() throws Throwable {
200         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
201         config.setTableName(TEST_TABLE);
202         HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
203 
204         Writer writer = commitTable.getWriter();
205         Client client = commitTable.getClient();
206 
207         // Test that the first time the table is empty
208         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
209 
210         long st = 0;
211         long ct = 1;
212 
213         // Add a single commit that may represent many checkpoints
214         writer.addCommittedTransaction(st, ct);
215         writer.flush();
216 
217         for (int i=0; i<CommitTable.MAX_CHECKPOINTS_PER_TXN;++i) {
218             Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(i).get();
219             assertTrue(commitTimestamp.isPresent());
220             assertTrue(commitTimestamp.get().isValid());
221             assertEquals(ct, commitTimestamp.get().getValue());
222         }
223 
224         // try invalidate based on start timestamp from a checkpoint
225         assertFalse(client.tryInvalidateTransaction(st + 1).get());
226 
227         long st2 = 100;
228         long ct2 = 101;
229 
230         // now invalidate a not committed transaction and then commit
231         assertTrue(client.tryInvalidateTransaction(st2 + 1).get());
232         assertFalse(writer.atomicAddCommittedTransaction(st2, ct2));
233 
234         //test delete
235         client.deleteCommitEntry(st2 + 1).get();
236         //now committing should work
237         assertTrue(writer.atomicAddCommittedTransaction(st2, ct2));
238     }
239 
240 
241 
242     @Test(timeOut = 30_000)
243     public void testTransactionInvalidation() throws Throwable {
244 
245         // Prepare test
246         final int TX1_ST = 0;
247         final int TX1_CT = TX1_ST + 1;
248         final int TX2_ST = 0 + CommitTable.MAX_CHECKPOINTS_PER_TXN;
249         final int TX2_CT = TX2_ST + 1;
250 
251         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
252         config.setTableName(TEST_TABLE);
253         HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
254 
255         // Components under test
256         Writer writer = commitTable.getWriter();
257         Client client = commitTable.getClient();
258 
259         // Test that initially the table is empty
260         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
261 
262         // Test that a transaction can be added properly to the commit table
263         writer.addCommittedTransaction(TX1_ST, TX1_CT);
264         writer.flush();
265         Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
266         assertTrue(commitTimestamp.isPresent());
267         assertTrue(commitTimestamp.get().isValid());
268         long ct = commitTimestamp.get().getValue();
269         assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
270 
271         // Test that a committed transaction cannot be invalidated and
272         // preserves its commit timestamp after that
273         boolean wasInvalidated = client.tryInvalidateTransaction(TX1_ST).get();
274         assertFalse(wasInvalidated, "Transaction should not be invalidated");
275 
276         commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
277         assertTrue(commitTimestamp.isPresent());
278         assertTrue(commitTimestamp.get().isValid());
279         ct = commitTimestamp.get().getValue();
280         assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
281 
282         // Test that a non-committed transaction can be invalidated...
283         wasInvalidated = client.tryInvalidateTransaction(TX2_ST).get();
284         assertTrue(wasInvalidated, "Transaction should be invalidated");
285         commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
286         assertTrue(commitTimestamp.isPresent());
287         assertFalse(commitTimestamp.get().isValid());
288         ct = commitTimestamp.get().getValue();
289         assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
290                      "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
291         // ...and that if it has been already invalidated, it remains
292         // invalidated when someone tries to commit it
293         writer.addCommittedTransaction(TX2_ST, TX2_CT);
294         writer.flush();
295         commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
296         assertTrue(commitTimestamp.isPresent());
297         assertFalse(commitTimestamp.get().isValid());
298         ct = commitTimestamp.get().getValue();
299         assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
300                      "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
301 
302         // Test that at the end of the test, the commit table contains 2
303         // elements, which correspond to the two rows added in the test
304         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 2, "Rows should be 2!");
305 
306     }
307 
308     private static long rowCount(TableName tableName, byte[] family) throws Throwable {
309         Scan scan = new Scan();
310         scan.addFamily(family);
311         Table table = connection.getTable(tableName);
312         try (ResultScanner scanner = table.getScanner(scan)) {
313             int count = 0;
314             while (scanner.next() != null) {
315                 count++;
316             }
317             return count;
318         }
319     }
320 
321 }