View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable.hbase;
19  
20  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_QUALIFIER;
21  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
22  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
23  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.concurrent.ArrayBlockingQueue;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.TimeUnit;
34  
35  import javax.inject.Inject;
36  
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Connection;
40  import org.apache.hadoop.hbase.client.ConnectionFactory;
41  import org.apache.hadoop.hbase.client.Delete;
42  import org.apache.hadoop.hbase.client.Get;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.client.Table;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.omid.committable.CommitTable;
48  import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import com.google.common.base.Optional;
53  import com.google.common.util.concurrent.AbstractFuture;
54  import com.google.common.util.concurrent.ListenableFuture;
55  import com.google.common.util.concurrent.SettableFuture;
56  import com.google.common.util.concurrent.ThreadFactoryBuilder;
57  import com.google.protobuf.CodedInputStream;
58  import com.google.protobuf.CodedOutputStream;
59  
/**
 * HBase-backed implementation of the Omid {@link CommitTable}.
 *
 * <p>Commit metadata lives in a single HBase table: one row per transaction, keyed by the
 * {@link KeyGenerator}-encoded start timestamp, holding either the encoded commit timestamp
 * ({@code COMMIT_TABLE_QUALIFIER}) or an invalidation marker ({@code INVALID_TX_QUALIFIER}).
 * The low watermark is kept in a dedicated row ({@code LOW_WATERMARK_ROW}).
 *
 * <p>Writers buffer puts client-side and push them on {@link Writer#flush()}; clients read
 * commit timestamps synchronously and batch transaction-completion deletes on a background
 * thread.
 */
public class HBaseCommitTable implements CommitTable {

    private static final Logger LOG = LoggerFactory.getLogger(HBaseCommitTable.class);

    // Shared HBase connection; writers/clients open their own Table instances from it.
    private final Connection hbaseConnection;
    private final String tableName;
    // Column family holding commit timestamps and invalidation markers.
    private final byte[] commitTableFamily;
    // Column family holding the low-watermark cell.
    private final byte[] lowWatermarkFamily;
    // Maps a start timestamp to a (typically distribution-friendly) row key.
    private final KeyGenerator keygen;

