View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  
21  import static com.google.common.base.Charsets.UTF_8;
22  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
23  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.DEFAULT_COMMIT_TABLE_CF_NAME;
24  import static org.mockito.Mockito.spy;
25  import static org.testng.Assert.assertFalse;
26  import static org.testng.Assert.assertTrue;
27  import com.google.inject.Guice;
28  import com.google.inject.Injector;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.util.Bytes;
35  
36  import org.apache.omid.committable.hbase.KeyGenerator;
37  import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
38  
39  import org.apache.omid.tso.client.OmidClientConfiguration;
40  import org.apache.omid.tso.client.TSOClient;
41  
42  import org.testng.ITestContext;
43  import org.testng.annotations.BeforeClass;
44  import org.testng.annotations.Test;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  import java.io.File;
49  import java.io.IOException;
50  
51  
52  import org.apache.hadoop.conf.Configuration;
53  
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HBaseTestingUtility;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.MiniHBaseCluster;
59  
60  import org.apache.hadoop.hbase.client.Connection;
61  import org.apache.hadoop.hbase.client.ConnectionFactory;
62  
63  import org.apache.hadoop.hbase.client.HBaseAdmin;
64  
65  import org.apache.omid.TestUtils;
66  
67  
68  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
69  import org.apache.omid.tools.hbase.OmidTableManager;
70  import org.apache.omid.tso.TSOMockModule;
71  import org.apache.omid.tso.TSOServer;
72  import org.apache.omid.tso.TSOServerConfig;
73  
74  
75  public class TestOmidLLRaces {
76  
77      static HBaseTestingUtility hBaseUtils;
78      private static MiniHBaseCluster hbaseCluster;
79      static Configuration hbaseConf;
80      static Connection connection;
81  
82      private static final String TEST_FAMILY = "data";
83      static final String TEST_FAMILY2 = "data2";
84      private static final String TEST_TABLE = "test";
85      private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
86      private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
87      private static final byte[] family = Bytes.toBytes("data");
88      private static final byte[] qualifier = Bytes.toBytes("testdata");
89      private static final byte[] data1 = Bytes.toBytes("testWrite-1");
90  
91      private static final Logger LOG = LoggerFactory.getLogger(TestOmidLLRaces.class);
92      private TSOClient client;
93  
94      @BeforeClass
95      public void setup() throws Exception {
96          // TSO Setup
97          TSOServerConfig tsoConfig = new TSOServerConfig();
98          tsoConfig.setPort(1234);
99          tsoConfig.setConflictMapSize(1000);
100         tsoConfig.setLowLatency(true);
101         tsoConfig.setWaitStrategy("LOW_CPU");
102         Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
103         LOG.info("Starting TSO");
104         TSOServer tso = injector.getInstance(TSOServer.class);
105         HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
106         tso.startAsync();
107         tso.awaitRunning();
108         TestUtils.waitForSocketListening("localhost", 1234, 100);
109         LOG.info("Finished loading TSO");
110 
111         OmidClientConfiguration clientConf = new OmidClientConfiguration();
112         clientConf.setConnectionString("localhost:1234");
113 
114         // Create the associated Handler
115         client = TSOClient.newInstance(clientConf);
116 
117         // ------------------------------------------------------------------------------------------------------------
118         // HBase setup
119         // ------------------------------------------------------------------------------------------------------------
120         LOG.info("Creating HBase minicluster");
121         hbaseConf = HBaseConfiguration.create();
122         hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
123         hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
124         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
125 
126         File tempFile = File.createTempFile("OmidTest", "");
127         tempFile.deleteOnExit();
128         hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
129         hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
130         hBaseUtils = new HBaseTestingUtility(hbaseConf);
131         hbaseCluster = hBaseUtils.startMiniCluster(1);
132         connection = ConnectionFactory.createConnection(hbaseConf);
133         hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
134                 new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
135                 Integer.MAX_VALUE);
136         createTestTable();
137         createCommitTable();
138 
139         LOG.info("HBase minicluster is up");
140     }
141 
142 
143     private void createCommitTable() throws IOException {
144         String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
145         OmidTableManager omidTableManager = new OmidTableManager(args);
146         omidTableManager.executeActionsOnHBase(hbaseConf);
147     }
148 
149     private void createTestTable() throws IOException {
150         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
151         HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
152         HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
153         HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
154         datafam.setMaxVersions(Integer.MAX_VALUE);
155         datafam2.setMaxVersions(Integer.MAX_VALUE);
156         test_table_desc.addFamily(datafam);
157         test_table_desc.addFamily(datafam2);
158         admin.createTable(test_table_desc);
159     }
160 
161     protected TransactionManager newTransactionManagerHBaseCommitTable(TSOClient tsoClient) throws Exception {
162         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
163         clientConf.setConnectionString("localhost:1234");
164         clientConf.setHBaseConfiguration(hbaseConf);
165         return HBaseTransactionManager.builder(clientConf)
166                 .tsoClient(tsoClient).build();
167     }
168 
169 
170     @Test(timeOut = 30_000)
171     public void testIsCommitted() throws Exception {
172         AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
173 
174         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
175         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
176                 tm.getCommitTableClient());
177         TTable table = spy(new TTable(htable, snapshotFilter, false));
178 
179         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
180 
181         Put put = new Put(row1);
182         put.addColumn(family, qualifier, data1);
183         table.put(t1, put);
184         tm.commit(t1);
185 
186         HBaseTransaction t2 = (HBaseTransaction) tm.begin();
187         put = new Put(row2);
188         put.addColumn(family, qualifier, data1);
189         table.put(t2, put);
190         table.flushCommits();
191 
192         HBaseTransaction t3 = (HBaseTransaction) tm.begin();
193         put = new Put(row2);
194         put.addColumn(family, qualifier, data1);
195         table.put(t3, put);
196         tm.commit(t3);
197 
198         HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
199         HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
200         HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
201 
202         assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
203         assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
204         assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
205         assertTrue(tm.isLowLatency());
206     }
207 
208 
209     @Test(timeOut = 30_000)
210     public void testInvalidation(ITestContext context) throws Exception {
211         AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
212 
213         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
214         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
215                 tm.getCommitTableClient());
216         TTable table = spy(new TTable(htable, snapshotFilter, false));
217 
218         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
219         Put put = new Put(row1);
220         put.addColumn(family, qualifier, data1);
221         table.put(t1, put);
222 
223         HBaseTransaction t2 = (HBaseTransaction) tm.begin();
224         Get get = new Get(row1);
225         get.addColumn(family, qualifier);
226         table.get(t2,get);
227 
228         //assert there is an invalidation marker:
229         Table commitTable = connection.getTable(TableName.valueOf("OMID_COMMIT_TABLE"));
230         KeyGenerator keygen = KeyGeneratorImplementations.defaultKeyGenerator();
231         byte[] row = keygen.startTimestampToKey(t1.getStartTimestamp());
232         Get getInvalidation = new Get(row);
233         getInvalidation.addColumn(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME),"IT".getBytes(UTF_8));
234         Result res = commitTable.get(getInvalidation);
235         int val = Bytes.toInt(res.getValue(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME), "IT".getBytes(UTF_8)));
236         assertTrue(val == 1);
237 
238         boolean gotInvalidated = false;
239         try {
240             tm.commit(t1);
241         } catch (RollbackException e) {
242             gotInvalidated = true;
243         }
244         assertTrue(gotInvalidated);
245         tm.commit(t2);
246         Thread.sleep(1000);
247         res = commitTable.get(getInvalidation);
248         assertTrue(res.isEmpty());
249         assertTrue(tm.isLowLatency());
250     }
251 }