View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.base.Optional;
22  import com.google.common.collect.Maps;
23  import com.google.common.util.concurrent.ListeningExecutorService;
24  import com.google.common.util.concurrent.MoreExecutors;
25  import com.google.common.util.concurrent.ThreadFactoryBuilder;
26  import org.apache.omid.committable.CommitTable;
27  import org.apache.omid.committable.CommitTable.CommitTimestamp;
28  import org.apache.omid.committable.hbase.HBaseCommitTable;
29  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
30  import org.apache.omid.tools.hbase.HBaseLogin;
31  import org.apache.omid.tso.client.CellId;
32  import org.apache.omid.tso.client.TSOClient;
33  import org.apache.hadoop.hbase.client.Get;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import java.io.IOException;
40  import java.util.HashSet;
41  import java.util.Map;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.Executors;
44  
45  public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
46  
47      private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
48  
49      private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
50  
51          @Override
52          public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {
53  
54              return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), tm);
55  
56          }
57  
58      }
59  
60      // ----------------------------------------------------------------------------------------------------------------
61      // Construction
62      // ----------------------------------------------------------------------------------------------------------------
63  
64      public static TransactionManager newInstance() throws IOException, InterruptedException {
65          return newInstance(new HBaseOmidClientConfiguration());
66      }
67  
68      public static TransactionManager newInstance(HBaseOmidClientConfiguration configuration)
69              throws IOException, InterruptedException {
70          //Logging in to Secure HBase if required
71          HBaseLogin.loginIfNeeded(configuration);
72          return builder(configuration).build();
73      }
74  
75      @VisibleForTesting
76      static class Builder {
77  
78          // Required parameters
79          private final HBaseOmidClientConfiguration hbaseOmidClientConf;
80  
81          // Optional parameters - initialized to default values
82          private Optional<TSOClient> tsoClient = Optional.absent();
83          private Optional<CommitTable.Client> commitTableClient = Optional.absent();
84          private Optional<PostCommitActions> postCommitter = Optional.absent();
85  
86          private Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
87              this.hbaseOmidClientConf = hbaseOmidClientConf;
88          }
89  
90          Builder tsoClient(TSOClient tsoClient) {
91              this.tsoClient = Optional.of(tsoClient);
92              return this;
93          }
94  
95          Builder commitTableClient(CommitTable.Client client) {
96              this.commitTableClient = Optional.of(client);
97              return this;
98          }
99  
100         Builder postCommitter(PostCommitActions postCommitter) {
101             this.postCommitter = Optional.of(postCommitter);
102             return this;
103         }
104 
105         HBaseTransactionManager build() throws IOException, InterruptedException {
106 
107             CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
108             PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
109             TSOClient tsoClient = this.tsoClient.or(buildTSOClient()).get();
110 
111             return new HBaseTransactionManager(hbaseOmidClientConf,
112                                                postCommitter,
113                                                tsoClient,
114                                                commitTableClient,
115                                                new HBaseTransactionFactory());
116         }
117 
118         private Optional<TSOClient> buildTSOClient() throws IOException, InterruptedException {
119             return Optional.of(TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
120         }
121 
122 
123         private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
124             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
125             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
126             CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
127             return Optional.of(commitTable.getClient());
128         }
129 
130         private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
131 
132             PostCommitActions postCommitter;
133             PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
134                                                                              commitTableClient);
135             switch(hbaseOmidClientConf.getPostCommitMode()) {
136                 case ASYNC:
137                     ListeningExecutorService postCommitExecutor =
138                             MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
139                                     new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
140                     postCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
141                     break;
142                 case SYNC:
143                 default:
144                     postCommitter = syncPostCommitter;
145                     break;
146             }
147 
148             return Optional.of(postCommitter);
149         }
150 
151     }
152 
153     @VisibleForTesting
154     static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
155         return new Builder(hbaseOmidClientConf);
156     }
157 
158     private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConfiguration,
159                                     PostCommitActions postCommitter,
160                                     TSOClient tsoClient,
161                                     CommitTable.Client commitTableClient,
162                                     HBaseTransactionFactory hBaseTransactionFactory) {
163 
164         super(hBaseOmidClientConfiguration.getMetrics(),
165               postCommitter,
166               tsoClient,
167               commitTableClient,
168               hBaseTransactionFactory);
169 
170     }
171 
172     // ----------------------------------------------------------------------------------------------------------------
173     // AbstractTransactionManager overwritten methods
174     // ----------------------------------------------------------------------------------------------------------------
175 
176     @Override
177     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
178         try {
179             // Flush all pending writes
180             HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
181             hBaseTx.flushTables();
182         } catch (IOException e) {
183             throw new TransactionManagerException("Exception while flushing writes", e);
184         }
185     }
186 
187     @Override
188     public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
189         try {
190             // Flush all pending writes
191             HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
192             hBaseTx.flushTables();
193         } catch (IOException e) {
194             throw new TransactionManagerException("Exception while flushing writes", e);
195         }
196     }
197 
198     // ----------------------------------------------------------------------------------------------------------------
199     // HBaseTransactionClient method implementations
200     // ----------------------------------------------------------------------------------------------------------------
201 
202     @Override
203     public boolean isCommitted(HBaseCellId hBaseCellId) throws TransactionException {
204         try {
205             CommitTimestamp tentativeCommitTimestamp =
206                     locateCellCommitTimestamp(hBaseCellId.getTimestamp(), tsoClient.getEpoch(),
207                                               new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()));
208 
209             // If transaction that added the cell was invalidated
210             if (!tentativeCommitTimestamp.isValid()) {
211                 return false;
212             }
213 
214             switch (tentativeCommitTimestamp.getLocation()) {
215                 case COMMIT_TABLE:
216                 case SHADOW_CELL:
217                     return true;
218                 case NOT_PRESENT:
219                     return false;
220                 case CACHE: // cache was empty
221                 default:
222                     return false;
223             }
224         } catch (IOException e) {
225             throw new TransactionException("Failure while checking if a transaction was committed", e);
226         }
227     }
228 
229     @Override
230     public long getLowWatermark() throws TransactionException {
231         try {
232             return commitTableClient.readLowWatermark().get();
233         } catch (ExecutionException ee) {
234             throw new TransactionException("Error reading low watermark", ee.getCause());
235         } catch (InterruptedException ie) {
236             Thread.currentThread().interrupt();
237             throw new TransactionException("Interrupted reading low watermark", ie);
238         }
239     }
240 
241     // ----------------------------------------------------------------------------------------------------------------
242     // Helper methods
243     // ----------------------------------------------------------------------------------------------------------------
244 
245     static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? extends CellId> tx) {
246 
247         if (tx instanceof HBaseTransaction) {
248             return (HBaseTransaction) tx;
249         } else {
250             throw new IllegalArgumentException(
251                     "The transaction object passed is not an instance of HBaseTransaction");
252         }
253 
254     }
255 
256     static class CommitTimestampLocatorImpl implements CommitTimestampLocator {
257 
258         private HBaseCellId hBaseCellId;
259         private final Map<Long, Long> commitCache;
260 
261         CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache) {
262             this.hBaseCellId = hBaseCellId;
263             this.commitCache = commitCache;
264         }
265 
266         @Override
267         public Optional<Long> readCommitTimestampFromCache(long startTimestamp) {
268             if (commitCache.containsKey(startTimestamp)) {
269                 return Optional.of(commitCache.get(startTimestamp));
270             }
271             return Optional.absent();
272         }
273 
274         @Override
275         public Optional<Long> readCommitTimestampFromShadowCell(long startTimestamp) throws IOException {
276 
277             Get get = new Get(hBaseCellId.getRow());
278             byte[] family = hBaseCellId.getFamily();
279             byte[] shadowCellQualifier = CellUtils.addShadowCellSuffix(hBaseCellId.getQualifier());
280             get.addColumn(family, shadowCellQualifier);
281             get.setMaxVersions(1);
282             get.setTimeStamp(startTimestamp);
283             Result result = hBaseCellId.getTable().get(get);
284             if (result.containsColumn(family, shadowCellQualifier)) {
285                 return Optional.of(Bytes.toLong(result.getValue(family, shadowCellQualifier)));
286             }
287             return Optional.absent();
288         }
289 
290     }
291 
292 }