View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.timestamp.storage;
19  
20  import static com.google.common.base.Charsets.UTF_8;
21  
22  import java.io.IOException;
23  
24  import javax.inject.Inject;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.TableName;
28  import org.apache.hadoop.hbase.client.Connection;
29  import org.apache.hadoop.hbase.client.ConnectionFactory;
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /**
39   * Stores the max timestamp assigned by the TO in HBase.
40   * It's always written non-transactionally in the same row and column
41   */
42  public class HBaseTimestampStorage implements TimestampStorage {
43  
44      private static final long INITIAL_MAX_TS_VALUE = 0;
45  
46      private static final Logger LOG = LoggerFactory.getLogger(HBaseTimestampStorage.class);
47  
48      // ROW and COLUMN to write the assigned timestamp
49      private static final byte[] TSO_ROW = "MAX_TIMESTAMP_R".getBytes(UTF_8);
50      private static final byte[] TSO_QUALIFIER = "MAX_TIMESTAMP_Q".getBytes(UTF_8);
51  
52      private final Table table;
53      private final byte[] cfName;
54      private final Connection connection;
55  
56      @Inject
57      public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
58          connection = ConnectionFactory.createConnection(hbaseConfig);
59          this.table = connection.getTable(TableName.valueOf(config.getTableName()));
60          this.cfName = config.getFamilyName().getBytes(UTF_8);
61      }
62  
63      @Override
64      public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
65          if (newMaxTimestamp < 0) {
66              LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
67              throw new IllegalArgumentException("Negative value received for maxTimestamp" + newMaxTimestamp);
68          }
69          Put put = new Put(TSO_ROW);
70          put.addColumn(cfName, TSO_QUALIFIER, Bytes.toBytes(newMaxTimestamp));
71          byte[] previousVal = null;
72          if (previousMaxTimestamp != INITIAL_MAX_TS_VALUE) {
73              previousVal = Bytes.toBytes(previousMaxTimestamp);
74          }
75          if (!table.checkAndPut(TSO_ROW, cfName, TSO_QUALIFIER, previousVal, put)) {
76              throw new IOException("Previous max timestamp is incorrect");
77          }
78      }
79  
80      @Override
81      public long getMaxTimestamp() throws IOException {
82          Get get = new Get(TSO_ROW);
83          get.addColumn(cfName, TSO_QUALIFIER);
84  
85          Result result = table.get(get);
86          if (result.containsColumn(cfName, TSO_QUALIFIER)) {
87              return Bytes.toLong(result.getValue(cfName, TSO_QUALIFIER));
88          } else {
89              // This happens for example when a new cluster is created
90              return INITIAL_MAX_TS_VALUE;
91          }
92  
93      }
94  
95  
96      public void close() throws IOException {
97          //TODO this is never called
98          table.close();
99          connection.close();
100     }
101 }