View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.hadoop.hbase.Cell;
21  import org.apache.hadoop.hbase.CellUtil;
22  import org.apache.hadoop.hbase.KeyValue;
23  import org.apache.hadoop.hbase.client.Get;
24  import org.apache.hadoop.hbase.client.HTable;
25  import org.apache.hadoop.hbase.client.Put;
26  import org.apache.hadoop.hbase.client.Result;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.omid.TestUtils;
29  import org.apache.omid.committable.CommitTable;
30  import org.apache.omid.committable.InMemoryCommitTable;
31  import org.apache.omid.transaction.Transaction.Status;
32  import org.apache.omid.tso.ProgrammableTSOServer;
33  import org.apache.omid.tso.client.TSOClient;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.testng.annotations.BeforeClass;
37  import org.testng.annotations.BeforeMethod;
38  import org.testng.annotations.Test;
39  
40  import javax.annotation.Nullable;
41  import java.io.IOException;
42  
43  import static org.mockito.Mockito.spy;
44  import static org.testng.Assert.assertEquals;
45  import static org.testng.Assert.assertNull;
46  import static org.testng.Assert.fail;
47  
48  @Test(groups = "sharedHBase")
49  public class TestTxMgrFailover extends OmidTestBase {
50  
51      private static final Logger LOG = LoggerFactory.getLogger(TestTxMgrFailover.class);
52  
53      private static final int TSO_SERVER_PORT = 3333;
54      private static final String TSO_SERVER_HOST = "localhost";
55  
56      private static final long TX1_ST = 1L;
57      private static final long TX1_CT = 2L;
58  
59      private static final byte[] qualifier = Bytes.toBytes("test-qual");
60      private static final byte[] row1 = Bytes.toBytes("row1");
61      private static final byte[] data1 = Bytes.toBytes("testWrite-1");
62  
63      // Used in test assertions
64      private InMemoryCommitTable commitTable;
65  
66      private CommitTable.Client commitTableClient;
67  
68      // Allows to prepare the required responses to client request operations
69      private ProgrammableTSOServer tso;
70  
71      // The transaction manager under test
72      private HBaseTransactionManager tm;
73  
74      @BeforeClass(alwaysRun = true)
75      public void beforeClass() throws Exception {
76          // ------------------------------------------------------------------------------------------------------------
77          // ProgrammableTSOServer  setup
78          // ------------------------------------------------------------------------------------------------------------
79          tso = new ProgrammableTSOServer(TSO_SERVER_PORT);
80          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
81      }
82  
83      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
84      public void beforeMethod() throws IOException, InterruptedException {
85  
86          commitTable = new InMemoryCommitTable(); // Use an in-memory commit table to speed up tests
87          commitTableClient = spy(commitTable.getClient());
88  
89          HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
90          hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
91          hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
92          TSOClient tsoClientForTM = spy(TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
93  
94          tm = spy(HBaseTransactionManager.builder(hbaseOmidClientConf)
95                  .tsoClient(tsoClientForTM)
96                  .commitTableClient(commitTableClient)
97                  .build());
98      }
99  
100     @Test(timeOut = 30_000)
101     public void testAbortResponseFromTSOThrowsRollbackExceptionInClient() throws Exception {
102         // Program the TSO to return an ad-hoc Timestamp and an abort response for tx 1
103         tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
104         tso.queueResponse(new ProgrammableTSOServer.AbortResponse(TX1_ST));
105 
106         try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
107             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
108             assertEquals(tx1.getStartTimestamp(), TX1_ST);
109             Put put = new Put(row1);
110             put.add(TEST_FAMILY.getBytes(), qualifier, data1);
111             txTable.put(tx1, put);
112             assertEquals(hBaseUtils.countRows(new HTable(hbaseConf, TEST_TABLE)), 1, "Rows should be 1!");
113             checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
114                     qualifier);
115 
116             try {
117                 tm.commit(tx1);
118                 fail();
119             } catch (RollbackException e) {
120                 // Expected!
121 
122             }
123 
124             // Check transaction status
125             assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
126             assertEquals(tx1.getCommitTimestamp(), 0);
127             // Check the cleanup process did its job and the committed data is NOT there
128             checkOperationSuccessOnCell(KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
129                     qualifier);
130         }
131 
132     }
133 
134     // ----------------------------------------------------------------------------------------------------------------
135     // Helper methods
136     // ----------------------------------------------------------------------------------------------------------------
137 
138     protected void checkOperationSuccessOnCell(KeyValue.Type targetOp,
139                                                @Nullable byte[] expectedValue,
140                                                byte[] tableName,
141                                                byte[] row,
142                                                byte[] fam,
143                                                byte[] col) {
144 
145         try (HTable table = new HTable(hbaseConf, tableName)) {
146             Get get = new Get(row).setMaxVersions(1);
147             Result result = table.get(get);
148             Cell latestCell = result.getColumnLatestCell(fam, col);
149 
150             switch (targetOp) {
151                 case Put:
152                     assertEquals(latestCell.getTypeByte(), targetOp.getCode());
153                     assertEquals(CellUtil.cloneValue(latestCell), expectedValue);
154                     LOG.trace("Value for " + Bytes.toString(tableName) + ":"
155                             + Bytes.toString(row) + ":" + Bytes.toString(fam) + ":"
156                             + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(latestCell))
157                             + " (" + Bytes.toString(expectedValue) + " expected)");
158                     break;
159                 case Delete:
160                     LOG.trace("Value for " + Bytes.toString(tableName) + ":"
161                             + Bytes.toString(row) + ":" + Bytes.toString(fam)
162                             + Bytes.toString(col) + " deleted");
163                     assertNull(latestCell);
164                     break;
165                 default:
166                     fail();
167             }
168         } catch (IOException e) {
169             LOG.error("Error reading row " + Bytes.toString(tableName) + ":"
170                     + Bytes.toString(row) + ":" + Bytes.toString(fam)
171                     + Bytes.toString(col), e);
172             fail();
173         }
174     }
175 
176 }