View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Charsets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import org.apache.omid.TestUtils;
24  import org.apache.omid.tso.LeaseManagement;
25  import org.apache.omid.tso.PausableLeaseManager;
26  import org.apache.omid.tso.TSOServer;
27  import org.apache.omid.tso.TSOServerConfig;
28  import org.apache.curator.RetryPolicy;
29  import org.apache.curator.framework.CuratorFramework;
30  import org.apache.curator.framework.CuratorFrameworkFactory;
31  import org.apache.curator.framework.recipes.cache.NodeCache;
32  import org.apache.curator.framework.recipes.cache.NodeCacheListener;
33  import org.apache.curator.retry.ExponentialBackoffRetry;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.Get;
36  import org.apache.hadoop.hbase.client.HBaseAdmin;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  import org.testng.annotations.AfterMethod;
43  import org.testng.annotations.BeforeMethod;
44  import org.testng.annotations.Test;
45  
46  import java.io.IOException;
47  import java.util.concurrent.CountDownLatch;
48  import java.util.concurrent.TimeUnit;
49  
50  import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME;
51  import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_CF_NAME;
52  import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.HA;
53  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
54  import static org.testng.Assert.assertEquals;
55  import static org.testng.Assert.assertTrue;
56  import static org.testng.Assert.fail;
57  
58  @Test(groups = "sharedHBase")
59  public class TestEndToEndScenariosWithHA extends OmidTestBase {
60  
    // Lease period handed to the lease management module of both TSO instances in setup()
    private static final int TEST_LEASE_PERIOD_MS = 5_000;
    // ZK node whose data is watched in setup() until a TSO publishes a value ending in "#0"
    private static final String CURRENT_TSO_PATH = "/CURRENT_TSO_PATH";
    // ZK node passed to the lease management modules; removed again in cleanup()
    private static final String TSO_LEASE_PATH = "/TSO_LEASE_PATH";
    // ZK namespace shared by the Curator test client, the TSO lease modules and the Omid client config
    private static final String NAMESPACE = "omid";

    private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndScenariosWithHA.class);

    // Cells used by the scenarios: (row1, qualifier1) and (row2, qualifier2)
    private static final byte[] qualifier1 = Bytes.toBytes("test-q1");
    // NOTE(review): the trailing 'l' in "test-q2l" looks like a typo; harmless as long as the
    // qualifier is only ever referenced through this constant
    private static final byte[] qualifier2 = Bytes.toBytes("test-q2l");
    private static final byte[] row1 = Bytes.toBytes("row1");
    private static final byte[] row2 = Bytes.toBytes("row2");
    // v0: value written by the initial transaction of each scenario
    private static final byte[] initialData = Bytes.toBytes("testWrite-0");
    // v1: values written by TX 1 (expected to be rolled back in both scenarios)
    private static final byte[] data1_q1 = Bytes.toBytes("testWrite-1-q1");
    private static final byte[] data1_q2 = Bytes.toBytes("testWrite-1-q2");
    // v2: values written by TX 2 in scenario 2
    private static final byte[] data2_q1 = Bytes.toBytes("testWrite-2-q1");
    private static final byte[] data2_q2 = Bytes.toBytes("testWrite-2-q2");
    // Ports the two TSO instances are configured with
    private static final int TSO1_PORT = 2223;
    private static final int TSO2_PORT = 4321;

    // Released by the ZK listener installed in setup() once a TSO has published its address/epoch
    private CountDownLatch barrierTillTSOAddressPublication;

    // Curator client created per test method in setup() and closed in cleanup()
    private CuratorFramework zkClient;

    private TSOServer tso1;
    private TSOServer tso2;

    // Lease manager of TSO 1; scenario 1 pauses/resumes it to provoke a change in mastership
    private PausableLeaseManager leaseManager1;

    // Transaction manager configured in HA mode against the ZK ensemble of the mini-cluster
    private TransactionManager tm;
