View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.timestamp.storage;
19  
20  import static com.google.common.base.Charsets.UTF_8;
21  
22  import java.io.IOException;
23  
24  import javax.inject.Inject;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.TableName;
28  import org.apache.hadoop.hbase.client.Connection;
29  import org.apache.hadoop.hbase.client.ConnectionFactory;
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /**
39   * Stores the max timestamp assigned by the TO in HBase.
40   * It's always written non-transactionally in the same row and column
41   */
42  public class HBaseTimestampStorage implements TimestampStorage {
43  
44      private static final long INITIAL_MAX_TS_VALUE = 0;
45  
46      private static final Logger LOG = LoggerFactory.getLogger(HBaseTimestampStorage.class);
47  
48      // ROW and COLUMN to write the assigned timestamp
49      private static final byte[] TSO_ROW = "MAX_TIMESTAMP_R".getBytes(UTF_8);
50      private static final byte[] TSO_QUALIFIER = "MAX_TIMESTAMP_Q".getBytes(UTF_8);
51  
52      private final Table table;
53      private final byte[] cfName;
54  
55      @Inject
56      public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
57          Connection conn = ConnectionFactory.createConnection(hbaseConfig);
58          this.table = conn.getTable(TableName.valueOf(config.getTableName()));
59          this.cfName = config.getFamilyName().getBytes(UTF_8);
60      }
61  
62      @Override
63      public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
64          if (newMaxTimestamp < 0) {
65              LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
66              throw new IllegalArgumentException("Negative value received for maxTimestamp" + newMaxTimestamp);
67          }
68          Put put = new Put(TSO_ROW);
69          put.addColumn(cfName, TSO_QUALIFIER, Bytes.toBytes(newMaxTimestamp));
70          byte[] previousVal = null;
71          if (previousMaxTimestamp != INITIAL_MAX_TS_VALUE) {
72              previousVal = Bytes.toBytes(previousMaxTimestamp);
73          }
74          if (!table.checkAndPut(TSO_ROW, cfName, TSO_QUALIFIER, previousVal, put)) {
75              throw new IOException("Previous max timestamp is incorrect");
76          }
77      }
78  
79      @Override
80      public long getMaxTimestamp() throws IOException {
81          Get get = new Get(TSO_ROW);
82          get.addColumn(cfName, TSO_QUALIFIER);
83  
84          Result result = table.get(get);
85          if (result.containsColumn(cfName, TSO_QUALIFIER)) {
86              return Bytes.toLong(result.getValue(cfName, TSO_QUALIFIER));
87          } else {
88              // This happens for example when a new cluster is created
89              return INITIAL_MAX_TS_VALUE;
90          }
91  
92      }
93  
94  }