1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Matchers.anySetOf;
21 import static org.mockito.Matchers.eq;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.testng.Assert.assertEquals;
25
26 import org.apache.hadoop.hbase.KeyValue;
27 import org.apache.hadoop.hbase.client.Put;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.ResultScanner;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.omid.tso.client.AbortException;
33 import org.apache.omid.tso.client.ForwardingTSOFuture;
34 import org.apache.omid.tso.client.TSOClient;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.testng.ITestContext;
38 import org.testng.annotations.Test;
39
40 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
41
42 @Test(groups = "sharedHBase")
43 public class TestTransactionCleanup extends OmidTestBase {
44
45 private static final Logger LOG = LoggerFactory.getLogger(TestTransactionCleanup.class);
46
47 private static final long START_TS = 1L;
48
49 private byte[] row = Bytes.toBytes("row");
50 private byte[] family = Bytes.toBytes(TEST_FAMILY);
51 private byte[] qual = Bytes.toBytes("qual");
52 private byte[] data = Bytes.toBytes("data");
53
54
55
56
57
58
59
60 @Test(timeOut = 10_000)
61 public void testTransactionIsCleanedUpAfterBeingAborted(ITestContext context) throws Exception {
62
63 final int ROWS_MODIFIED = 1;
64
65
66 SettableFuture<Long> startTSF = SettableFuture.create();
67 startTSF.set(START_TS);
68 ForwardingTSOFuture<Long> stFF = new ForwardingTSOFuture<>(startTSF);
69
70 SettableFuture<Long> abortingF = SettableFuture.create();
71 abortingF.setException(new AbortException());
72 ForwardingTSOFuture<Long> abortingFF = new ForwardingTSOFuture<>(abortingF);
73
74
75 TSOClient mockedTSOClient = mock(TSOClient.class);
76
77 doReturn(stFF)
78 .when(mockedTSOClient).getNewStartTimestamp();
79
80 doReturn(abortingFF)
81 .when(mockedTSOClient).commit(eq(START_TS), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class));
82
83 try (TransactionManager tm = newTransactionManager(context, mockedTSOClient);
84 TTable txTable = new TTable(connection, TEST_TABLE)) {
85
86
87 Transaction tx = tm.begin();
88
89 Put put = new Put(row);
90 put.addColumn(family, qual, data);
91 txTable.put(tx, put);
92
93
94
95 try {
96 tm.commit(tx);
97 } catch (RollbackException e) {
98
99 }
100
101
102
103 Scan scan = new Scan(row);
104 scan.setRaw(true);
105 ResultScanner resultScanner = txTable.getHTable().getScanner(scan);
106 int resultCount = 0;
107 for (Result result : resultScanner) {
108 assertEquals(result.size(), 2);
109 LOG.trace("Result {}", result);
110
111 byte encodedType = result.getColumnLatestCell(family, qual).getTypeByte();
112 assertEquals(KeyValue.Type.codeToType(encodedType), KeyValue.Type.Delete);
113 resultCount++;
114 }
115 assertEquals(resultCount, ROWS_MODIFIED);
116 }
117 }
118
119 }