View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable.hbase;
19  
20  import com.google.common.base.Optional;
21  import com.google.common.util.concurrent.ListenableFuture;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.hadoop.hbase.HBaseConfiguration;
24  import org.apache.hadoop.hbase.HBaseTestingUtility;
25  import org.apache.hadoop.hbase.HColumnDescriptor;
26  import org.apache.hadoop.hbase.HTableDescriptor;
27  import org.apache.hadoop.hbase.MiniHBaseCluster;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.hbase.client.HBaseAdmin;
30  import org.apache.hadoop.hbase.client.Scan;
31  import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
32  import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
33  import org.apache.omid.committable.CommitTable;
34  import org.apache.omid.committable.CommitTable.Client;
35  import org.apache.omid.committable.CommitTable.CommitTimestamp;
36  import org.apache.omid.committable.CommitTable.Writer;
37  import org.apache.omid.committable.hbase.HBaseCommitTable.HBaseClient;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  import org.testng.Assert;
41  import org.testng.annotations.AfterClass;
42  import org.testng.annotations.AfterMethod;
43  import org.testng.annotations.BeforeClass;
44  import org.testng.annotations.BeforeMethod;
45  import org.testng.annotations.Test;
46  
47  import java.util.concurrent.ExecutionException;
48  import java.util.concurrent.Future;
49  
50  import static org.testng.Assert.assertEquals;
51  import static org.testng.Assert.assertFalse;
52  import static org.testng.Assert.assertTrue;
53  
54  public class TestHBaseCommitTable {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(TestHBaseCommitTable.class);
57  
58      private static final String TEST_TABLE = "TEST";
59  
60      private static final TableName TABLE_NAME = TableName.valueOf(TEST_TABLE);
61  
62      private static HBaseTestingUtility testutil;
63      private static MiniHBaseCluster hbasecluster;
64      protected static Configuration hbaseConf;
65      private static AggregationClient aggregationClient;
66      private byte[] commitTableFamily;
67      private byte[] lowWatermarkFamily;
68  
69  
70      @BeforeClass
71      public void setUpClass() throws Exception {
72          // HBase setup
73          hbaseConf = HBaseConfiguration.create();
74          DefaultHBaseCommitTableStorageModule module = new DefaultHBaseCommitTableStorageModule();
75          commitTableFamily = module.getFamilyName().getBytes();
76          lowWatermarkFamily = module.getLowWatermarkFamily().getBytes();
77          LOG.info("Create hbase");
78          testutil = new HBaseTestingUtility(hbaseConf);
79          hbasecluster = testutil.startMiniCluster(1);
80          aggregationClient = new AggregationClient(hbaseConf);
81  
82      }
83  
84      @AfterClass
85      public void tearDownClass() throws Exception {
86          if (hbasecluster != null) {
87              testutil.shutdownMiniCluster();
88          }
89      }
90  
91      @BeforeMethod
92      public void setUp() throws Exception {
93          HBaseAdmin admin = testutil.getHBaseAdmin();
94  
95          if (!admin.tableExists(TEST_TABLE)) {
96              HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
97  
98              HColumnDescriptor datafam = new HColumnDescriptor(commitTableFamily);
99              datafam.setMaxVersions(Integer.MAX_VALUE);
100             desc.addFamily(datafam);
101 
102             HColumnDescriptor lowWatermarkFam = new HColumnDescriptor(lowWatermarkFamily);
103             lowWatermarkFam.setMaxVersions(Integer.MAX_VALUE);
104             desc.addFamily(lowWatermarkFam);
105 
106             desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
107             admin.createTable(desc);
108         }
109 
110         if (admin.isTableDisabled(TEST_TABLE)) {
111             admin.enableTable(TEST_TABLE);
112         }
113         HTableDescriptor[] tables = admin.listTables();
114         for (HTableDescriptor t : tables) {
115             LOG.info(t.getNameAsString());
116         }
117     }
118 
119     @AfterMethod
120     public void tearDown() {
121         try {
122             LOG.info("tearing Down");
123             HBaseAdmin admin = testutil.getHBaseAdmin();
124             admin.disableTable(TEST_TABLE);
125             admin.deleteTable(TEST_TABLE);
126 
127         } catch (Exception e) {
128             LOG.error("Error tearing down", e);
129         }
130     }
131 
132     @Test(timeOut = 30_000)
133     public void testBasicBehaviour() throws Throwable {
134         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
135         config.setTableName(TEST_TABLE);
136         HBaseCommitTable commitTable = new HBaseCommitTable(hbaseConf, config);
137 
138         Writer writer = commitTable.getWriter();
139         Client client = commitTable.getClient();
140 
141         // Test that the first time the table is empty
142         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
143 
144         // Test the successful creation of 1000 txs in the table
145         for (int i = 0; i < 1000; i++) {
146             writer.addCommittedTransaction(i, i + 1);
147         }
148         writer.flush();
149         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000, "Rows should be 1000!");
150 
151         // Test the we get the right commit timestamps for each previously inserted tx
152         for (long i = 0; i < 1000; i++) {
153             Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(i).get();
154             assertTrue(commitTimestamp.isPresent());
155             assertTrue(commitTimestamp.get().isValid());
156             long ct = commitTimestamp.get().getValue();
157             assertEquals(ct, (i + 1), "Commit timestamp should be " + (i + 1));
158         }
159         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000, "Rows should be 1000!");
160 
161         // Test the successful deletion of the 1000 txs
162         Future<Void> f;
163         for (long i = 0; i < 1000; i++) {
164             f = client.completeTransaction(i);
165             f.get();
166         }
167         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
168 
169         // Test we don't get a commit timestamp for a non-existent transaction id in the table
170         Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(0).get();
171         assertFalse(commitTimestamp.isPresent(), "Commit timestamp should not be present");
172 
173         // Test that the first time, the low watermark family in table is empty
174         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 0, "Rows should be 0!");
175 
176         // Test the unsuccessful read of the low watermark the first time
177         ListenableFuture<Long> lowWatermarkFuture = client.readLowWatermark();
178         assertEquals(lowWatermarkFuture.get(), Long.valueOf(0), "Low watermark should be 0");
179 
180         // Test the successful update of the low watermark
181         for (int lowWatermark = 0; lowWatermark < 1000; lowWatermark++) {
182             writer.updateLowWatermark(lowWatermark);
183         }
184         writer.flush();
185         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
186 
187         // Test the successful read of the low watermark
188         lowWatermarkFuture = client.readLowWatermark();
189         long lowWatermark = lowWatermarkFuture.get();
190         assertEquals(lowWatermark, 999, "Low watermark should be 999");
191         assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
192 
193     }
194 
195     @Test(timeOut = 30_000)
196     public void testTransactionInvalidation() throws Throwable {
197 
198         // Prepare test
199         final int TX1_ST = 1;
200         final int TX1_CT = 2;
201         final int TX2_ST = 11;
202         final int TX2_CT = 12;
203 
204         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
205         config.setTableName(TEST_TABLE);
206         HBaseCommitTable commitTable = new HBaseCommitTable(hbaseConf, config);
207 
208         // Components under test
209         Writer writer = commitTable.getWriter();
210         Client client = commitTable.getClient();
211 
212         // Test that initially the table is empty
213         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
214 
215         // Test that a transaction can be added properly to the commit table
216         writer.addCommittedTransaction(TX1_ST, TX1_CT);
217         writer.flush();
218         Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
219         assertTrue(commitTimestamp.isPresent());
220         assertTrue(commitTimestamp.get().isValid());
221         long ct = commitTimestamp.get().getValue();
222         assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
223 
224         // Test that a committed transaction cannot be invalidated and
225         // preserves its commit timestamp after that
226         boolean wasInvalidated = client.tryInvalidateTransaction(TX1_ST).get();
227         assertFalse(wasInvalidated, "Transaction should not be invalidated");
228 
229         commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
230         assertTrue(commitTimestamp.isPresent());
231         assertTrue(commitTimestamp.get().isValid());
232         ct = commitTimestamp.get().getValue();
233         assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
234 
235         // Test that a non-committed transaction can be invalidated...
236         wasInvalidated = client.tryInvalidateTransaction(TX2_ST).get();
237         assertTrue(wasInvalidated, "Transaction should be invalidated");
238         commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
239         assertTrue(commitTimestamp.isPresent());
240         assertFalse(commitTimestamp.get().isValid());
241         ct = commitTimestamp.get().getValue();
242         assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
243                      "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
244         // ...and that if it has been already invalidated, it remains
245         // invalidated when someone tries to commit it
246         writer.addCommittedTransaction(TX2_ST, TX2_CT);
247         writer.flush();
248         commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
249         assertTrue(commitTimestamp.isPresent());
250         assertFalse(commitTimestamp.get().isValid());
251         ct = commitTimestamp.get().getValue();
252         assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
253                      "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
254 
255         // Test that at the end of the test, the commit table contains 2
256         // elements, which correspond to the two rows added in the test
257         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 2, "Rows should be 2!");
258 
259     }
260 
261     @Test(timeOut = 30_000)
262     public void testClosingClientEmptyQueuesProperly() throws Throwable {
263         HBaseCommitTableConfig config = new HBaseCommitTableConfig();
264         config.setTableName(TEST_TABLE);
265         HBaseCommitTable commitTable = new HBaseCommitTable(hbaseConf, config);
266 
267         Writer writer = commitTable.getWriter();
268         HBaseCommitTable.HBaseClient client = (HBaseClient) commitTable.getClient();
269 
270         for (int i = 0; i < 1000; i++) {
271             writer.addCommittedTransaction(i, i + 1);
272         }
273         writer.flush();
274 
275         // Completing first transaction should be fine
276         client.completeTransaction(0).get();
277         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
278 
279         // When closing, removing a transaction should throw an EE with an IOException
280         client.close();
281         try {
282             client.completeTransaction(1).get();
283             Assert.fail();
284         } catch (ExecutionException e) {
285             // Expected
286         }
287         assertEquals(client.deleteQueue.size(), 0, "Delete queue size should be 0!");
288         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
289 
290     }
291 
292     private static long rowCount(TableName table, byte[] family) throws Throwable {
293         Scan scan = new Scan();
294         scan.addFamily(family);
295         return aggregationClient.rowCount(table, new LongColumnInterpreter(), scan);
296     }
297 
298 }