View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.mockito.Mockito.spy;
21  import static org.testng.Assert.assertEquals;
22  import static org.testng.Assert.assertNull;
23  import static org.testng.Assert.fail;
24  
25  import java.io.IOException;
26  
27  import javax.annotation.Nullable;
28  
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.client.Get;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.client.Table;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.omid.TestUtils;
38  import org.apache.omid.committable.CommitTable;
39  import org.apache.omid.committable.InMemoryCommitTable;
40  import org.apache.omid.transaction.Transaction.Status;
41  import org.apache.omid.tso.ProgrammableTSOServer;
42  import org.apache.omid.tso.client.TSOClient;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  import org.testng.annotations.BeforeClass;
46  import org.testng.annotations.BeforeMethod;
47  import org.testng.annotations.Test;
48  
49  @Test(groups = "sharedHBase")
50  public class TestTxMgrFailover extends OmidTestBase {
51  
52      private static final Logger LOG = LoggerFactory.getLogger(TestTxMgrFailover.class);
53  
54      private static final int TSO_SERVER_PORT = 3333;
55      private static final String TSO_SERVER_HOST = "localhost";
56  
57      private static final long TX1_ST = 1L;
58  
59      private static final byte[] qualifier = Bytes.toBytes("test-qual");
60      private static final byte[] row1 = Bytes.toBytes("row1");
61      private static final byte[] data1 = Bytes.toBytes("testWrite-1");
62  
63      // Used in test assertions
64      private InMemoryCommitTable commitTable;
65  
66      private CommitTable.Client commitTableClient;
67  
68      // Allows to prepare the required responses to client request operations
69      private ProgrammableTSOServer tso;
70  
71      // The transaction manager under test
72      private HBaseTransactionManager tm;
73  
74      @BeforeClass(alwaysRun = true)
75      public void beforeClass() throws Exception {
76          // ------------------------------------------------------------------------------------------------------------
77          // ProgrammableTSOServer  setup
78          // ------------------------------------------------------------------------------------------------------------
79          tso = new ProgrammableTSOServer(TSO_SERVER_PORT);
80          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
81      }
82  
83      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
84      public void beforeMethod() throws IOException, InterruptedException {
85  
86          commitTable = new InMemoryCommitTable(); // Use an in-memory commit table to speed up tests
87          commitTableClient = spy(commitTable.getClient());
88  
89          HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
90          hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
91          hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
92          TSOClient tsoClientForTM = spy(TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
93  
94          tm = spy(HBaseTransactionManager.builder(hbaseOmidClientConf)
95                  .tsoClient(tsoClientForTM)
96                  .commitTableClient(commitTableClient)
97                  .build());
98      }
99  
100     @Test(timeOut = 30_000)
101     public void testAbortResponseFromTSOThrowsRollbackExceptionInClient() throws Exception {
102         // Program the TSO to return an ad-hoc Timestamp and an abort response for tx 1
103         tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
104         tso.queueResponse(new ProgrammableTSOServer.AbortResponse(TX1_ST));
105 
106         try (TTable txTable = new TTable(connection, TEST_TABLE)) {
107             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
108             assertEquals(tx1.getStartTimestamp(), TX1_ST);
109             Put put = new Put(row1);
110             put.addColumn(TEST_FAMILY.getBytes(), qualifier, data1);
111             txTable.put(tx1, put);
112             assertEquals(hBaseUtils.countRows(txTable.getHTable()), 1, "Rows should be 1!");
113             checkOperationSuccessOnCell(txTable.getHTable(), KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
114                     qualifier);
115 
116             try {
117                 tm.commit(tx1);
118                 fail();
119             } catch (RollbackException e) {
120                 // Expected!
121 
122             }
123 
124             // Check transaction status
125             assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
126             assertEquals(tx1.getCommitTimestamp(), 0);
127             // Check the cleanup process did its job and the committed data is NOT there
128             checkOperationSuccessOnCell(txTable.getHTable(), KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
129                     qualifier);
130         }
131 
132     }
133 
134     // ----------------------------------------------------------------------------------------------------------------
135     // Helper methods
136     // ----------------------------------------------------------------------------------------------------------------
137 
138     protected void checkOperationSuccessOnCell(Table table,
139                                                KeyValue.Type targetOp,
140                                                @Nullable byte[] expectedValue,
141                                                byte[] tableName,
142                                                byte[] row,
143                                                byte[] fam,
144                                                byte[] col) {
145 
146         try {
147             Get get = new Get(row).setMaxVersions(1);
148             Result result = table.get(get);
149             Cell latestCell = result.getColumnLatestCell(fam, col);
150 
151             switch (targetOp) {
152                 case Put:
153                     assertEquals(latestCell.getTypeByte(), targetOp.getCode());
154                     assertEquals(CellUtil.cloneValue(latestCell), expectedValue);
155                     LOG.trace("Value for " + Bytes.toString(tableName) + ":"
156                             + Bytes.toString(row) + ":" + Bytes.toString(fam) + ":"
157                             + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(latestCell))
158                             + " (" + Bytes.toString(expectedValue) + " expected)");
159                     break;
160                 case Delete:
161                     LOG.trace("Value for " + Bytes.toString(tableName) + ":"
162                             + Bytes.toString(row) + ":" + Bytes.toString(fam)
163                             + Bytes.toString(col) + " deleted");
164                     assertNull(latestCell);
165                     break;
166                 default:
167                     fail();
168             }
169         } catch (IOException e) {
170             LOG.error("Error reading row " + Bytes.toString(tableName) + ":"
171                     + Bytes.toString(row) + ":" + Bytes.toString(fam)
172                     + Bytes.toString(col), e);
173             fail();
174         }
175     }
176 
177 }