View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.inject.Guice;
21  import com.google.inject.Injector;
22  import org.apache.omid.TestUtils;
23  import org.apache.omid.committable.CommitTable;
24  import org.apache.omid.committable.InMemoryCommitTable;
25  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
26  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
27  import org.apache.omid.tools.hbase.OmidTableManager;
28  import org.apache.omid.tso.TSOMockModule;
29  import org.apache.omid.tso.TSOServer;
30  import org.apache.omid.tso.TSOServerConfig;
31  import org.apache.omid.tso.client.OmidClientConfiguration;
32  import org.apache.omid.tso.client.TSOClient;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.HBaseConfiguration;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HColumnDescriptor;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.MiniHBaseCluster;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.client.Get;
43  import org.apache.hadoop.hbase.client.HBaseAdmin;
44  import org.apache.hadoop.hbase.client.HTable;
45  import org.apache.hadoop.hbase.client.Result;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  import org.testng.ITestContext;
50  import org.testng.annotations.AfterGroups;
51  import org.testng.annotations.AfterMethod;
52  import org.testng.annotations.BeforeGroups;
53  import org.testng.annotations.BeforeMethod;
54  
55  import java.io.File;
56  import java.io.IOException;
57  import java.lang.reflect.Method;
58  
59  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
60  
61  public abstract class OmidTestBase {
62  
63      private static final Logger LOG = LoggerFactory.getLogger(OmidTestBase.class);
64  
65      static HBaseTestingUtility hBaseUtils;
66      private static MiniHBaseCluster hbaseCluster;
67      static Configuration hbaseConf;
68  
69      protected static final String TEST_TABLE = "test";
70      protected static final String TEST_FAMILY = "data";
71      static final String TEST_FAMILY2 = "data2";
72      private HBaseCommitTableConfig hBaseCommitTableConfig;
73  
74      @BeforeMethod(alwaysRun = true)
75      public void beforeClass(Method method) throws Exception {
76          Thread.currentThread().setName("UnitTest-" + method.getName());
77      }
78  
79  
80      @BeforeGroups(groups = "sharedHBase")
81      public void beforeGroups(ITestContext context) throws Exception {
82          // TSO Setup
83          TSOServerConfig tsoConfig = new TSOServerConfig();
84          tsoConfig.setPort(1234);
85          tsoConfig.setMaxItems(1000);
86          Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
87          LOG.info("Starting TSO");
88          TSOServer tso = injector.getInstance(TSOServer.class);
89          hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
90          HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
91          tso.startAndWait();
92          TestUtils.waitForSocketListening("localhost", 1234, 100);
93          LOG.info("Finished loading TSO");
94          context.setAttribute("tso", tso);
95  
96          OmidClientConfiguration clientConf = new OmidClientConfiguration();
97          clientConf.setConnectionString("localhost:1234");
98          context.setAttribute("clientConf", clientConf);
99  
100         InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
101         context.setAttribute("commitTable", commitTable);
102 
103         // Create the associated Handler
104         TSOClient client = TSOClient.newInstance(clientConf);
105         context.setAttribute("client", client);
106 
107         // ------------------------------------------------------------------------------------------------------------
108         // HBase setup
109         // ------------------------------------------------------------------------------------------------------------
110         LOG.info("Creating HBase minicluster");
111         hbaseConf = HBaseConfiguration.create();
112         hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
113         hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
114         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
115 
116         File tempFile = File.createTempFile("OmidTest", "");
117         tempFile.deleteOnExit();
118         hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
119 
120         hBaseUtils = new HBaseTestingUtility(hbaseConf);
121         hbaseCluster = hBaseUtils.startMiniCluster(1);
122         hBaseUtils.createTable(Bytes.toBytes(hBaseTimestampStorageConfig.getTableName()),
123                                new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
124                                Integer.MAX_VALUE);
125 
126         createTestTable();
127         createCommitTable();
128 
129         LOG.info("HBase minicluster is up");
130     }
131 
132     private void createTestTable() throws IOException {
133         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
134         HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
135         HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
136         HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
137         datafam.setMaxVersions(Integer.MAX_VALUE);
138         datafam2.setMaxVersions(Integer.MAX_VALUE);
139         test_table_desc.addFamily(datafam);
140         test_table_desc.addFamily(datafam2);
141         admin.createTable(test_table_desc);
142     }
143 
144     private void createCommitTable() throws IOException {
145         String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
146         OmidTableManager omidTableManager = new OmidTableManager(args);
147         omidTableManager.executeActionsOnHBase(hbaseConf);
148     }
149 
150 
151     private TSOServer getTSO(ITestContext context) {
152         return (TSOServer) context.getAttribute("tso");
153     }
154 
155 
156     TSOClient getClient(ITestContext context) {
157         return (TSOClient) context.getAttribute("client");
158     }
159 
160     InMemoryCommitTable getCommitTable(ITestContext context) {
161         return (InMemoryCommitTable) context.getAttribute("commitTable");
162     }
163 
164     protected TransactionManager newTransactionManager(ITestContext context) throws Exception {
165         return newTransactionManager(context, getClient(context));
166     }
167 
168     protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions) throws Exception {
169         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
170         clientConf.setConnectionString("localhost:1234");
171         clientConf.setHBaseConfiguration(hbaseConf);
172         return HBaseTransactionManager.builder(clientConf)
173                 .postCommitter(postCommitActions)
174                 .commitTableClient(getCommitTable(context).getClient())
175                 .tsoClient(getClient(context)).build();
176     }
177 
178     protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception {
179         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
180         clientConf.setConnectionString("localhost:1234");
181         clientConf.setHBaseConfiguration(hbaseConf);
182         return HBaseTransactionManager.builder(clientConf)
183                 .commitTableClient(getCommitTable(context).getClient())
184                 .tsoClient(tsoClient).build();
185     }
186 
187     protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient)
188             throws Exception {
189         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
190         clientConf.setConnectionString("localhost:1234");
191         clientConf.setHBaseConfiguration(hbaseConf);
192         return HBaseTransactionManager.builder(clientConf)
193                 .commitTableClient(commitTableClient)
194                 .tsoClient(getClient(context)).build();
195     }
196 
197     @AfterGroups(groups = "sharedHBase")
198     public void afterGroups(ITestContext context) throws Exception {
199         LOG.info("Tearing down OmidTestBase...");
200         if (hbaseCluster != null) {
201             hBaseUtils.shutdownMiniCluster();
202         }
203 
204         getClient(context).close().get();
205         getTSO(context).stopAndWait();
206         TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
207     }
208 
209     @AfterMethod(groups = "sharedHBase", timeOut = 60_000)
210     public void afterMethod() {
211         try {
212             LOG.info("tearing Down");
213             HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
214             deleteTable(admin, TableName.valueOf(TEST_TABLE));
215             createTestTable();
216             deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
217             createCommitTable();
218         } catch (Exception e) {
219             LOG.error("Error tearing down", e);
220         }
221     }
222 
223     void deleteTable(HBaseAdmin admin, TableName tableName) throws IOException {
224         if (admin.tableExists(tableName)) {
225             if (admin.isTableDisabled(tableName)) {
226                 admin.deleteTable(tableName);
227             } else {
228                 admin.disableTable(tableName);
229                 admin.deleteTable(tableName);
230             }
231         }
232     }
233 
234     static boolean verifyValue(byte[] tableName, byte[] row,
235                                byte[] fam, byte[] col, byte[] value) {
236 
237         try (HTable table = new HTable(hbaseConf, tableName)) {
238             Get g = new Get(row).setMaxVersions(1);
239             Result r = table.get(g);
240             Cell cell = r.getColumnLatestCell(fam, col);
241 
242             if (LOG.isTraceEnabled()) {
243                 LOG.trace("Value for " + Bytes.toString(tableName) + ":"
244                                   + Bytes.toString(row) + ":" + Bytes.toString(fam)
245                                   + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(cell))
246                                   + " (" + Bytes.toString(value) + " expected)");
247             }
248 
249             return Bytes.equals(CellUtil.cloneValue(cell), value);
250         } catch (IOException e) {
251             LOG.error("Error reading row " + Bytes.toString(tableName) + ":"
252                               + Bytes.toString(row) + ":" + Bytes.toString(fam)
253                               + Bytes.toString(col), e);
254             return false;
255         }
256     }
257 }