View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.timestamp.storage;
19  
20  import org.apache.hadoop.conf.Configuration;
21  import org.apache.hadoop.hbase.client.Get;
22  import org.apache.hadoop.hbase.client.HTable;
23  import org.apache.hadoop.hbase.client.Put;
24  import org.apache.hadoop.hbase.client.Result;
25  import org.apache.hadoop.hbase.util.Bytes;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import javax.inject.Inject;
30  import java.io.IOException;
31  
32  import static com.google.common.base.Charsets.UTF_8;
33  
34  /**
35   * Stores the max timestamp assigned by the TO in HBase.
36   * It's always written non-transactionally in the same row and column
37   */
38  public class HBaseTimestampStorage implements TimestampStorage {
39  
40      private static final long INITIAL_MAX_TS_VALUE = 0;
41  
42      private static final Logger LOG = LoggerFactory.getLogger(HBaseTimestampStorage.class);
43  
44      // ROW and COLUMN to write the assigned timestamp
45      private static final byte[] TSO_ROW = "MAX_TIMESTAMP_R".getBytes(UTF_8);
46      private static final byte[] TSO_QUALIFIER = "MAX_TIMESTAMP_Q".getBytes(UTF_8);
47  
48      private final HTable table;
49      private final byte[] cfName;
50  
51      @Inject
52      public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
53          this.table = new HTable(hbaseConfig, config.getTableName());
54          this.cfName = config.getFamilyName().getBytes(UTF_8);
55      }
56  
57      @Override
58      public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
59          if (newMaxTimestamp < 0) {
60              LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
61              throw new IllegalArgumentException("Negative value received for maxTimestamp" + newMaxTimestamp);
62          }
63          Put put = new Put(TSO_ROW);
64          put.add(cfName, TSO_QUALIFIER, Bytes.toBytes(newMaxTimestamp));
65          byte[] previousVal = null;
66          if (previousMaxTimestamp != INITIAL_MAX_TS_VALUE) {
67              previousVal = Bytes.toBytes(previousMaxTimestamp);
68          }
69          if (!table.checkAndPut(TSO_ROW, cfName, TSO_QUALIFIER, previousVal, put)) {
70              throw new IOException("Previous max timestamp is incorrect");
71          }
72      }
73  
74      @Override
75      public long getMaxTimestamp() throws IOException {
76          Get get = new Get(TSO_ROW);
77          get.addColumn(cfName, TSO_QUALIFIER);
78  
79          Result result = table.get(get);
80          if (result.containsColumn(cfName, TSO_QUALIFIER)) {
81              return Bytes.toLong(result.getValue(cfName, TSO_QUALIFIER));
82          } else {
83              // This happens for example when a new cluster is created
84              return INITIAL_MAX_TS_VALUE;
85          }
86  
87      }
88  
89  }