90  
91      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
92      public void setup() throws Exception {
93          // Get the zkConnection string from minicluster
94          String zkConnection = "localhost:" + hBaseUtils.getZkCluster().getClientPort();
95  
96          zkClient = provideInitializedZookeeperClient(zkConnection);
97  
98          // Synchronize TSO start
99          barrierTillTSOAddressPublication = new CountDownLatch(1);
100         final NodeCache currentTSOZNode = new NodeCache(zkClient, CURRENT_TSO_PATH);
101         currentTSOZNode.getListenable().addListener(new NodeCacheListener() {
102 
103             @Override
104             public void nodeChanged() throws Exception {
105                 byte[] currentTSOAndEpochAsBytes = currentTSOZNode.getCurrentData().getData();
106                 String currentTSOAndEpoch = new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
107                 if (currentTSOAndEpoch.endsWith("#0")) { // Wait till a TSO instance publishes the epoch
108                     barrierTillTSOAddressPublication.countDown();
109                 }
110             }
111 
112         });
113         currentTSOZNode.start(true);
114 
115         // Configure TSO 1
116         TSOServerConfig config1 = new TSOServerConfig();
117         config1.setPort(TSO1_PORT);
118         config1.setMaxItems(1000);
119         config1.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
120         Injector injector1 = Guice.createInjector(new TestTSOModule(hbaseConf, config1));
121         LOG.info("===================== Starting TSO 1 =====================");
122         tso1 = injector1.getInstance(TSOServer.class);
123         leaseManager1 = (PausableLeaseManager) injector1.getInstance(LeaseManagement.class);
124         tso1.startAndWait();
125         TestUtils.waitForSocketListening("localhost", TSO1_PORT, 100);
126         LOG.info("================ Finished loading TSO 1 ==================");
127 
128         // Configure TSO 2
129         TSOServerConfig config2 = new TSOServerConfig();
130         config2.setPort(TSO2_PORT);
131         config2.setMaxItems(1000);
132         config2.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
133         Injector injector2 = Guice.createInjector(new TestTSOModule(hbaseConf, config2));
134         LOG.info("===================== Starting TSO 2 =====================");
135         tso2 = injector2.getInstance(TSOServer.class);
136         injector2.getInstance(LeaseManagement.class);
137         tso2.startAndWait();
138         // Don't do this here: TestUtils.waitForSocketListening("localhost", 4321, 100);
139         LOG.info("================ Finished loading TSO 2 ==================");
140 
141         // Wait till the master TSO is up
142         barrierTillTSOAddressPublication.await();
143         currentTSOZNode.close();
144 
145         // Configure HBase TM
146         LOG.info("===================== Starting TM =====================");
147         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
148         hbaseOmidClientConf.setConnectionType(HA);
149         hbaseOmidClientConf.setConnectionString(zkConnection);
150         hbaseOmidClientConf.getOmidClientConfiguration().setZkCurrentTsoPath(CURRENT_TSO_PATH);
151         hbaseOmidClientConf.getOmidClientConfiguration().setZkNamespace(NAMESPACE);
152         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
153         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
154         tm = HBaseTransactionManager.builder(hbaseOmidClientConf).build();
155         LOG.info("===================== TM Started =========================");
156     }
157 
158 
159     @AfterMethod(alwaysRun = true, timeOut = 60_000)
160     public void cleanup() throws Exception {
161         LOG.info("Cleanup");
162         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
163         deleteTable(admin, TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
164         hBaseUtils.createTable(Bytes.toBytes(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME),
165                                new byte[][]{DEFAULT_TIMESTAMP_STORAGE_CF_NAME.getBytes()},
166                                Integer.MAX_VALUE);
167         tso1.stopAndWait();
168         TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
169         tso2.stopAndWait();
170         TestUtils.waitForSocketNotListening("localhost", TSO2_PORT, 100);
171 
172         zkClient.delete().forPath(TSO_LEASE_PATH);
173         LOG.info("ZKPath {} deleted", TSO_LEASE_PATH);
174         zkClient.delete().forPath(CURRENT_TSO_PATH);
175         LOG.info("ZKPaths {} deleted", CURRENT_TSO_PATH);
176 
177         zkClient.close();
178     }
179 
180     //
181     // TSO 1 is MASTER & TSO 2 is BACKUP
182     // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
183     // TX 1 starts (TSO1)
184     // TX 1 modifies cells R1C1 & R2C2 (v1)
185     // Interleaved Read TX -IR TX- starts (TSO1)
186     // TSO 1 PAUSES -> TSO 2 becomes MASTER
187     // IR TX reads R1C1 -> should get v0
188     // TX 1 tries to commit -> should abort because was started in TSO 1
189     // IR TX reads R2C2 -> should get v0
190     // IR TX tries to commit -> should abort because was started in TSO 1
191     // End of Test state: R1C1 & R2C2 (v0)
192     @Test(timeOut = 60_000)
193     public void testScenario1() throws Exception {
194         try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
195 
196             // Write initial values for the test
197             HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
198             long initialEpoch = tx0.getEpoch();
199             LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
200             Put putInitialDataRow1 = new Put(row1);
201             putInitialDataRow1.add(TEST_FAMILY.getBytes(), qualifier1, initialData);
202             txTable.put(tx0, putInitialDataRow1);
203             Put putInitialDataRow2 = new Put(row2);
204             putInitialDataRow2.add(TEST_FAMILY.getBytes(), qualifier2, initialData);
205             txTable.put(tx0, putInitialDataRow2);
206             tm.commit(tx0);
207 
208             // Initial checks
209             checkRowValues(txTable, initialData, initialData);
210 
211             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
212             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
213                      Bytes.toString(data1_q2));
214             Put putData1R1Q1 = new Put(row1);
215             putData1R1Q1.add(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
216             txTable.put(tx1, putData1R1Q1);
217             Put putData1R2Q2 = new Put(row2);
218             putData1R2Q2.add(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
219             txTable.put(tx1, putData1R2Q2);
220 
221             Transaction interleavedReadTx = tm.begin();
222 
223             LOG.info("Starting Interleaving Read Tx {} for checking cell values", interleavedReadTx.getTransactionId());
224 
225             // Simulate a GC pause to change mastership (should throw a ServiceUnavailable exception)
226             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
227             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
228             LOG.info("++++++++++++++++++++ PAUSING TSO 1 +++++++++++++++++++");
229             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
230             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
231             leaseManager1.pausedInStillInLeasePeriod();
232 
233             // Read interleaved and check the values writen by tx 1
234             Get getRow1 = new Get(row1).setMaxVersions(1);
235             getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
236             Result r = txTable.get(interleavedReadTx, getRow1);
237             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
238                          "Unexpected value for SI read R1Q1" + interleavedReadTx + ": "
239                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
240 
241             // Try to commit, but it should abort due to the change in mastership
242             try {
243                 tm.commit(tx1);
244                 fail();
245             } catch (RollbackException e) {
246                 // Expected
247                 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
248                 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
249                 assertEquals(tx1.getEpoch(), initialEpoch);
250             }
251 
252             // Read interleaved and check the values written by tx 1
253             Get getRow2 = new Get(row2).setMaxVersions(1);
254             r = txTable.get(interleavedReadTx, getRow2);
255             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
256                          "Unexpected value for SI read R2Q2" + interleavedReadTx + ": "
257                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
258 
259             // Should commit because its a read only tx does not have to contact the TSO
260             tm.commit(interleavedReadTx);
261             assertEquals(interleavedReadTx.getEpoch(), initialEpoch);
262             assertEquals(interleavedReadTx.getStatus(), Transaction.Status.COMMITTED_RO);
263 
264             LOG.info("Wait till the client is informed about the connection parameters of the new TSO");
265             TestUtils.waitForSocketListening("localhost", TSO2_PORT, 100);
266 
267             checkRowValues(txTable, initialData, initialData);
268 
269             // Need to resume to let other test progress
270             leaseManager1.resume();
271 
272         }
273 
274     }
275 
276     //
277     // TSO 1 is MASTER & TSO 2 is BACKUP
278     // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
279     // TX 1 starts (TSO1)
280     // TX 1 modifies cells R1C1 & R2C2 (v1)
281     // TSO 1 is KILLED -> TSO 2 becomes MASTER
282     // TX 1 tries to commit -> should abort because was started in TSO 1
283     // TX 2 starts (TSO1)
284     // TX 2 reads R1C1 -> should get v0
285     // TX 2 reads R2C2 -> should get v0
286     // TX 2 modifies cells R1C1 & R2C2 (v2)
287     // TX 2 commits
288     // End of Test state: R1C1 & R2C2 (v2)
289     @Test(timeOut = 60_000)
290     public void testScenario2() throws Exception {
291         try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
292 
293             // Write initial values for the test
294             HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
295             long initialEpoch = tx0.getEpoch();
296             LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
297             Put putInitialDataRow1 = new Put(row1);
298             putInitialDataRow1.add(TEST_FAMILY.getBytes(), qualifier1, initialData);
299             txTable.put(tx0, putInitialDataRow1);
300             Put putInitialDataRow2 = new Put(row2);
301             putInitialDataRow2.add(TEST_FAMILY.getBytes(), qualifier2, initialData);
302             txTable.put(tx0, putInitialDataRow2);
303             tm.commit(tx0);
304 
305             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
306             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
307                      Bytes.toString(data1_q2));
308             Put putData1R1Q1 = new Put(row1);
309             putData1R1Q1.add(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
310             txTable.put(tx1, putData1R1Q1);
311             Put putData1R2Q2 = new Put(row2);
312             putData1R2Q2.add(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
313             txTable.put(tx1, putData1R2Q2);
314 
315             // Provoke change in mastership (should throw a Connection exception)
316             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
317             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
318             LOG.info("++++++++++++++++++++ KILLING TSO 1 +++++++++++++++++++");
319             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
320             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
321             tso1.stopAndWait();
322             TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
323 
324             // Try to commit, but it should abort due to the change in mastership
325             try {
326                 tm.commit(tx1);
327                 String failMsg = String.format("%s should not commit. Initial epoch was: %d", tx1, initialEpoch);
328                 fail(failMsg);
329             } catch (RollbackException e) {
330                 // Expected
331                 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
332                 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
333                 assertEquals(tx1.getEpoch(), initialEpoch);
334             }
335 
336             LOG.info("Sleep some time till the client is informed about"
337                              + "the new TSO connection parameters and how can connect");
338             TimeUnit.SECONDS.sleep(10 + 2);
339 
340             HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
341             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx2, Bytes.toString(data1_q1),
342                      Bytes.toString(data1_q2));
343             Get getData1R1Q1 = new Get(row1).setMaxVersions(1);
344             Result r = txTable.get(tx2, getData1R1Q1);
345             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
346                          "Unexpected value for SI read R1Q1" + tx2 + ": "
347                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
348             Get getData1R2Q2 = new Get(row2).setMaxVersions(1);
349             r = txTable.get(tx2, getData1R2Q2);
350             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
351                          "Unexpected value for SI read R1Q1" + tx2 + ": "
352                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
353 
354             Put putData2R1Q1 = new Put(row1);
355             putData2R1Q1.add(TEST_FAMILY.getBytes(), qualifier1, data2_q1);
356             txTable.put(tx2, putData2R1Q1);
357             Put putData2R2Q2 = new Put(row2);
358             putData2R2Q2.add(TEST_FAMILY.getBytes(), qualifier2, data2_q2);
359             txTable.put(tx2, putData2R2Q2);
360             // This one should commit in the new TSO
361             tm.commit(tx2);
362 
363             assertEquals(tx2.getStatus(), Transaction.Status.COMMITTED);
364             assertTrue(tx2.getEpoch() > tx0.getCommitTimestamp());
365 
366             checkRowValues(txTable, data2_q1, data2_q2);
367         }
368 
369     }
370 
371     private void checkRowValues(TTable txTable, byte[] expectedDataR1Q1, byte[] expectedDataR2Q2)
372             throws IOException, RollbackException {
373         Transaction readTx = tm.begin();
374         LOG.info("Starting Read Tx {} for checking cell values", readTx.getTransactionId());
375         Get getRow1 = new Get(row1).setMaxVersions(1);
376         getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
377         Result r = txTable.get(readTx, getRow1);
378         assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), expectedDataR1Q1,
379                      "Unexpected value for SI read R1Q1" + readTx + ": " + Bytes
380                              .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
381         Get getRow2 = new Get(row2).setMaxVersions(1);
382         r = txTable.get(readTx, getRow2);
383         assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), expectedDataR2Q2,
384                      "Unexpected value for SI read R2Q2" + readTx + ": " + Bytes
385                              .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
386         tm.commit(readTx);
387     }
388 
389     // ----------------------------------------------------------------------------------------------------------------
390     // Helpers
391     // ----------------------------------------------------------------------------------------------------------------
392 
393     private static CuratorFramework provideInitializedZookeeperClient(String zkConnection) throws Exception {
394 
395         LOG.info("Creating Zookeeper Client connecting to {}", zkConnection);
396 
397         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
398         CuratorFramework zkClient = CuratorFrameworkFactory
399                 .builder()
400                 .namespace(NAMESPACE)
401                 .connectString(zkConnection)
402                 .retryPolicy(retryPolicy).build();
403 
404         LOG.info("Connecting to ZK cluster {}", zkClient.getState());
405         zkClient.start();
406         zkClient.blockUntilConnected();
407         LOG.info("Connection to ZK cluster {}", zkClient.getState());
408 
409         return zkClient;
410     }
411 
412 }