1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.timestamp.storage;
19
20 import static com.google.common.base.Charsets.UTF_8;
21
22 import java.io.IOException;
23
24 import javax.inject.Inject;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.TableName;
28 import org.apache.hadoop.hbase.client.Connection;
29 import org.apache.hadoop.hbase.client.ConnectionFactory;
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38
39
40
41
42 public class HBaseTimestampStorage implements TimestampStorage {
43
44 private static final long INITIAL_MAX_TS_VALUE = 0;
45
46 private static final Logger LOG = LoggerFactory.getLogger(HBaseTimestampStorage.class);
47
48
49 private static final byte[] TSO_ROW = "MAX_TIMESTAMP_R".getBytes(UTF_8);
50 private static final byte[] TSO_QUALIFIER = "MAX_TIMESTAMP_Q".getBytes(UTF_8);
51
52 private final Table table;
53 private final byte[] cfName;
54 private final Connection connection;
55
56 @Inject
57 public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
58 connection = ConnectionFactory.createConnection(hbaseConfig);
59 this.table = connection.getTable(TableName.valueOf(config.getTableName()));
60 this.cfName = config.getFamilyName().getBytes(UTF_8);
61 }
62
63 @Override
64 public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
65 if (newMaxTimestamp < 0) {
66 LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
67 throw new IllegalArgumentException("Negative value received for maxTimestamp" + newMaxTimestamp);
68 }
69 Put put = new Put(TSO_ROW);
70 put.addColumn(cfName, TSO_QUALIFIER, Bytes.toBytes(newMaxTimestamp));
71 byte[] previousVal = null;
72 if (previousMaxTimestamp != INITIAL_MAX_TS_VALUE) {
73 previousVal = Bytes.toBytes(previousMaxTimestamp);
74 }
75 if (!table.checkAndPut(TSO_ROW, cfName, TSO_QUALIFIER, previousVal, put)) {
76 throw new IOException("Previous max timestamp is incorrect");
77 }
78 }
79
80 @Override
81 public long getMaxTimestamp() throws IOException {
82 Get get = new Get(TSO_ROW);
83 get.addColumn(cfName, TSO_QUALIFIER);
84
85 Result result = table.get(get);
86 if (result.containsColumn(cfName, TSO_QUALIFIER)) {
87 return Bytes.toLong(result.getValue(cfName, TSO_QUALIFIER));
88 } else {
89
90 return INITIAL_MAX_TS_VALUE;
91 }
92
93 }
94
95
96 public void close() throws IOException {
97
98 table.close();
99 connection.close();
100 }
101 }