1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.omid.metrics.MetricsUtils.name;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.ExecutionException;
29
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.Connection;
32 import org.apache.hadoop.hbase.client.Mutation;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Table;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.omid.committable.CommitTable;
37 import org.apache.omid.metrics.MetricsRegistry;
38 import org.apache.omid.metrics.Timer;
39 import org.apache.omid.tso.client.CellId;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
44 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
45
46 public class HBaseSyncPostCommitter implements PostCommitActions {
47
48 private static final Logger LOG = LoggerFactory.getLogger(HBaseSyncPostCommitter.class);
49
50 private final MetricsRegistry metrics;
51 private final CommitTable.Client commitTableClient;
52
53 private final Timer commitTableUpdateTimer;
54 private final Timer shadowCellsUpdateTimer;
55 static final int MAX_BATCH_SIZE=1000;
56 private final Connection connection;
57
58 public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
59 Connection connection) {
60 this.metrics = metrics;
61 this.commitTableClient = commitTableClient;
62
63 this.commitTableUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "commitTableUpdate", "latency"));
64 this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
65 this.connection = connection;
66 }
67
68 private void flushMutations(TableName tableName, List<Mutation> mutations) throws IOException, InterruptedException {
69 try (Table table = connection.getTable(tableName)){
70 table.batch(mutations, new Object[mutations.size()]);
71 }
72
73 }
74
75 private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
76 Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
77 Put put = new Put(cell.getRow());
78 put.addColumn(cell.getFamily(),
79 CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
80 cell.getTimestamp(),
81 Bytes.toBytes(tx.getCommitTimestamp()));
82
83 TableName table = cell.getTable().getHTable().getName();
84 List<Mutation> tableMutations = mutations.get(table);
85 if (tableMutations == null) {
86 ArrayList<Mutation> newList = new ArrayList<>();
87 newList.add(put);
88 mutations.put(table, newList);
89 } else {
90 tableMutations.add(put);
91 if (tableMutations.size() > MAX_BATCH_SIZE) {
92 flushMutations(table, tableMutations);
93 mutations.remove(table);
94 }
95 }
96 }
97
98 @Override
99 public ListenableFuture<Void> updateShadowCells(AbstractTransaction<? extends CellId> transaction) {
100
101 SettableFuture<Void> updateSCFuture = SettableFuture.create();
102
103 HBaseTransaction tx = HBaseTransactionManager.enforceHBaseTransactionAsParam(transaction);
104
105 shadowCellsUpdateTimer.start();
106 try {
107 Map<TableName,List<Mutation>> mutations = new HashMap<>();
108
109 for (HBaseCellId cell : tx.getWriteSet()) {
110 addShadowCell(cell, tx, updateSCFuture, mutations);
111 }
112
113 for (HBaseCellId cell : tx.getConflictFreeWriteSet()) {
114 addShadowCell(cell, tx, updateSCFuture, mutations);
115 }
116
117 for (Map.Entry<TableName,List<Mutation>> entry: mutations.entrySet()) {
118 flushMutations(entry.getKey(), entry.getValue());
119 }
120
121
122 updateSCFuture.set(null);
123 } catch (IOException | InterruptedException e) {
124 LOG.warn("{}: Error inserting shadow cells", tx, e);
125 updateSCFuture.setException(
126 new TransactionManagerException(tx + ": Error inserting shadow cells ", e));
127 } finally {
128 shadowCellsUpdateTimer.stop();
129 }
130
131 return updateSCFuture;
132
133 }
134
135 @Override
136 public ListenableFuture<Void> removeCommitTableEntry(AbstractTransaction<? extends CellId> transaction) {
137
138 SettableFuture<Void> updateSCFuture = SettableFuture.create();
139
140 HBaseTransaction tx = HBaseTransactionManager.enforceHBaseTransactionAsParam(transaction);
141
142 commitTableUpdateTimer.start();
143
144 try {
145 commitTableClient.deleteCommitEntry(tx.getStartTimestamp()).get();
146 updateSCFuture.set(null);
147 } catch (InterruptedException e) {
148 Thread.currentThread().interrupt();
149 LOG.warn("{}: interrupted during commit table entry delete", tx, e);
150 updateSCFuture.setException(
151 new TransactionManagerException(tx + ": interrupted during commit table entry delete"));
152 } catch (ExecutionException e) {
153 LOG.warn("{}: can't remove commit table entry", tx, e);
154 updateSCFuture.setException(new TransactionManagerException(tx + ": can't remove commit table entry"));
155 } finally {
156 commitTableUpdateTimer.stop();
157 }
158
159 return updateSCFuture;
160
161 }
162
163 }