View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import java.io.IOException;
21  import java.util.HashSet;
22  import java.util.Set;
23  
24  import org.apache.hadoop.hbase.client.Delete;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
29      private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);
30  
31      public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
32                              Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, boolean isLowLatency) {
33          super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
34      }
35  
36      public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
37                              Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm,
38                              long readTimestamp, long writeTimestamp, boolean isLowLatency) {
39          super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, readTimestamp, writeTimestamp, isLowLatency);
40      }
41  
42      public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch,
43                              Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet,
44                              AbstractTransactionManager tm, boolean isLowLatency) {
45          super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
46      }
47  
48      private void deleteCell(HBaseCellId cell) {
49          Delete delete = new Delete(cell.getRow());
50          delete.addColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
51          try {
52              cell.getTable().getHTable().delete(delete);
53          } catch (IOException e) {
54              LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
55          }
56      }
57      @Override
58      public void cleanup() {
59          for (final HBaseCellId cell : getWriteSet()) {
60              deleteCell(cell);
61          }
62  
63          for (final HBaseCellId cell : getConflictFreeWriteSet()) {
64              deleteCell(cell);
65          }
66          try {
67              flushTables();
68          } catch (IOException e) {
69              LOG.warn("Failed flushing tables for Tx {}", getTransactionId(), e);
70          }
71      }
72  
73      /**
74       * Flushes pending operations for tables touched by transaction
75       * @throws IOException in case of any I/O related issues
76       */
77      public void flushTables() throws IOException {
78  
79          for (TTable writtenTable : getWrittenTables()) {
80              writtenTable.flushCommits();
81          }
82  
83      }
84  
85      // ****************************************************************************************************************
86      // Helper methods
87      // ****************************************************************************************************************
88  
89      private Set<TTable> getWrittenTables() {
90          HashSet<HBaseCellId> writeSet = (HashSet<HBaseCellId>) getWriteSet();
91          Set<TTable> tables = new HashSet<TTable>();
92          for (HBaseCellId cell : writeSet) {
93              tables.add(cell.getTable());
94          }
95          writeSet = (HashSet<HBaseCellId>) getConflictFreeWriteSet();
96          for (HBaseCellId cell : writeSet) {
97              tables.add(cell.getTable());
98          }
99          return tables;
100     }
101 
102 }