    /**
     * Create a hbase commit table.
     * Note that we do not take ownership of the passed htable, it is just used to construct the writer and client.
     * @throws IOException if the HBase connection cannot be created
     */
    @Inject
    public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) throws IOException {
        this(ConnectionFactory.createConnection(hbaseConfig), config, KeyGeneratorImplementations.defaultKeyGenerator());
    }

    /** As above, but reusing an existing connection with the default key generator. */
    public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config) throws IOException {
        this(hbaseConnection, config, KeyGeneratorImplementations.defaultKeyGenerator());
    }

    /** As above, creating a new connection but with a caller-supplied key generator. */
    public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
        this(ConnectionFactory.createConnection(hbaseConfig), config, keygen);
    }

    /**
     * Base constructor: captures the connection, table/family names from {@code config}, and the
     * key generator. No HBase I/O happens here; tables are opened lazily by writer/client.
     */
    public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {

        this.hbaseConnection = hbaseConnection;
        this.tableName = config.getTableName();
        this.commitTableFamily = config.getCommitTableFamily();
        this.lowWatermarkFamily = config.getLowWatermarkFamily();
        this.keygen = keygen;

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Reader and Writer
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Writer that accumulates commit-entry {@link Put}s in a local buffer and sends them in a
     * single batch on {@link #flush()}. Not thread-safe except for {@link #updateLowWatermark(long)},
     * which only stores into a volatile field read at flush time.
     */
    private class HBaseWriter implements Writer {

        // Sentinel meaning "no low watermark set yet"; the LWM put is skipped while it holds.
        private static final long INITIAL_LWM_VALUE = -1L;
        final Table table;
        // Our own buffer for operations
        final List<Put> writeBuffer = new LinkedList<>();
        // Latest LWM handed to updateLowWatermark(); volatile so flush() sees writes from other threads.
        volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;

        HBaseWriter() throws IOException {
            table = hbaseConnection.getTable(TableName.valueOf(tableName));
        }

        /**
         * Buffer a commit entry (row = encoded start ts, cell version = start ts,
         * value = varint-encoded commit-start delta). Nothing hits HBase until flush().
         */
        @Override
        public void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
            assert (startTimestamp < commitTimestamp);
            Put put = new Put(startTimestampToKey(startTimestamp), startTimestamp);
            byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
            put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
            writeBuffer.add(put);
        }

        /** Record the LWM locally; it is persisted as part of the next flush(), not immediately. */
        @Override
        public void updateLowWatermark(long lowWatermark) throws IOException {
            lowWatermarkToStore = lowWatermark;
        }

        /**
         * Append the pending LWM put (if any) and push the whole buffer to HBase.
         * On failure the buffer is NOT cleared, so a retry re-sends the same puts.
         * NOTE(review): a retried flush appends a second LWM put to the buffer; puts are
         * idempotent so this looks harmless, but confirm it is intended.
         */
        @Override
        public void flush() throws IOException {
            try {
                addLowWatermarkToStoreToWriteBuffer();
                table.put(writeBuffer);
                writeBuffer.clear();
            } catch (IOException e) {
                LOG.error("Error flushing data", e);
                throw e;
            }
        }

        /** Drop all buffered (un-flushed) operations. */
        @Override
        public void clearWriteBuffer() {
            writeBuffer.clear();
        }

        /**
         * Write the commit entry atomically, but only if the transaction has not been
         * invalidated: the checkAndPut guards on the invalidation cell being absent (null).
         * @return true if the put was applied, false if an invalidation marker already exists
         */
        @Override
        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
            assert (startTimestamp < commitTimestamp);
            byte[] transactionRow = startTimestampToKey(startTimestamp);
            Put put = new Put(transactionRow, startTimestamp);
            byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
            put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
            return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
        }

        /** Discards any buffered operations (they are NOT flushed) and closes the table. */
        @Override
        public void close() throws IOException {
            clearWriteBuffer();
            table.close();
        }

        // Adds a put for the LWM row to the buffer, unless no LWM has been set yet.
        private void addLowWatermarkToStoreToWriteBuffer() {
            long lowWatermark = lowWatermarkToStore;
            if(lowWatermark != INITIAL_LWM_VALUE) {
                Put put = new Put(LOW_WATERMARK_ROW);
                put.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER, Bytes.toBytes(lowWatermark));
                writeBuffer.add(put);
            }
        }

    }

    /**
     * Client side of the commit table. Reads are synchronous (the returned futures are already
     * completed); transaction-completion deletes are queued and batched by a dedicated
     * single-threaded executor running {@link #run()}.
     */
    class HBaseClient implements Client, Runnable {

        final Table table;
        // Separate Table instance used exclusively by the delete-batching thread.
        final Table deleteTable;
        // Single thread draining deleteQueue; submitted with `this` as the task.
        final ExecutorService deleteBatchExecutor;
        // Pending completion requests; bounded at DELETE_BATCH_SIZE.
        final BlockingQueue<DeleteRequest> deleteQueue;
        boolean isClosed = false; // @GuardedBy("this")
        final static int DELETE_BATCH_SIZE = 1024;

        HBaseClient() throws IOException {
            // TODO: create TTable here instead
            table = hbaseConnection.getTable(TableName.valueOf(tableName));
            // FIXME: why is this using autoFlush of false? Why would every Delete
            // need to be send through a separate RPC?
            deleteTable = hbaseConnection.getTable(TableName.valueOf(tableName));
            deleteQueue = new ArrayBlockingQueue<>(DELETE_BATCH_SIZE);

            deleteBatchExecutor = Executors.newSingleThreadExecutor(
                    new ThreadFactoryBuilder().setNameFormat("omid-completor-%d").build());
            deleteBatchExecutor.submit(this);

        }

        /**
         * Look up the commit timestamp for {@code startTimestamp}.
         * Resolution order: an invalidation marker wins (invalid CT with
         * INVALID_TRANSACTION_MARKER), then a stored commit timestamp (valid CT),
         * otherwise absent. The Get is synchronous; the future is completed before return.
         */
        @Override
        public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {

            SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
            try {
                Get get = new Get(startTimestampToKey(startTimestamp));
                get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
                get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);

                Result result = table.get(get);

                if (containsInvalidTransaction(result)) {
                    CommitTimestamp invalidCT =
                            new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false);
                    f.set(Optional.of(invalidCT));
                    return f;
                }

                if (containsATimestamp(result)) {
                    long commitTSValue =
                            decodeCommitTimestamp(startTimestamp, result.getValue(commitTableFamily, COMMIT_TABLE_QUALIFIER));
                    CommitTimestamp validCT = new CommitTimestamp(Location.COMMIT_TABLE, commitTSValue, true);
                    f.set(Optional.of(validCT));
                } else {
                    f.set(Optional.<CommitTimestamp>absent());
                }
            } catch (IOException e) {
                LOG.error("Error getting commit timestamp for TX {}", startTimestamp, e);
                f.setException(e);
            }
            return f;
        }

        /**
         * Read the stored low watermark; completes with 0 when the LWM row/cell does not
         * exist yet. Synchronous like getCommitTimestamp().
         */
        @Override
        public ListenableFuture<Long> readLowWatermark() {
            SettableFuture<Long> f = SettableFuture.create();
            try {
                Get get = new Get(LOW_WATERMARK_ROW);
                get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
                Result result = table.get(get);
                if (containsLowWatermark(result)) {
                    long lowWatermark = Bytes.toLong(result.getValue(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
                    f.set(lowWatermark);
                } else {
                    f.set(0L);
                }
            } catch (IOException e) {
                LOG.error("Error getting low watermark", e);
                f.setException(e);
            }
            return f;
        }

        /**
         * Enqueue a delete of the transaction's commit-table row. The returned future (the
         * DeleteRequest itself) completes when the batching thread has executed the delete.
         * The synchronized block makes the isClosed check and the enqueue atomic w.r.t. close().
         * NOTE(review): deleteQueue.put() can block while holding this monitor when the queue
         * is full; close() synchronizes on the same monitor — confirm shutdown cannot stall here
         * (shutdownNow's interrupt should unblock the consumer, releasing queue capacity).
         */
        @Override
        public ListenableFuture<Void> completeTransaction(long startTimestamp) {
            try {
                synchronized (this) {

                    if (isClosed) {
                        SettableFuture<Void> f = SettableFuture.create();
                        f.setException(new IOException("Not accepting requests anymore"));
                        return f;
                    }

                    DeleteRequest req = new DeleteRequest(
                            new Delete(startTimestampToKey(startTimestamp), startTimestamp));
                    deleteQueue.put(req);
                    return req;
                }
            } catch (IOException ioe) {
                // startTimestampToKey failed; report through the future rather than throwing.
                LOG.warn("Error generating timestamp for transaction completion", ioe);
                SettableFuture<Void> f = SettableFuture.create();
                f.setException(ioe);
                return f;
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and surface the failure through the future.
                Thread.currentThread().interrupt();
                SettableFuture<Void> f = SettableFuture.create();
                f.setException(ie);
                return f;
            }
        }

        /**
         * Atomically mark the transaction invalid, but only if no commit timestamp has been
         * written yet (checkAndPut guards on the commit cell being null).
         * @return future with true if the marker was written, false if a commit ts already exists
         */
        @Override
        public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
            SettableFuture<Boolean> f = SettableFuture.create();
            try {
                byte[] row = startTimestampToKey(startTimestamp);
                Put invalidationPut = new Put(row, startTimestamp);
                invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));

                // We need to write to the invalid column only if the commit timestamp
                // is empty. This has to be done atomically. Otherwise, if we first
                // check the commit timestamp and right before the invalidation a commit
                // timestamp is added and read by a transaction, then snapshot isolation
                // might not be hold (due to the invalidation)
                // TODO: Decide what we should we do if we can not contact the commit table. loop till succeed???
                boolean result = table.checkAndPut(row, commitTableFamily, COMMIT_TABLE_QUALIFIER, null, invalidationPut);
                f.set(result);
            } catch (IOException ioe) {
                f.setException(ioe);
            }
            return f;
        }

        /**
         * Delete-batching loop. Non-blocking poll() while a batch is forming; blocking take()
         * only when both queue and batch are empty. A batch is flushed when the queue runs dry
         * (r == null) or DELETE_BATCH_SIZE is reached. On interrupt (shutdownNow from close()),
         * drains the queue and fails every pending request's future.
         */
        @Override
        @SuppressWarnings("InfiniteLoopStatement")
        public void run() {
            List<DeleteRequest> reqbatch = new ArrayList<>();
            try {
                while (true) {
                    DeleteRequest r = deleteQueue.poll();
                    if (r == null && reqbatch.size() == 0) {
                        // Nothing pending anywhere: block until work arrives.
                        r = deleteQueue.take();
                    }

                    if (r != null) {
                        reqbatch.add(r);
                    }

                    if (r == null || reqbatch.size() == DELETE_BATCH_SIZE) {
                        List<Delete> deletes = new ArrayList<>();
                        for (DeleteRequest dr : reqbatch) {
                            deletes.add(dr.getDelete());
                        }
                        try {
                            deleteTable.delete(deletes);
                            // Complete the futures only after the batch RPC succeeded.
                            for (DeleteRequest dr : reqbatch) {
                                dr.complete();
                            }
                        } catch (IOException ioe) {
                            LOG.warn("Error contacting hbase", ioe);
                            for (DeleteRequest dr : reqbatch) {
                                dr.error(ioe);
                            }
                        } finally {
                            reqbatch.clear();
                        }
                    }
                }
            } catch (InterruptedException ie) {
                // Drain the queue and place the exception in the future
                // for those who placed requests
                LOG.warn("Draining delete queue");
                DeleteRequest queuedRequest = deleteQueue.poll();
                while (queuedRequest != null) {
                    reqbatch.add(queuedRequest);
                    queuedRequest = deleteQueue.poll();
                }
                for (DeleteRequest dr : reqbatch) {
                    dr.error(new IOException("HBase CommitTable is going to be closed"));
                }
                reqbatch.clear();
                Thread.currentThread().interrupt();
            } catch (Throwable t) {
                // Catch-all so the batching thread never dies silently.
                LOG.error("Transaction completion thread threw exception", t);
            }
        }

        /**
         * Stop accepting requests, interrupt the batching thread, fail any deletes still
         * queued, and close both Table handles. Synchronized so the isClosed flip is atomic
         * with respect to completeTransaction().
         */
        @Override
        public synchronized void close() throws IOException {
            isClosed = true;
            deleteBatchExecutor.shutdownNow(); // may need to interrupt take
            try {
                if (!deleteBatchExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                    LOG.warn("Delete executor did not shutdown");
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }

            // Defensive second drain: fail anything enqueued between the interrupt-drain in
            // run() and this point.
            LOG.warn("Re-Draining delete queue just in case");
            DeleteRequest queuedRequest = deleteQueue.poll();
            while (queuedRequest != null) {
                queuedRequest.error(new IOException("HBase CommitTable is going to be closed"));
                queuedRequest = deleteQueue.poll();
            }

            deleteTable.close();
            table.close();
        }

        // True if the result carries a commit-timestamp cell.
        private boolean containsATimestamp(Result result) {
            return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
        }

        // True if the result carries an invalidation-marker cell.
        private boolean containsInvalidTransaction(Result result) {
            return (result != null && result.containsColumn(commitTableFamily, INVALID_TX_QUALIFIER));
        }

        // True if the result carries the low-watermark cell.
        private boolean containsLowWatermark(Result result) {
            return (result != null && result.containsColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
        }

        /**
         * A queued delete whose future side ({@link AbstractFuture}) is completed or failed by
         * the batching thread once the delete has been executed (or abandoned on shutdown).
         */
        private class DeleteRequest extends AbstractFuture<Void> {
            final Delete delete;

            DeleteRequest(Delete delete) {
                this.delete = delete;
            }

            // Fail the caller's future.
            void error(IOException ioe) {
                setException(ioe);
            }

            // Complete the caller's future successfully.
            void complete() {
                set(null);
            }

            Delete getDelete() {
                return delete;
            }
        }
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Getters
    // ----------------------------------------------------------------------------------------------------------------

    /** @return a new buffered {@link Writer} over the configured commit table */
    @Override
    public Writer getWriter() throws IOException {
        return new HBaseWriter();
    }

    /** @return a new {@link Client}; starts the client's delete-batching thread */
    @Override
    public Client getClient() throws IOException {
        return new HBaseClient();
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------------------------------------------------

    // Delegates row-key encoding of the start timestamp to the configured KeyGenerator.
    private byte[] startTimestampToKey(long startTimestamp) throws IOException {
        return keygen.startTimestampToKey(startTimestamp);
    }

    /**
     * Encode the commit timestamp as the protobuf-varint of its delta from the start
     * timestamp — smaller on disk than a raw 8-byte long since deltas are typically small.
     */
    private static byte[] encodeCommitTimestamp(long startTimestamp, long commitTimestamp) throws IOException {
        assert (startTimestamp < commitTimestamp);
        long diff = commitTimestamp - startTimestamp;
        byte[] bytes = new byte[CodedOutputStream.computeInt64SizeNoTag(diff)];
        CodedOutputStream cos = CodedOutputStream.newInstance(bytes);
        cos.writeInt64NoTag(diff);
        cos.flush();
        return bytes;

    }

    /** Inverse of {@link #encodeCommitTimestamp}: start timestamp + decoded varint delta. */
    private static long decodeCommitTimestamp(long startTimestamp, byte[] encodedCommitTimestamp) throws IOException {
        CodedInputStream cis = CodedInputStream.newInstance(encodedCommitTimestamp);
        long diff = cis.readInt64();
        return startTimestamp + diff;
    }

}