View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.lang.reflect.Method;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.MiniHBaseCluster;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.Admin;
36  import org.apache.hadoop.hbase.client.Connection;
37  import org.apache.hadoop.hbase.client.ConnectionFactory;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.HBaseAdmin;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.Table;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.omid.TestUtils;
44  import org.apache.omid.committable.CommitTable;
45  import org.apache.omid.committable.InMemoryCommitTable;
46  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
47  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
48  import org.apache.omid.tools.hbase.OmidTableManager;
49  import org.apache.omid.tso.TSOMockModule;
50  import org.apache.omid.tso.TSOServer;
51  import org.apache.omid.tso.TSOServerConfig;
52  import org.apache.omid.tso.client.OmidClientConfiguration;
53  import org.apache.omid.tso.client.TSOClient;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  import org.testng.ITestContext;
57  import org.testng.annotations.AfterGroups;
58  import org.testng.annotations.AfterMethod;
59  import org.testng.annotations.BeforeGroups;
60  import org.testng.annotations.BeforeMethod;
61  
62  import com.google.inject.Guice;
63  import com.google.inject.Injector;
64  
65  public abstract class OmidTestBase {
66  
67      private static final Logger LOG = LoggerFactory.getLogger(OmidTestBase.class);
68  
69      static HBaseTestingUtility hBaseUtils;
70      private static MiniHBaseCluster hbaseCluster;
71      static Configuration hbaseConf;
72      static Connection connection;
73  
74      protected static final String TEST_TABLE = "test";
75      protected static final String TEST_FAMILY = "data";
76      static final String TEST_FAMILY2 = "data2";
77  
78      private HBaseCommitTableConfig hBaseCommitTableConfig;
79  
80      @BeforeMethod(alwaysRun = true)
81      public void beforeClass(Method method) throws Exception {
82          Thread.currentThread().setName("UnitTest-" + method.getName());
83      }
84  
85  
86      @BeforeGroups(groups = "sharedHBase")
87      public void beforeGroups(ITestContext context) throws Exception {
88          // TSO Setup
89          TSOServerConfig tsoConfig = new TSOServerConfig();
90          tsoConfig.setPort(1234);
91          tsoConfig.setConflictMapSize(1000);
92          Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
93          LOG.info("Starting TSO");
94          TSOServer tso = injector.getInstance(TSOServer.class);
95          hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
96          HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
97          tso.startAndWait();
98          TestUtils.waitForSocketListening("localhost", 1234, 100);
99          LOG.info("Finished loading TSO");
100         context.setAttribute("tso", tso);
101 
102         OmidClientConfiguration clientConf = new OmidClientConfiguration();
103         clientConf.setConnectionString("localhost:1234");
104         context.setAttribute("clientConf", clientConf);
105 
106         InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
107         context.setAttribute("commitTable", commitTable);
108 
109         // Create the associated Handler
110         TSOClient client = TSOClient.newInstance(clientConf);
111         context.setAttribute("client", client);
112 
113         // ------------------------------------------------------------------------------------------------------------
114         // HBase setup
115         // ------------------------------------------------------------------------------------------------------------
116         LOG.info("Creating HBase minicluster");
117         hbaseConf = HBaseConfiguration.create();
118         hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
119         hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
120         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
121 
122         File tempFile = File.createTempFile("OmidTest", "");
123         tempFile.deleteOnExit();
124         hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
125         hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
126         hBaseUtils = new HBaseTestingUtility(hbaseConf);
127         hbaseCluster = hBaseUtils.startMiniCluster(1);
128         connection = ConnectionFactory.createConnection(hbaseConf);
129         hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
130                 new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
131                 Integer.MAX_VALUE);
132         createTestTable();
133         createCommitTable();
134 
135         LOG.info("HBase minicluster is up");
136     }
137 
138     protected void createTestTable() throws IOException {
139         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
140         HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
141         HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
142         HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
143         datafam.setMaxVersions(Integer.MAX_VALUE);
144         datafam2.setMaxVersions(Integer.MAX_VALUE);
145         test_table_desc.addFamily(datafam);
146         test_table_desc.addFamily(datafam2);
147         admin.createTable(test_table_desc);
148     }
149 
150     private void createCommitTable() throws IOException {
151         String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
152         OmidTableManager omidTableManager = new OmidTableManager(args);
153         omidTableManager.executeActionsOnHBase(hbaseConf);
154     }
155 
156 
157     private TSOServer getTSO(ITestContext context) {
158         return (TSOServer) context.getAttribute("tso");
159     }
160 
161 
162     TSOClient getClient(ITestContext context) {
163         return (TSOClient) context.getAttribute("client");
164     }
165 
166     InMemoryCommitTable getCommitTable(ITestContext context) {
167         return (InMemoryCommitTable) context.getAttribute("commitTable");
168     }
169 
170     protected TransactionManager newTransactionManager(ITestContext context) throws Exception {
171         return newTransactionManager(context, getClient(context));
172     }
173 
174     protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions) throws Exception {
175         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
176         clientConf.setConnectionString("localhost:1234");
177         clientConf.setHBaseConfiguration(hbaseConf);
178         return HBaseTransactionManager.builder(clientConf)
179                 .postCommitter(postCommitActions)
180                 .commitTableClient(getCommitTable(context).getClient())
181                 .commitTableWriter(getCommitTable(context).getWriter())
182                 .tsoClient(getClient(context)).build();
183     }
184 
185     protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception {
186         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
187         clientConf.setConnectionString("localhost:1234");
188         clientConf.setHBaseConfiguration(hbaseConf);
189         return HBaseTransactionManager.builder(clientConf)
190                 .commitTableClient(getCommitTable(context).getClient())
191                 .commitTableWriter(getCommitTable(context).getWriter())
192                 .tsoClient(tsoClient).build();
193     }
194 
195     protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient)
196             throws Exception {
197         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
198         clientConf.setConnectionString("localhost:1234");
199         clientConf.setHBaseConfiguration(hbaseConf);
200         return HBaseTransactionManager.builder(clientConf)
201                 .commitTableClient(commitTableClient)
202                 .commitTableWriter(getCommitTable(context).getWriter())
203                 .tsoClient(getClient(context)).build();
204     }
205 
206     @AfterGroups(groups = "sharedHBase")
207     public void afterGroups(ITestContext context) throws Exception {
208         LOG.info("Tearing down OmidTestBase...");
209         if (hbaseCluster != null) {
210             hBaseUtils.shutdownMiniCluster();
211         }
212 
213         getClient(context).close().get();
214         getTSO(context).stopAndWait();
215         TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
216     }
217 
218     @AfterMethod(groups = "sharedHBase", timeOut = 60_000)
219     public void afterMethod() {
220         try {
221             LOG.info("tearing Down");
222             Admin admin = hBaseUtils.getHBaseAdmin();
223             deleteTable(admin, TableName.valueOf(TEST_TABLE));
224             createTestTable();
225             if (hBaseCommitTableConfig != null) {
226                 deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
227             }
228             createCommitTable();
229         } catch (Exception e) {
230             LOG.error("Error tearing down", e);
231         }
232     }
233 
234     void deleteTable(Admin admin, TableName tableName) throws IOException {
235         if (admin.tableExists(tableName)) {
236             if (admin.isTableDisabled(tableName)) {
237                 admin.deleteTable(tableName);
238             } else {
239                 admin.disableTable(tableName);
240                 admin.deleteTable(tableName);
241             }
242         }
243     }
244 
245     static boolean verifyValue(Table table, byte[] row,
246                                byte[] fam, byte[] col, byte[] value) {
247 
248         try {
249             Get g = new Get(row).setMaxVersions(1);
250             Result r = table.get(g);
251             Cell cell = r.getColumnLatestCell(fam, col);
252 
253             if (LOG.isTraceEnabled()) {
254                 LOG.trace("Value for " + table.getName().getNameAsString() + ":"
255                                   + Bytes.toString(row) + ":" + Bytes.toString(fam)
256                                   + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(cell))
257                                   + " (" + Bytes.toString(value) + " expected)");
258             }
259 
260             return Bytes.equals(CellUtil.cloneValue(cell), value);
261         } catch (IOException e) {
262             LOG.error("Error reading row " + table.getName().getNameAsString() + ":"
263                               + Bytes.toString(row) + ":" + Bytes.toString(fam)
264                               + Bytes.toString(col), e);
265             return false;
266         }
267     }
268 }