View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.mockito.Matchers.anySetOf;
21  import static org.mockito.Matchers.eq;
22  import static org.mockito.Mockito.doReturn;
23  import static org.mockito.Mockito.mock;
24  import static org.testng.Assert.assertEquals;
25  
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.client.Put;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.client.ResultScanner;
30  import org.apache.hadoop.hbase.client.Scan;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.omid.tso.client.AbortException;
33  import org.apache.omid.tso.client.ForwardingTSOFuture;
34  import org.apache.omid.tso.client.TSOClient;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  import org.testng.ITestContext;
38  import org.testng.annotations.Test;
39  
40  import com.google.common.util.concurrent.SettableFuture;
41  
42  @Test(groups = "sharedHBase")
43  public class TestTransactionCleanup extends OmidTestBase {
44  
45      private static final Logger LOG = LoggerFactory.getLogger(TestTransactionCleanup.class);
46  
47      private static final long START_TS = 1L;
48  
49      private byte[] row = Bytes.toBytes("row");
50      private byte[] family = Bytes.toBytes(TEST_FAMILY);
51      private byte[] qual = Bytes.toBytes("qual");
52      private byte[] data = Bytes.toBytes("data");
53  
54      // NOTE: This test is maybe redundant with runTestCleanupAfterConflict()
55      // and testCleanupWithDeleteRow() tests in TestTransactionCleanup class.
56      // Code in TestTransactionCleanup is a little more difficult to follow,
57      // lacks some assertions and includes some magic numbers, so we should
58      // try to review and improve the tests in these two classes in a further
59      // commit.
60      @Test(timeOut = 10_000)
61      public void testTransactionIsCleanedUpAfterBeingAborted(ITestContext context) throws Exception {
62  
63          final int ROWS_MODIFIED = 1;
64  
65          // Prepare the mocking results
66          SettableFuture<Long> startTSF = SettableFuture.create();
67          startTSF.set(START_TS);
68          ForwardingTSOFuture<Long> stFF = new ForwardingTSOFuture<>(startTSF);
69  
70          SettableFuture<Long> abortingF = SettableFuture.create();
71          abortingF.setException(new AbortException());
72          ForwardingTSOFuture<Long> abortingFF = new ForwardingTSOFuture<>(abortingF);
73  
74          // Mock the TSO Client setting the right method responses
75          TSOClient mockedTSOClient = mock(TSOClient.class);
76  
77          doReturn(stFF)
78                  .when(mockedTSOClient).getNewStartTimestamp();
79  
80          doReturn(abortingFF)
81                  .when(mockedTSOClient).commit(eq(START_TS), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class));
82  
83          try (TransactionManager tm = newTransactionManager(context, mockedTSOClient);
84               TTable txTable = new TTable(connection, TEST_TABLE)) {
85  
86              // Start a transaction and put some data in a column
87              Transaction tx = tm.begin();
88  
89              Put put = new Put(row);
90              put.addColumn(family, qual, data);
91              txTable.put(tx, put);
92  
93              // Abort transaction when committing, so the cleanup
94              // process we want to test is triggered
95              try {
96                  tm.commit(tx);
97              } catch (RollbackException e) {
98                  // Expected
99              }
100 
101             // So now we have to check that the Delete marker introduced by the
102             // cleanup process is there
103             Scan scan = new Scan(row);
104             scan.setRaw(true); // Raw scan to obtain the deleted cells
105             ResultScanner resultScanner = txTable.getHTable().getScanner(scan);
106             int resultCount = 0;
107             for (Result result : resultScanner) {
108                 assertEquals(result.size(), 2); // Size == 2, including the put and delete from cleanup
109                 LOG.trace("Result {}", result);
110                 // The last element of the qualifier should have the Delete marker
111                 byte encodedType = result.getColumnLatestCell(family, qual).getTypeByte();
112                 assertEquals(KeyValue.Type.codeToType(encodedType), KeyValue.Type.Delete);
113                 resultCount++;
114             }
115             assertEquals(resultCount, ROWS_MODIFIED);
116         }
117     }
118 
119 }