View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable.hbase;
19  
20  import com.google.common.base.Optional;
21  import com.google.common.util.concurrent.AbstractFuture;
22  import com.google.common.util.concurrent.ListenableFuture;
23  import com.google.common.util.concurrent.SettableFuture;
24  import com.google.common.util.concurrent.ThreadFactoryBuilder;
25  import com.google.protobuf.CodedInputStream;
26  import com.google.protobuf.CodedOutputStream;
27  import org.apache.omid.committable.CommitTable;
28  import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.client.Delete;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import javax.inject.Inject;
40  import java.io.IOException;
41  import java.util.ArrayList;
42  import java.util.LinkedList;
43  import java.util.List;
44  import java.util.concurrent.ArrayBlockingQueue;
45  import java.util.concurrent.BlockingQueue;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Executors;
48  import java.util.concurrent.TimeUnit;
49  
50  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_QUALIFIER;
51  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
52  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
53  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
54  
55  public class HBaseCommitTable implements CommitTable {
56  
57      private static final Logger LOG = LoggerFactory.getLogger(HBaseCommitTable.class);
58  
59      private final Configuration hbaseConfig;
60      private final String tableName;
61      private final byte[] commitTableFamily;
62      private final byte[] lowWatermarkFamily;
63      private final KeyGenerator keygen;
64  
65      /**
66       * Create a hbase commit table.
67       * Note that we do not take ownership of the passed htable, it is just used to construct the writer and client.
68       */
69      @Inject
70      public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) {
71          this(hbaseConfig, config, KeyGeneratorImplementations.defaultKeyGenerator());
72      }
73  
74      public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) {
75  
76          this.hbaseConfig = hbaseConfig;
77          this.tableName = config.getTableName();
78          this.commitTableFamily = config.getCommitTableFamily();
79          this.lowWatermarkFamily = config.getLowWatermarkFamily();
80          this.keygen = keygen;
81  
82      }
83  
84      // ----------------------------------------------------------------------------------------------------------------
85      // Reader and Writer
86      // ----------------------------------------------------------------------------------------------------------------
87  
88      private class HBaseWriter implements Writer {
89  
90          private static final long INITIAL_LWM_VALUE = -1L;
91          final HTable table;
92          // Our own buffer for operations
93          final List<Put> writeBuffer = new LinkedList<>();
94          volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
95  
96          HBaseWriter() throws IOException {
97              table = new HTable(hbaseConfig, tableName);
98          }
99  
100         @Override
101         public void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
102             assert (startTimestamp < commitTimestamp);
103             Put put = new Put(startTimestampToKey(startTimestamp), startTimestamp);
104             byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
105             put.add(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
106             writeBuffer.add(put);
107         }
108 
109         @Override
110         public void updateLowWatermark(long lowWatermark) throws IOException {
111             lowWatermarkToStore = lowWatermark;
112         }
113 
114         @Override
115         public void flush() throws IOException {
116             try {
117                 addLowWatermarkToStoreToWriteBuffer();
118                 table.put(writeBuffer);
119                 writeBuffer.clear();
120             } catch (IOException e) {
121                 LOG.error("Error flushing data", e);
122                 throw e;
123             }
124         }
125 
126         @Override
127         public void clearWriteBuffer() {
128             writeBuffer.clear();
129         }
130 
131         @Override
132         public void close() throws IOException {
133             clearWriteBuffer();
134             table.close();
135         }
136 
137         private void addLowWatermarkToStoreToWriteBuffer() {
138             long lowWatermark = lowWatermarkToStore;
139             if(lowWatermark != INITIAL_LWM_VALUE) {
140                 Put put = new Put(LOW_WATERMARK_ROW);
141                 put.add(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER, Bytes.toBytes(lowWatermark));
142                 writeBuffer.add(put);
143             }
144         }
145 
146     }
147 
148     class HBaseClient implements Client, Runnable {
149 
150         final HTable table;
151         final HTable deleteTable;
152         final ExecutorService deleteBatchExecutor;
153         final BlockingQueue<DeleteRequest> deleteQueue;
154         boolean isClosed = false; // @GuardedBy("this")
155         final static int DELETE_BATCH_SIZE = 1024;
156 
157         HBaseClient() throws IOException {
158             table = new HTable(hbaseConfig, tableName);
159             table.setAutoFlush(false, true);
160             deleteTable = new HTable(hbaseConfig, tableName);
161             deleteQueue = new ArrayBlockingQueue<>(DELETE_BATCH_SIZE);
162 
163             deleteBatchExecutor = Executors.newSingleThreadExecutor(
164                     new ThreadFactoryBuilder().setNameFormat("omid-completor-%d").build());
165             deleteBatchExecutor.submit(this);
166 
167         }
168 
169         @Override
170         public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
171 
172             SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
173             try {
174                 Get get = new Get(startTimestampToKey(startTimestamp));
175                 get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
176                 get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);
177 
178                 Result result = table.get(get);
179 
180                 if (containsInvalidTransaction(result)) {
181                     CommitTimestamp invalidCT =
182                             new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false);
183                     f.set(Optional.of(invalidCT));
184                     return f;
185                 }
186 
187                 if (containsATimestamp(result)) {
188                     long commitTSValue =
189                             decodeCommitTimestamp(startTimestamp, result.getValue(commitTableFamily, COMMIT_TABLE_QUALIFIER));
190                     CommitTimestamp validCT = new CommitTimestamp(Location.COMMIT_TABLE, commitTSValue, true);
191                     f.set(Optional.of(validCT));
192                 } else {
193                     f.set(Optional.<CommitTimestamp>absent());
194                 }
195             } catch (IOException e) {
196                 LOG.error("Error getting commit timestamp for TX {}", startTimestamp, e);
197                 f.setException(e);
198             }
199             return f;
200         }
201 
202         @Override
203         public ListenableFuture<Long> readLowWatermark() {
204             SettableFuture<Long> f = SettableFuture.create();
205             try {
206                 Get get = new Get(LOW_WATERMARK_ROW);
207                 get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
208                 Result result = table.get(get);
209                 if (containsLowWatermark(result)) {
210                     long lowWatermark = Bytes.toLong(result.getValue(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
211                     f.set(lowWatermark);
212                 } else {
213                     f.set(0L);
214                 }
215             } catch (IOException e) {
216                 LOG.error("Error getting low watermark", e);
217                 f.setException(e);
218             }
219             return f;
220         }
221 
222         @Override
223         public ListenableFuture<Void> completeTransaction(long startTimestamp) {
224             try {
225                 synchronized (this) {
226 
227                     if (isClosed) {
228                         SettableFuture<Void> f = SettableFuture.create();
229                         f.setException(new IOException("Not accepting requests anymore"));
230                         return f;
231                     }
232 
233                     DeleteRequest req = new DeleteRequest(
234                             new Delete(startTimestampToKey(startTimestamp), startTimestamp));
235                     deleteQueue.put(req);
236                     return req;
237                 }
238             } catch (IOException ioe) {
239                 LOG.warn("Error generating timestamp for transaction completion", ioe);
240                 SettableFuture<Void> f = SettableFuture.create();
241                 f.setException(ioe);
242                 return f;
243             } catch (InterruptedException ie) {
244                 Thread.currentThread().interrupt();
245                 SettableFuture<Void> f = SettableFuture.create();
246                 f.setException(ie);
247                 return f;
248             }
249         }
250 
251         @Override
252         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
253             SettableFuture<Boolean> f = SettableFuture.create();
254             try {
255                 byte[] row = startTimestampToKey(startTimestamp);
256                 Put invalidationPut = new Put(row, startTimestamp);
257                 invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, null);
258 
259                 // We need to write to the invalid column only if the commit timestamp
260                 // is empty. This has to be done atomically. Otherwise, if we first
261                 // check the commit timestamp and right before the invalidation a commit
262                 // timestamp is added and read by a transaction, then snapshot isolation
263                 // might not be hold (due to the invalidation)
264                 // TODO: Decide what we should we do if we can not contact the commit table. loop till succeed???
265                 boolean result = table.checkAndPut(row, commitTableFamily, COMMIT_TABLE_QUALIFIER, null, invalidationPut);
266                 f.set(result);
267             } catch (IOException ioe) {
268                 f.setException(ioe);
269             }
270             return f;
271         }
272 
273         @Override
274         @SuppressWarnings("InfiniteLoopStatement")
275         public void run() {
276             List<DeleteRequest> reqbatch = new ArrayList<>();
277             try {
278                 while (true) {
279                     DeleteRequest r = deleteQueue.poll();
280                     if (r == null && reqbatch.size() == 0) {
281                         r = deleteQueue.take();
282                     }
283 
284                     if (r != null) {
285                         reqbatch.add(r);
286                     }
287 
288                     if (r == null || reqbatch.size() == DELETE_BATCH_SIZE) {
289                         List<Delete> deletes = new ArrayList<>();
290                         for (DeleteRequest dr : reqbatch) {
291                             deletes.add(dr.getDelete());
292                         }
293                         try {
294                             deleteTable.delete(deletes);
295                             for (DeleteRequest dr : reqbatch) {
296                                 dr.complete();
297                             }
298                         } catch (IOException ioe) {
299                             LOG.warn("Error contacting hbase", ioe);
300                             for (DeleteRequest dr : reqbatch) {
301                                 dr.error(ioe);
302                             }
303                         } finally {
304                             reqbatch.clear();
305                         }
306                     }
307                 }
308             } catch (InterruptedException ie) {
309                 // Drain the queue and place the exception in the future
310                 // for those who placed requests
311                 LOG.warn("Draining delete queue");
312                 DeleteRequest queuedRequest = deleteQueue.poll();
313                 while (queuedRequest != null) {
314                     reqbatch.add(queuedRequest);
315                     queuedRequest = deleteQueue.poll();
316                 }
317                 for (DeleteRequest dr : reqbatch) {
318                     dr.error(new IOException("HBase CommitTable is going to be closed"));
319                 }
320                 reqbatch.clear();
321                 Thread.currentThread().interrupt();
322             } catch (Throwable t) {
323                 LOG.error("Transaction completion thread threw exception", t);
324             }
325         }
326 
327         @Override
328         public synchronized void close() throws IOException {
329             isClosed = true;
330             deleteBatchExecutor.shutdownNow(); // may need to interrupt take
331             try {
332                 if (!deleteBatchExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
333                     LOG.warn("Delete executor did not shutdown");
334                 }
335             } catch (InterruptedException ie) {
336                 Thread.currentThread().interrupt();
337             }
338 
339             LOG.warn("Re-Draining delete queue just in case");
340             DeleteRequest queuedRequest = deleteQueue.poll();
341             while (queuedRequest != null) {
342                 queuedRequest.error(new IOException("HBase CommitTable is going to be closed"));
343                 queuedRequest = deleteQueue.poll();
344             }
345 
346             deleteTable.close();
347             table.close();
348         }
349 
350         private boolean containsATimestamp(Result result) {
351             return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
352         }
353 
354         private boolean containsInvalidTransaction(Result result) {
355             return (result != null && result.containsColumn(commitTableFamily, INVALID_TX_QUALIFIER));
356         }
357 
358         private boolean containsLowWatermark(Result result) {
359             return (result != null && result.containsColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER));
360         }
361 
362         private class DeleteRequest extends AbstractFuture<Void> {
363             final Delete delete;
364 
365             DeleteRequest(Delete delete) {
366                 this.delete = delete;
367             }
368 
369             void error(IOException ioe) {
370                 setException(ioe);
371             }
372 
373             void complete() {
374                 set(null);
375             }
376 
377             Delete getDelete() {
378                 return delete;
379             }
380         }
381     }
382 
383     // ----------------------------------------------------------------------------------------------------------------
384     // Getters
385     // ----------------------------------------------------------------------------------------------------------------
386 
387     @Override
388     public Writer getWriter() throws IOException {
389         return new HBaseWriter();
390     }
391 
392     @Override
393     public Client getClient() throws IOException {
394         return new HBaseClient();
395     }
396 
397     // ----------------------------------------------------------------------------------------------------------------
398     // Helper methods
399     // ----------------------------------------------------------------------------------------------------------------
400 
401     private byte[] startTimestampToKey(long startTimestamp) throws IOException {
402         return keygen.startTimestampToKey(startTimestamp);
403     }
404 
405     private static byte[] encodeCommitTimestamp(long startTimestamp, long commitTimestamp) throws IOException {
406         assert (startTimestamp < commitTimestamp);
407         long diff = commitTimestamp - startTimestamp;
408         byte[] bytes = new byte[CodedOutputStream.computeInt64SizeNoTag(diff)];
409         CodedOutputStream cos = CodedOutputStream.newInstance(bytes);
410         cos.writeInt64NoTag(diff);
411         cos.flush();
412         return bytes;
413 
414     }
415 
416     private static long decodeCommitTimestamp(long startTimestamp, byte[] encodedCommitTimestamp) throws IOException {
417         CodedInputStream cis = CodedInputStream.newInstance(encodedCommitTimestamp);
418         long diff = cis.readInt64();
419         return startTimestamp + diff;
420     }
421 
422 }