1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.committable.hbase;
19
20 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_QUALIFIER;
21 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
22 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
23 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
24 import java.io.IOException;
25 import java.util.LinkedList;
26 import java.util.List;
27
28
29 import javax.inject.Inject;
30
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.Get;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.client.Table;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.omid.committable.CommitTable;
42 import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
47 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture;
48 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
49 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
50 import com.google.protobuf.CodedInputStream;
51 import com.google.protobuf.CodedOutputStream;
52
53 public class HBaseCommitTable implements CommitTable {
54
55 private static final Logger LOG = LoggerFactory.getLogger(HBaseCommitTable.class);
56
57 private final Connection hbaseConnection;
58 private final String tableName;
59 private final byte[] commitTableFamily;
60 private final byte[] lowWatermarkFamily;
61 private final KeyGenerator keygen;
62
63
64
65
66
67
68 @Inject
69 public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) throws IOException {
70 this(ConnectionFactory.createConnection(hbaseConfig), config, KeyGeneratorImplementations.defaultKeyGenerator());
71 }
72
73 public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config) throws IOException {
74 this(hbaseConnection, config, KeyGeneratorImplementations.defaultKeyGenerator());
75 }
76
77 public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
78 this(ConnectionFactory.createConnection(hbaseConfig), config, keygen);
79 }
80
81 public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
82
83 this.hbaseConnection = hbaseConnection;
84 this.tableName = config.getTableName();
85 this.commitTableFamily = config.getCommitTableFamily();
86 this.lowWatermarkFamily = config.getLowWatermarkFamily();
87 this.keygen = keygen;
88
89 }
90
91
92
93
94
95 private class HBaseWriter implements Writer {
96
97 private static final long INITIAL_LWM_VALUE = -1L;
98
99
100 final List<Put> writeBuffer = new LinkedList<>();
101 volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
102
103 HBaseWriter() {
104
105 }
106
107 @Override
108 public void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
109 assert (startTimestamp < commitTimestamp);
110 Put put = new Put(startTimestampToKey(startTimestamp), startTimestamp);
111 byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
112 put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
113 writeBuffer.add(put);
114 }
115
116 @Override
117 public void updateLowWatermark(long lowWatermark) throws IOException {
118 lowWatermarkToStore = lowWatermark;
119 }
120
121 @Override
122 public void flush() throws IOException {
123
124 try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
125 addLowWatermarkToStoreToWriteBuffer();
126 table.put(writeBuffer);
127 writeBuffer.clear();
128 } catch (IOException e) {
129 LOG.error("Error flushing data", e);
130 throw e;
131 }
132 }
133
134 @Override
135 public void clearWriteBuffer() {
136 writeBuffer.clear();
137 }
138
139 @Override
140 public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
141 try (Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
142 assert (startTimestamp < commitTimestamp);
143 byte[] transactionRow = startTimestampToKey(startTimestamp);
144 Put put = new Put(transactionRow, startTimestamp);
145 byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
146 put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
147 return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
148 }
149
150 }
151
152 private void addLowWatermarkToStoreToWriteBuffer() {
153 long lowWatermark = lowWatermarkToStore;
154 if(lowWatermark != INITIAL_LWM_VALUE) {
155 Put put = new Put(LOW_WATERMARK_ROW);
156 put.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER, Bytes.toBytes(lowWatermark));
157 writeBuffer.add(put);
158 }
159 }
160
161 }
162
163 class HBaseClient implements Client{
164
165 HBaseClient(){
166
167 }
168
169 @Override
170 public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
171 startTimestamp = removeCheckpointBits(startTimestamp);
172 SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
173 try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
174 Get get = new Get(startTimestampToKey(startTimestamp));
175 get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
176 get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);
177
178 Result result = table.get(get);
179
180 if (containsInvalidTransaction(result)) {
181 CommitTimestamp invalidCT =
182 new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false);
183 f.set(Optional.of(invalidCT));
184 return f;
185 }
186
187 if (containsATimestamp(result)) {
188 long commitTSValue =
189 decodeCommitTimestamp(startTimestamp, result.getValue(commitTableFamily, COMMIT_TABLE_QUALIFIER));
190 CommitTimestamp validCT = new CommitTimestamp(Location.COMMIT_TABLE, commitTSValue, true);
191 f.set(Optional.of(validCT));
192 } else {
193 f.set(Optional.<CommitTimestamp>absent());
194 }
195 } catch (IOException e) {
196 LOG.error("Error getting commit timestamp for TX {}", startTimestamp, e);
197 f.setException(e);
198 }
199 return f;
200 }
201
202 @Override
203 public ListenableFuture<Long> readLowWatermark() {
204 SettableFuture<Long> f = SettableFuture.create();
205 try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
206 Get get = new Get(LOW_WATERMARK_ROW);
207 get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
208 Result result = table.get(get);
209 if (containsLowWatermark(result)) {
210 long lowWatermark = Bytes.toLong(result.getValue(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
211 f.set(lowWatermark);
212 } else {
213 f.set(0L);
214 }
215 } catch (IOException e) {
216 LOG.error("Error getting low watermark", e);
217 f.setException(e);
218 }
219 return f;
220 }
221
222
223 @Override
224 public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
225 startTimestamp = removeCheckpointBits(startTimestamp);
226 byte[] key;
227 try {
228 key = startTimestampToKey(startTimestamp);
229 } catch (IOException e) {
230 LOG.warn("Error generating timestamp for transaction completion", e);
231 SettableFuture<Void> f = SettableFuture.create();
232 f.setException(e);
233 return f;
234 }
235
236 Delete delete = new Delete(key, startTimestamp);
237
238 try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
239 table.delete(delete);
240 } catch (IOException e) {
241 SettableFuture<Void> f = SettableFuture.create();
242 LOG.warn("Error contacting hbase", e);
243 f.setException(e);
244 }
245 SettableFuture<Void> f = SettableFuture.create();
246 f.set(null);
247 return f;
248 }
249
250 @Override
251 public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
252 startTimestamp = removeCheckpointBits(startTimestamp);
253 SettableFuture<Boolean> f = SettableFuture.create();
254 try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
255 byte[] row = startTimestampToKey(startTimestamp);
256 Put invalidationPut = new Put(row, startTimestamp);
257 invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
258
259
260
261
262
263
264
265 boolean result = table.checkAndPut(row, commitTableFamily, COMMIT_TABLE_QUALIFIER, null, invalidationPut);
266 f.set(result);
267 } catch (IOException ioe) {
268 f.setException(ioe);
269 }
270 return f;
271 }
272
273 private boolean containsATimestamp(Result result) {
274 return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
275 }
276
277 private boolean containsInvalidTransaction(Result result) {
278 return (result != null && result.containsColumn(commitTableFamily, INVALID_TX_QUALIFIER));
279 }
280
281 private boolean containsLowWatermark(Result result) {
282 return (result != null && result.containsColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
283 }
284
285 private class DeleteRequest extends AbstractFuture<Void> {
286 final Delete delete;
287
288 DeleteRequest(Delete delete) {
289 this.delete = delete;
290 }
291
292 void error(IOException ioe) {
293 setException(ioe);
294 }
295
296 void complete() {
297 set(null);
298 }
299
300 Delete getDelete() {
301 return delete;
302 }
303 }
304 }
305
306
307
308
309
310 @Override
311 public Writer getWriter() throws IOException {
312 return new HBaseWriter();
313 }
314
315 @Override
316 public Client getClient() throws IOException {
317 return new HBaseClient();
318 }
319
320
321
322
323 static long removeCheckpointBits(long startTimestamp) {
324 return startTimestamp - (startTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN);
325 }
326
327 private byte[] startTimestampToKey(long startTimestamp) throws IOException {
328 return keygen.startTimestampToKey(startTimestamp);
329 }
330
331 private static byte[] encodeCommitTimestamp(long startTimestamp, long commitTimestamp) throws IOException {
332 assert (startTimestamp < commitTimestamp);
333 long diff = commitTimestamp - startTimestamp;
334 byte[] bytes = new byte[CodedOutputStream.computeInt64SizeNoTag(diff)];
335 CodedOutputStream cos = CodedOutputStream.newInstance(bytes);
336 cos.writeInt64NoTag(diff);
337 cos.flush();
338 return bytes;
339
340 }
341
342 private static long decodeCommitTimestamp(long startTimestamp, byte[] encodedCommitTimestamp) throws IOException {
343 CodedInputStream cis = CodedInputStream.newInstance(encodedCommitTimestamp);
344 long diff = cis.readInt64();
345 return startTimestamp + diff;
346 }
347
348 }