1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Mockito.spy;
21 import static org.testng.Assert.assertEquals;
22 import static org.testng.Assert.assertNull;
23 import static org.testng.Assert.fail;
24
25 import java.io.IOException;
26
27 import javax.annotation.Nullable;
28
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.client.Get;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.client.Table;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.omid.TestUtils;
38 import org.apache.omid.committable.CommitTable;
39 import org.apache.omid.committable.InMemoryCommitTable;
40 import org.apache.omid.transaction.Transaction.Status;
41 import org.apache.omid.tso.ProgrammableTSOServer;
42 import org.apache.omid.tso.client.TSOClient;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.testng.annotations.BeforeClass;
46 import org.testng.annotations.BeforeMethod;
47 import org.testng.annotations.Test;
48
49 @Test(groups = "sharedHBase")
50 public class TestTxMgrFailover extends OmidTestBase {
51
52 private static final Logger LOG = LoggerFactory.getLogger(TestTxMgrFailover.class);
53
54 private static final int TSO_SERVER_PORT = 3333;
55 private static final String TSO_SERVER_HOST = "localhost";
56
57 private static final long TX1_ST = 1L;
58
59 private static final byte[] qualifier = Bytes.toBytes("test-qual");
60 private static final byte[] row1 = Bytes.toBytes("row1");
61 private static final byte[] data1 = Bytes.toBytes("testWrite-1");
62
63
64 private InMemoryCommitTable commitTable;
65
66 private CommitTable.Client commitTableClient;
67
68
69 private ProgrammableTSOServer tso;
70
71
72 private HBaseTransactionManager tm;
73
74 @BeforeClass(alwaysRun = true)
75 public void beforeClass() throws Exception {
76
77
78
79 tso = new ProgrammableTSOServer(TSO_SERVER_PORT);
80 TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
81 }
82
83 @BeforeMethod(alwaysRun = true, timeOut = 30_000)
84 public void beforeMethod() throws IOException, InterruptedException {
85
86 commitTable = new InMemoryCommitTable();
87 commitTableClient = spy(commitTable.getClient());
88
89 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
90 hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
91 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
92 TSOClient tsoClientForTM = spy(TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
93
94 tm = spy(HBaseTransactionManager.builder(hbaseOmidClientConf)
95 .tsoClient(tsoClientForTM)
96 .commitTableClient(commitTableClient)
97 .build());
98 }
99
100 @Test(timeOut = 30_000)
101 public void testAbortResponseFromTSOThrowsRollbackExceptionInClient() throws Exception {
102
103 tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
104 tso.queueResponse(new ProgrammableTSOServer.AbortResponse(TX1_ST));
105
106 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
107 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
108 assertEquals(tx1.getStartTimestamp(), TX1_ST);
109 Put put = new Put(row1);
110 put.addColumn(TEST_FAMILY.getBytes(), qualifier, data1);
111 txTable.put(tx1, put);
112 assertEquals(hBaseUtils.countRows(txTable.getHTable()), 1, "Rows should be 1!");
113 checkOperationSuccessOnCell(txTable.getHTable(), KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
114 qualifier);
115
116 try {
117 tm.commit(tx1);
118 fail();
119 } catch (RollbackException e) {
120
121
122 }
123
124
125 assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
126 assertEquals(tx1.getCommitTimestamp(), 0);
127
128 checkOperationSuccessOnCell(txTable.getHTable(), KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
129 qualifier);
130 }
131
132 }
133
134
135
136
137
138 protected void checkOperationSuccessOnCell(Table table,
139 KeyValue.Type targetOp,
140 @Nullable byte[] expectedValue,
141 byte[] tableName,
142 byte[] row,
143 byte[] fam,
144 byte[] col) {
145
146 try {
147 Get get = new Get(row).setMaxVersions(1);
148 Result result = table.get(get);
149 Cell latestCell = result.getColumnLatestCell(fam, col);
150
151 switch (targetOp) {
152 case Put:
153 assertEquals(latestCell.getTypeByte(), targetOp.getCode());
154 assertEquals(CellUtil.cloneValue(latestCell), expectedValue);
155 LOG.trace("Value for " + Bytes.toString(tableName) + ":"
156 + Bytes.toString(row) + ":" + Bytes.toString(fam) + ":"
157 + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(latestCell))
158 + " (" + Bytes.toString(expectedValue) + " expected)");
159 break;
160 case Delete:
161 LOG.trace("Value for " + Bytes.toString(tableName) + ":"
162 + Bytes.toString(row) + ":" + Bytes.toString(fam)
163 + Bytes.toString(col) + " deleted");
164 assertNull(latestCell);
165 break;
166 default:
167 fail();
168 }
169 } catch (IOException e) {
170 LOG.error("Error reading row " + Bytes.toString(tableName) + ":"
171 + Bytes.toString(row) + ":" + Bytes.toString(fam)
172 + Bytes.toString(col), e);
173 fail();
174 }
175 }
176
177 }