View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable.hbase;
19  
20  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_QUALIFIER;
21  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
22  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
23  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
24  import java.io.IOException;
25  import java.util.LinkedList;
26  import java.util.List;
27  
28  
29  import javax.inject.Inject;
30  
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.client.Delete;
36  import org.apache.hadoop.hbase.client.Get;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.Table;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.omid.committable.CommitTable;
42  import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
47  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture;
48  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
49  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
50  import com.google.protobuf.CodedInputStream;
51  import com.google.protobuf.CodedOutputStream;
52  
53  public class HBaseCommitTable implements CommitTable {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(HBaseCommitTable.class);
56  
57      private final Connection hbaseConnection;
58      private final String tableName;
59      private final byte[] commitTableFamily;
60      private final byte[] lowWatermarkFamily;
61      private final KeyGenerator keygen;
62  
63      /**
64       * Create a hbase commit table.
65       * Note that we do not take ownership of the passed htable, it is just used to construct the writer and client.
66       * @throws IOException 
67       */
68      @Inject
69      public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) throws IOException {
70          this(ConnectionFactory.createConnection(hbaseConfig), config, KeyGeneratorImplementations.defaultKeyGenerator());
71      }
72  
73      public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config) throws IOException {
74          this(hbaseConnection, config, KeyGeneratorImplementations.defaultKeyGenerator());
75      }
76  
77      public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
78          this(ConnectionFactory.createConnection(hbaseConfig), config, keygen);
79      }
80  
81      public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
82  
83          this.hbaseConnection = hbaseConnection;
84          this.tableName = config.getTableName();
85          this.commitTableFamily = config.getCommitTableFamily();
86          this.lowWatermarkFamily = config.getLowWatermarkFamily();
87          this.keygen = keygen;
88  
89      }
90  
91      // ----------------------------------------------------------------------------------------------------------------
92      // Reader and Writer
93      // ----------------------------------------------------------------------------------------------------------------
94  
95      private class HBaseWriter implements Writer {
96  
97          private static final long INITIAL_LWM_VALUE = -1L;
98  
99          // Our own buffer for operations
100         final List<Put> writeBuffer = new LinkedList<>();
101         volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
102 
103         HBaseWriter() {
104 
105         }
106 
107         @Override
108         public void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
109             assert (startTimestamp < commitTimestamp);
110             Put put = new Put(startTimestampToKey(startTimestamp), startTimestamp);
111             byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
112             put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
113             writeBuffer.add(put);
114         }
115 
116         @Override
117         public void updateLowWatermark(long lowWatermark) throws IOException {
118             lowWatermarkToStore = lowWatermark;
119         }
120 
121         @Override
122         public void flush() throws IOException {
123 
124             try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
125                 addLowWatermarkToStoreToWriteBuffer();
126                 table.put(writeBuffer);
127                 writeBuffer.clear();
128             } catch (IOException e) {
129                 LOG.error("Error flushing data", e);
130                 throw e;
131             }
132         }
133 
134         @Override
135         public void clearWriteBuffer() {
136             writeBuffer.clear();
137         }
138 
139         @Override
140         public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
141             try (Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
142                 assert (startTimestamp < commitTimestamp);
143                 byte[] transactionRow = startTimestampToKey(startTimestamp);
144                 Put put = new Put(transactionRow, startTimestamp);
145                 byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
146                 put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
147                 return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
148             }
149 
150         }
151 
152         private void addLowWatermarkToStoreToWriteBuffer() {
153             long lowWatermark = lowWatermarkToStore;
154             if(lowWatermark != INITIAL_LWM_VALUE) {
155                 Put put = new Put(LOW_WATERMARK_ROW);
156                 put.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER, Bytes.toBytes(lowWatermark));
157                 writeBuffer.add(put);
158             }
159         }
160 
161     }
162 
163     class HBaseClient implements Client{
164 
165         HBaseClient(){
166 
167         }
168 
169         @Override
170         public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
171             startTimestamp = removeCheckpointBits(startTimestamp);
172             SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
173             try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
174                 Get get = new Get(startTimestampToKey(startTimestamp));
175                 get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
176                 get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);
177 
178                 Result result = table.get(get);
179 
180                 if (containsInvalidTransaction(result)) {
181                     CommitTimestamp invalidCT =
182                             new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false);
183                     f.set(Optional.of(invalidCT));
184                     return f;
185                 }
186 
187                 if (containsATimestamp(result)) {
188                     long commitTSValue =
189                             decodeCommitTimestamp(startTimestamp, result.getValue(commitTableFamily, COMMIT_TABLE_QUALIFIER));
190                     CommitTimestamp validCT = new CommitTimestamp(Location.COMMIT_TABLE, commitTSValue, true);
191                     f.set(Optional.of(validCT));
192                 } else {
193                     f.set(Optional.<CommitTimestamp>absent());
194                 }
195             } catch (IOException e) {
196                 LOG.error("Error getting commit timestamp for TX {}", startTimestamp, e);
197                 f.setException(e);
198             }
199             return f;
200         }
201 
202         @Override
203         public ListenableFuture<Long> readLowWatermark() {
204             SettableFuture<Long> f = SettableFuture.create();
205             try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
206                 Get get = new Get(LOW_WATERMARK_ROW);
207                 get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
208                 Result result = table.get(get);
209                 if (containsLowWatermark(result)) {
210                     long lowWatermark = Bytes.toLong(result.getValue(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
211                     f.set(lowWatermark);
212                 } else {
213                     f.set(0L);
214                 }
215             } catch (IOException e) {
216                 LOG.error("Error getting low watermark", e);
217                 f.setException(e);
218             }
219             return f;
220         }
221 
222         // This function is only used to delete a CT entry and should be renamed
223         @Override
224         public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
225             startTimestamp = removeCheckpointBits(startTimestamp);
226             byte[] key;
227             try {
228                 key = startTimestampToKey(startTimestamp);
229             } catch (IOException e) {
230                 LOG.warn("Error generating timestamp for transaction completion", e);
231                 SettableFuture<Void> f = SettableFuture.create();
232                 f.setException(e);
233                 return f;
234             }
235 
236             Delete delete = new Delete(key, startTimestamp);
237 
238             try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
239                 table.delete(delete);
240             } catch (IOException e) {
241                 SettableFuture<Void> f = SettableFuture.create();
242                 LOG.warn("Error contacting hbase", e);
243                 f.setException(e);
244             }
245             SettableFuture<Void> f = SettableFuture.create();
246             f.set(null);
247             return f;
248         }
249 
250         @Override
251         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
252             startTimestamp = removeCheckpointBits(startTimestamp);
253             SettableFuture<Boolean> f = SettableFuture.create();
254             try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
255                 byte[] row = startTimestampToKey(startTimestamp);
256                 Put invalidationPut = new Put(row, startTimestamp);
257                 invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
258 
259                 // We need to write to the invalid column only if the commit timestamp
260                 // is empty. This has to be done atomically. Otherwise, if we first
261                 // check the commit timestamp and right before the invalidation a commit
262                 // timestamp is added and read by a transaction, then snapshot isolation
263                 // might not be hold (due to the invalidation)
264                 // TODO: Decide what we should we do if we can not contact the commit table. loop till succeed???
265                 boolean result = table.checkAndPut(row, commitTableFamily, COMMIT_TABLE_QUALIFIER, null, invalidationPut);
266                 f.set(result);
267             } catch (IOException ioe) {
268                 f.setException(ioe);
269             }
270             return f;
271         }
272 
273         private boolean containsATimestamp(Result result) {
274             return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
275         }
276 
277         private boolean containsInvalidTransaction(Result result) {
278             return (result != null && result.containsColumn(commitTableFamily, INVALID_TX_QUALIFIER));
279         }
280 
281         private boolean containsLowWatermark(Result result) {
282             return (result != null && result.containsColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
283         }
284 
285         private class DeleteRequest extends AbstractFuture<Void> {
286             final Delete delete;
287 
288             DeleteRequest(Delete delete) {
289                 this.delete = delete;
290             }
291 
292             void error(IOException ioe) {
293                 setException(ioe);
294             }
295 
296             void complete() {
297                 set(null);
298             }
299 
300             Delete getDelete() {
301                 return delete;
302             }
303         }
304     }
305 
306     // ----------------------------------------------------------------------------------------------------------------
307     // Getters
308     // ----------------------------------------------------------------------------------------------------------------
309 
310     @Override
311     public Writer getWriter() throws IOException {
312         return new HBaseWriter();
313     }
314 
315     @Override
316     public Client getClient() throws IOException {
317         return new HBaseClient();
318     }
319 
320     // ----------------------------------------------------------------------------------------------------------------
321     // Helper methods
322     // ----------------------------------------------------------------------------------------------------------------
323     static long removeCheckpointBits(long startTimestamp) {
324         return startTimestamp - (startTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN);
325     }
326 
327     private byte[] startTimestampToKey(long startTimestamp) throws IOException {
328         return keygen.startTimestampToKey(startTimestamp);
329     }
330 
331     private static byte[] encodeCommitTimestamp(long startTimestamp, long commitTimestamp) throws IOException {
332         assert (startTimestamp < commitTimestamp);
333         long diff = commitTimestamp - startTimestamp;
334         byte[] bytes = new byte[CodedOutputStream.computeInt64SizeNoTag(diff)];
335         CodedOutputStream cos = CodedOutputStream.newInstance(bytes);
336         cos.writeInt64NoTag(diff);
337         cos.flush();
338         return bytes;
339 
340     }
341 
342     private static long decodeCommitTimestamp(long startTimestamp, byte[] encodedCommitTimestamp) throws IOException {
343         CodedInputStream cis = CodedInputStream.newInstance(encodedCommitTimestamp);
344         long diff = cis.readInt64();
345         return startTimestamp + diff;
346     }
347 
348 }