View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
21  import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_CF_NAME;
22  import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME;
23  import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.HA;
24  import static org.testng.Assert.assertEquals;
25  import static org.testng.Assert.assertTrue;
26  import static org.testng.Assert.fail;
27  
28  import java.io.IOException;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.curator.RetryPolicy;
33  import org.apache.curator.framework.CuratorFramework;
34  import org.apache.curator.framework.CuratorFrameworkFactory;
35  import org.apache.curator.framework.recipes.cache.NodeCache;
36  import org.apache.curator.framework.recipes.cache.NodeCacheListener;
37  import org.apache.curator.retry.ExponentialBackoffRetry;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.HBaseAdmin;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.omid.TestUtils;
45  import org.apache.omid.tso.LeaseManagement;
46  import org.apache.omid.tso.PausableLeaseManager;
47  import org.apache.omid.tso.TSOServer;
48  import org.apache.omid.tso.TSOServerConfig;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  import org.testng.annotations.AfterMethod;
52  import org.testng.annotations.BeforeMethod;
53  import org.testng.annotations.Test;
54  
55  import com.google.common.base.Charsets;
56  import com.google.inject.Guice;
57  import com.google.inject.Injector;
58  
59  @Test(groups = "sharedHBase")
60  public class TestEndToEndScenariosWithHA extends OmidTestBase {
61  
62      private static final int TEST_LEASE_PERIOD_MS = 5_000;
63      private static final String CURRENT_TSO_PATH = "/CURRENT_TSO_PATH";
64      private static final String TSO_LEASE_PATH = "/TSO_LEASE_PATH";
65      private static final String NAMESPACE = "omid";
66  
67      private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndScenariosWithHA.class);
68  
69      private static final byte[] qualifier1 = Bytes.toBytes("test-q1");
70      private static final byte[] qualifier2 = Bytes.toBytes("test-q2l");
71      private static final byte[] row1 = Bytes.toBytes("row1");
72      private static final byte[] row2 = Bytes.toBytes("row2");
73      private static final byte[] initialData = Bytes.toBytes("testWrite-0");
74      private static final byte[] data1_q1 = Bytes.toBytes("testWrite-1-q1");
75      private static final byte[] data1_q2 = Bytes.toBytes("testWrite-1-q2");
76      private static final byte[] data2_q1 = Bytes.toBytes("testWrite-2-q1");
77      private static final byte[] data2_q2 = Bytes.toBytes("testWrite-2-q2");
78      private static final int TSO1_PORT = 2223;
79      private static final int TSO2_PORT = 4321;
80  
81      private CountDownLatch barrierTillTSOAddressPublication;
82  
83      private CuratorFramework zkClient;
84  
85      private TSOServer tso1;
86      private TSOServer tso2;
87  
88      private PausableLeaseManager leaseManager1;
89  
90      private TransactionManager tm;
91  
92      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
93      public void setup() throws Exception {
94          // Get the zkConnection string from minicluster
95          String zkConnection = "localhost:" + hBaseUtils.getZkCluster().getClientPort();
96  
97          zkClient = provideInitializedZookeeperClient(zkConnection);
98  
99          // Synchronize TSO start
100         barrierTillTSOAddressPublication = new CountDownLatch(1);
101         final NodeCache currentTSOZNode = new NodeCache(zkClient, CURRENT_TSO_PATH);
102         currentTSOZNode.getListenable().addListener(new NodeCacheListener() {
103 
104             @Override
105             public void nodeChanged() throws Exception {
106                 byte[] currentTSOAndEpochAsBytes = currentTSOZNode.getCurrentData().getData();
107                 String currentTSOAndEpoch = new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
108                 if (currentTSOAndEpoch.endsWith("#0")) { // Wait till a TSO instance publishes the epoch
109                     barrierTillTSOAddressPublication.countDown();
110                 }
111             }
112 
113         });
114         currentTSOZNode.start(true);
115 
116         // Configure TSO 1
117         TSOServerConfig config1 = new TSOServerConfig();
118         config1.setPort(TSO1_PORT);
119         config1.setConflictMapSize(1000);
120         config1.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
121         Injector injector1 = Guice.createInjector(new TestTSOModule(hbaseConf, config1));
122         LOG.info("===================== Starting TSO 1 =====================");
123         tso1 = injector1.getInstance(TSOServer.class);
124         leaseManager1 = (PausableLeaseManager) injector1.getInstance(LeaseManagement.class);
125         tso1.startAndWait();
126         TestUtils.waitForSocketListening("localhost", TSO1_PORT, 100);
127         LOG.info("================ Finished loading TSO 1 ==================");
128 
129         // Configure TSO 2
130         TSOServerConfig config2 = new TSOServerConfig();
131         config2.setPort(TSO2_PORT);
132         config2.setConflictMapSize(1000);
133         config2.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
134         Injector injector2 = Guice.createInjector(new TestTSOModule(hbaseConf, config2));
135         LOG.info("===================== Starting TSO 2 =====================");
136         tso2 = injector2.getInstance(TSOServer.class);
137         injector2.getInstance(LeaseManagement.class);
138         tso2.startAndWait();
139         // Don't do this here: TestUtils.waitForSocketListening("localhost", 4321, 100);
140         LOG.info("================ Finished loading TSO 2 ==================");
141 
142         // Wait till the master TSO is up
143         barrierTillTSOAddressPublication.await();
144         currentTSOZNode.close();
145 
146         // Configure HBase TM
147         LOG.info("===================== Starting TM =====================");
148         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
149         hbaseOmidClientConf.setConnectionType(HA);
150         hbaseOmidClientConf.setConnectionString(zkConnection);
151         hbaseOmidClientConf.getOmidClientConfiguration().setZkCurrentTsoPath(CURRENT_TSO_PATH);
152         hbaseOmidClientConf.getOmidClientConfiguration().setZkNamespace(NAMESPACE);
153         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
154         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
155         tm = HBaseTransactionManager.builder(hbaseOmidClientConf).build();
156         LOG.info("===================== TM Started =========================");
157     }
158 
159 
160     @AfterMethod(alwaysRun = true, timeOut = 60_000)
161     public void cleanup() throws Exception {
162         LOG.info("Cleanup");
163         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
164         deleteTable(admin, TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
165         hBaseUtils.createTable(TableName.valueOf((DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME)),
166                                new byte[][]{DEFAULT_TIMESTAMP_STORAGE_CF_NAME.getBytes()},
167                                Integer.MAX_VALUE);
168         tso1.stopAndWait();
169         TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
170         tso2.stopAndWait();
171         TestUtils.waitForSocketNotListening("localhost", TSO2_PORT, 100);
172 
173         zkClient.delete().forPath(TSO_LEASE_PATH);
174         LOG.info("ZKPath {} deleted", TSO_LEASE_PATH);
175         zkClient.delete().forPath(CURRENT_TSO_PATH);
176         LOG.info("ZKPaths {} deleted", CURRENT_TSO_PATH);
177 
178         zkClient.close();
179     }
180 
181     //
182     // TSO 1 is MASTER & TSO 2 is BACKUP
183     // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
184     // TX 1 starts (TSO1)
185     // TX 1 modifies cells R1C1 & R2C2 (v1)
186     // Interleaved Read TX -IR TX- starts (TSO1)
187     // TSO 1 PAUSES -> TSO 2 becomes MASTER
188     // IR TX reads R1C1 -> should get v0
189     // TX 1 tries to commit -> should abort because was started in TSO 1
190     // IR TX reads R2C2 -> should get v0
191     // IR TX tries to commit -> should abort because was started in TSO 1
192     // End of Test state: R1C1 & R2C2 (v0)
193     @Test(timeOut = 60_000)
194     public void testScenario1() throws Exception {
195         try (TTable txTable = new TTable(connection, TEST_TABLE)) {
196 
197             // Write initial values for the test
198             HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
199             long initialEpoch = tx0.getEpoch();
200             LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
201             Put putInitialDataRow1 = new Put(row1);
202             putInitialDataRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1, initialData);
203             txTable.put(tx0, putInitialDataRow1);
204             Put putInitialDataRow2 = new Put(row2);
205             putInitialDataRow2.addColumn(TEST_FAMILY.getBytes(), qualifier2, initialData);
206             txTable.put(tx0, putInitialDataRow2);
207             tm.commit(tx0);
208 
209             // Initial checks
210             checkRowValues(txTable, initialData, initialData);
211 
212             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
213             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
214                      Bytes.toString(data1_q2));
215             Put putData1R1Q1 = new Put(row1);
216             putData1R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
217             txTable.put(tx1, putData1R1Q1);
218             Put putData1R2Q2 = new Put(row2);
219             putData1R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
220             txTable.put(tx1, putData1R2Q2);
221 
222             Transaction interleavedReadTx = tm.begin();
223 
224             LOG.info("Starting Interleaving Read Tx {} for checking cell values", interleavedReadTx.getTransactionId());
225 
226             // Simulate a GC pause to change mastership (should throw a ServiceUnavailable exception)
227             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
228             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
229             LOG.info("++++++++++++++++++++ PAUSING TSO 1 +++++++++++++++++++");
230             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
231             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
232             leaseManager1.pausedInStillInLeasePeriod();
233 
234             // Read interleaved and check the values writen by tx 1
235             Get getRow1 = new Get(row1).setMaxVersions(1);
236             getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
237             Result r = txTable.get(interleavedReadTx, getRow1);
238             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
239                          "Unexpected value for SI read R1Q1" + interleavedReadTx + ": "
240                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
241 
242             // Try to commit, but it should abort due to the change in mastership
243             try {
244                 tm.commit(tx1);
245                 fail();
246             } catch (RollbackException e) {
247                 // Expected
248                 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
249                 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
250                 assertEquals(tx1.getEpoch(), initialEpoch);
251             }
252 
253             // Read interleaved and check the values written by tx 1
254             Get getRow2 = new Get(row2).setMaxVersions(1);
255             r = txTable.get(interleavedReadTx, getRow2);
256             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
257                          "Unexpected value for SI read R2Q2" + interleavedReadTx + ": "
258                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
259 
260             // Should commit because its a read only tx does not have to contact the TSO
261             tm.commit(interleavedReadTx);
262             assertEquals(interleavedReadTx.getEpoch(), initialEpoch);
263             assertEquals(interleavedReadTx.getStatus(), Transaction.Status.COMMITTED_RO);
264 
265             LOG.info("Wait till the client is informed about the connection parameters of the new TSO");
266             TestUtils.waitForSocketListening("localhost", TSO2_PORT, 100);
267 
268             checkRowValues(txTable, initialData, initialData);
269 
270             // Need to resume to let other test progress
271             leaseManager1.resume();
272 
273         }
274 
275     }
276 
277     //
278     // TSO 1 is MASTER & TSO 2 is BACKUP
279     // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
280     // TX 1 starts (TSO1)
281     // TX 1 modifies cells R1C1 & R2C2 (v1)
282     // TSO 1 is KILLED -> TSO 2 becomes MASTER
283     // TX 1 tries to commit -> should abort because was started in TSO 1
284     // TX 2 starts (TSO1)
285     // TX 2 reads R1C1 -> should get v0
286     // TX 2 reads R2C2 -> should get v0
287     // TX 2 modifies cells R1C1 & R2C2 (v2)
288     // TX 2 commits
289     // End of Test state: R1C1 & R2C2 (v2)
290     @Test(timeOut = 60_000)
291     public void testScenario2() throws Exception {
292         try (TTable txTable = new TTable(connection, TEST_TABLE)) {
293 
294             // Write initial values for the test
295             HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
296             long initialEpoch = tx0.getEpoch();
297             LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
298             Put putInitialDataRow1 = new Put(row1);
299             putInitialDataRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1, initialData);
300             txTable.put(tx0, putInitialDataRow1);
301             Put putInitialDataRow2 = new Put(row2);
302             putInitialDataRow2.addColumn(TEST_FAMILY.getBytes(), qualifier2, initialData);
303             txTable.put(tx0, putInitialDataRow2);
304             tm.commit(tx0);
305 
306             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
307             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
308                      Bytes.toString(data1_q2));
309             Put putData1R1Q1 = new Put(row1);
310             putData1R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
311             txTable.put(tx1, putData1R1Q1);
312             Put putData1R2Q2 = new Put(row2);
313             putData1R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
314             txTable.put(tx1, putData1R2Q2);
315 
316             // Provoke change in mastership (should throw a Connection exception)
317             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
318             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
319             LOG.info("++++++++++++++++++++ KILLING TSO 1 +++++++++++++++++++");
320             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
321             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
322             tso1.stopAndWait();
323             TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
324 
325             // Try to commit, but it should abort due to the change in mastership
326             try {
327                 tm.commit(tx1);
328                 String failMsg = String.format("%s should not commit. Initial epoch was: %d", tx1, initialEpoch);
329                 fail(failMsg);
330             } catch (RollbackException e) {
331                 // Expected
332                 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
333                 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
334                 assertEquals(tx1.getEpoch(), initialEpoch);
335             }
336 
337             LOG.info("Sleep some time till the client is informed about"
338                              + "the new TSO connection parameters and how can connect");
339             TimeUnit.SECONDS.sleep(10 + 2);
340 
341             HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
342             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx2, Bytes.toString(data1_q1),
343                      Bytes.toString(data1_q2));
344             Get getData1R1Q1 = new Get(row1).setMaxVersions(1);
345             Result r = txTable.get(tx2, getData1R1Q1);
346             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
347                          "Unexpected value for SI read R1Q1" + tx2 + ": "
348                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
349             Get getData1R2Q2 = new Get(row2).setMaxVersions(1);
350             r = txTable.get(tx2, getData1R2Q2);
351             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
352                          "Unexpected value for SI read R1Q1" + tx2 + ": "
353                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
354 
355             Put putData2R1Q1 = new Put(row1);
356             putData2R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data2_q1);
357             txTable.put(tx2, putData2R1Q1);
358             Put putData2R2Q2 = new Put(row2);
359             putData2R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data2_q2);
360             txTable.put(tx2, putData2R2Q2);
361             // This one should commit in the new TSO
362             tm.commit(tx2);
363 
364             assertEquals(tx2.getStatus(), Transaction.Status.COMMITTED);
365             assertTrue(tx2.getEpoch() > tx0.getCommitTimestamp());
366 
367             checkRowValues(txTable, data2_q1, data2_q2);
368         }
369 
370     }
371 
372     private void checkRowValues(TTable txTable, byte[] expectedDataR1Q1, byte[] expectedDataR2Q2)
373             throws IOException, RollbackException {
374         Transaction readTx = tm.begin();
375         LOG.info("Starting Read Tx {} for checking cell values", readTx.getTransactionId());
376         Get getRow1 = new Get(row1).setMaxVersions(1);
377         getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
378         Result r = txTable.get(readTx, getRow1);
379         assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), expectedDataR1Q1,
380                      "Unexpected value for SI read R1Q1" + readTx + ": " + Bytes
381                              .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
382         Get getRow2 = new Get(row2).setMaxVersions(1);
383         r = txTable.get(readTx, getRow2);
384         assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), expectedDataR2Q2,
385                      "Unexpected value for SI read R2Q2" + readTx + ": " + Bytes
386                              .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
387         tm.commit(readTx);
388     }
389 
390     // ----------------------------------------------------------------------------------------------------------------
391     // Helpers
392     // ----------------------------------------------------------------------------------------------------------------
393 
394     private static CuratorFramework provideInitializedZookeeperClient(String zkConnection) throws Exception {
395 
396         LOG.info("Creating Zookeeper Client connecting to {}", zkConnection);
397 
398         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
399         CuratorFramework zkClient = CuratorFrameworkFactory
400                 .builder()
401                 .namespace(NAMESPACE)
402                 .connectString(zkConnection)
403                 .retryPolicy(retryPolicy).build();
404 
405         LOG.info("Connecting to ZK cluster {}", zkClient.getState());
406         zkClient.start();
407         zkClient.blockUntilConnected();
408         LOG.info("Connection to ZK cluster {}", zkClient.getState());
409 
410         return zkClient;
411     }
412 
413 }