View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable.hbase;
19  
20  import static org.testng.Assert.assertEquals;
21  import static org.testng.Assert.assertFalse;
22  import static org.testng.Assert.assertTrue;
23  
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.Future;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HTableDescriptor;
32  import org.apache.hadoop.hbase.MiniHBaseCluster;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.client.Admin;
35  import org.apache.hadoop.hbase.client.Connection;
36  import org.apache.hadoop.hbase.client.ConnectionFactory;
37  import org.apache.hadoop.hbase.client.ResultScanner;
38  import org.apache.hadoop.hbase.client.Scan;
39  import org.apache.hadoop.hbase.client.Table;
40  import org.apache.omid.committable.CommitTable;
41  import org.apache.omid.committable.CommitTable.Client;
42  import org.apache.omid.committable.CommitTable.CommitTimestamp;
43  import org.apache.omid.committable.CommitTable.Writer;
44  import org.apache.omid.committable.hbase.HBaseCommitTable.HBaseClient;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  import org.testng.Assert;
48  import org.testng.annotations.AfterClass;
49  import org.testng.annotations.AfterMethod;
50  import org.testng.annotations.BeforeClass;
51  import org.testng.annotations.BeforeMethod;
52  import org.testng.annotations.Test;
53  
54  import com.google.common.base.Optional;
55  import com.google.common.util.concurrent.ListenableFuture;
56  
57  public class TestHBaseCommitTable {
58  
59      private static final Logger LOG = LoggerFactory.getLogger(TestHBaseCommitTable.class);
60  
61      private static final String TEST_TABLE = "TEST";
62  
63      private static final TableName TABLE_NAME = TableName.valueOf(TEST_TABLE);
64  
65      private static HBaseTestingUtility testutil;
66      private static MiniHBaseCluster hbasecluster;
67      protected static Configuration hbaseConf;
68      protected static Connection connection;
69      private byte[] commitTableFamily;
70      private byte[] lowWatermarkFamily;
71  
72  
73      @BeforeClass
74      public void setUpClass() throws Exception {
75          // HBase setup
76          hbaseConf = HBaseConfiguration.create();
77          hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
78          DefaultHBaseCommitTableStorageModule module = new DefaultHBaseCommitTableStorageModule();
79          commitTableFamily = module.getFamilyName().getBytes();
80          lowWatermarkFamily = module.getLowWatermarkFamily().getBytes();
81          LOG.info("Create hbase");
82          testutil = new HBaseTestingUtility(hbaseConf);
83          hbasecluster = testutil.startMiniCluster(1);
84          connection = ConnectionFactory.createConnection(hbaseConf);
85      }
86  
87      @AfterClass
88      public void tearDownClass() throws Exception {
89          if (hbasecluster != null) {
90              testutil.shutdownMiniCluster();
91          }
92      }
93  
94      @BeforeMethod
95      public void setUp() throws Exception {
96          Admin admin = testutil.getHBaseAdmin();
97  
98          if (!admin.tableExists(TableName.valueOf(TEST_TABLE))) {
99              HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
100 
101             HColumnDescriptor datafam = new HColumnDescriptor(commitTableFamily);
102             datafam.setMaxVersions(Integer.MAX_VALUE);
103             desc.addFamily(datafam);
104 
105             HColumnDescriptor lowWatermarkFam = new HColumnDescriptor(lowWatermarkFamily);
106             lowWatermarkFam.setMaxVersions(Integer.MAX_VALUE);
107             desc.addFamily(lowWatermarkFam);
108 
109             // Move to HBaseSims for 2.0 support
110             // For 2.0, use TableDescriptorBuilder to build TableDescriptor
111             admin.createTable(desc);
112         }
113 
114         if (admin.isTableDisabled(TableName.valueOf(TEST_TABLE))) {
115             admin.enableTable(TableName.valueOf(TEST_TABLE));
116         }
117         HTableDescriptor[] tables = admin.listTables();
118         for (HTableDescriptor t : tables) {
119             LOG.info(t.getNameAsString());
120         }
121     }
122 
123     @AfterMethod
124     public void tearDown() {
125         try {
126             LOG.info("tearing Down");
127             Admin admin = testutil.getHBaseAdmin();
128             admin.disableTable(TableName.valueOf(TEST_TABLE));
129             admin.deleteTable(TableName.valueOf(TEST_TABLE));
130 
131         } catch (Exception e) {
132             LOG.error("Error tearing down", e);
133         }
134     }
135 
136     @Test(timeOut = 30_000)
137     public void testBasicBehaviour() throws Throwable {
138         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
139         config.setTableName(TEST_TABLE);
140         HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
141 
142         Writer writer = commitTable.getWriter();
143         Client client = commitTable.getClient();
144 
145         // Test that the first time the table is empty
146         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
147 
148         // Test the successful creation of 1000 txs in the table
149         for (int i = 0; i < 1000; i++) {
150             writer.addCommittedTransaction(i, i + 1);
151         }
152         writer.flush();
153         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000, "Rows should be 1000!");
154 
155         // Test the we get the right commit timestamps for each previously inserted tx
156         for (long i = 0; i < 1000; i++) {
157             Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(i).get();
158             assertTrue(commitTimestamp.isPresent());
159             assertTrue(commitTimestamp.get().isValid());
160             long ct = commitTimestamp.get().getValue();
161             assertEquals(ct, (i + 1), "Commit timestamp should be " + (i + 1));
162         }
163         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000, "Rows should be 1000!");
164 
165         // Test the successful deletion of the 1000 txs
166         Future<Void> f;
167         for (long i = 0; i < 1000; i++) {
168             f = client.completeTransaction(i);
169             f.get();
170         }
171         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
172 
173         // Test we don't get a commit timestamp for a non-existent transaction id in the table
174         Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(0).get();
175         assertFalse(commitTimestamp.isPresent(), "Commit timestamp should not be present");
176 
177         // Test that the first time, the low watermark family in table is empty
178         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 0, "Rows should be 0!");
179 
180         // Test the unsuccessful read of the low watermark the first time
181         ListenableFuture<Long> lowWatermarkFuture = client.readLowWatermark();
182         assertEquals(lowWatermarkFuture.get(), Long.valueOf(0), "Low watermark should be 0");
183 
184         // Test the successful update of the low watermark
185         for (int lowWatermark = 0; lowWatermark < 1000; lowWatermark++) {
186             writer.updateLowWatermark(lowWatermark);
187         }
188         writer.flush();
189         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
190 
191         // Test the successful read of the low watermark
192         lowWatermarkFuture = client.readLowWatermark();
193         long lowWatermark = lowWatermarkFuture.get();
194         assertEquals(lowWatermark, 999, "Low watermark should be 999");
195         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
196 
197     }
198 
199     @Test(timeOut = 30_000)
200     public void testTransactionInvalidation() throws Throwable {
201 
202         // Prepare test
203         final int TX1_ST = 1;
204         final int TX1_CT = 2;
205         final int TX2_ST = 11;
206         final int TX2_CT = 12;
207 
208         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
209         config.setTableName(TEST_TABLE);
210         HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
211 
212         // Components under test
213         Writer writer = commitTable.getWriter();
214         Client client = commitTable.getClient();
215 
216         // Test that initially the table is empty
217         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
218 
219         // Test that a transaction can be added properly to the commit table
220         writer.addCommittedTransaction(TX1_ST, TX1_CT);
221         writer.flush();
222         Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
223         assertTrue(commitTimestamp.isPresent());
224         assertTrue(commitTimestamp.get().isValid());
225         long ct = commitTimestamp.get().getValue();
226         assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
227 
228         // Test that a committed transaction cannot be invalidated and
229         // preserves its commit timestamp after that
230         boolean wasInvalidated = client.tryInvalidateTransaction(TX1_ST).get();
231         assertFalse(wasInvalidated, "Transaction should not be invalidated");
232 
233         commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
234         assertTrue(commitTimestamp.isPresent());
235         assertTrue(commitTimestamp.get().isValid());
236         ct = commitTimestamp.get().getValue();
237         assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
238 
239         // Test that a non-committed transaction can be invalidated...
240         wasInvalidated = client.tryInvalidateTransaction(TX2_ST).get();
241         assertTrue(wasInvalidated, "Transaction should be invalidated");
242         commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
243         assertTrue(commitTimestamp.isPresent());
244         assertFalse(commitTimestamp.get().isValid());
245         ct = commitTimestamp.get().getValue();
246         assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
247                      "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
248         // ...and that if it has been already invalidated, it remains
249         // invalidated when someone tries to commit it
250         writer.addCommittedTransaction(TX2_ST, TX2_CT);
251         writer.flush();
252         commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
253         assertTrue(commitTimestamp.isPresent());
254         assertFalse(commitTimestamp.get().isValid());
255         ct = commitTimestamp.get().getValue();
256         assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
257                      "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
258 
259         // Test that at the end of the test, the commit table contains 2
260         // elements, which correspond to the two rows added in the test
261         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 2, "Rows should be 2!");
262 
263     }
264 
265     @Test(timeOut = 30_000)
266     public void testClosingClientEmptyQueuesProperly() throws Throwable {
267         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
268         config.setTableName(TEST_TABLE);
269         HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
270 
271         Writer writer = commitTable.getWriter();
272         HBaseCommitTable.HBaseClient client = (HBaseClient) commitTable.getClient();
273 
274         for (int i = 0; i < 1000; i++) {
275             writer.addCommittedTransaction(i, i + 1);
276         }
277         writer.flush();
278 
279         // Completing first transaction should be fine
280         client.completeTransaction(0).get();
281         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
282 
283         // When closing, removing a transaction should throw an EE with an IOException
284         client.close();
285         try {
286             client.completeTransaction(1).get();
287             Assert.fail();
288         } catch (ExecutionException e) {
289             // Expected
290         }
291         assertEquals(client.deleteQueue.size(), 0, "Delete queue size should be 0!");
292         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
293 
294     }
295 
296     private static long rowCount(TableName tableName, byte[] family) throws Throwable {
297         Scan scan = new Scan();
298         scan.addFamily(family);
299         Table table = connection.getTable(tableName);
300         try (ResultScanner scanner = table.getScanner(scan)) {
301             int count = 0;
302             while (scanner.next() != null) {
303                 count++;
304             }
305             return count;
306         }
307     }
308 
309 }