View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import java.io.IOException;
21  import java.util.HashSet;
22  import java.util.Map;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Executors;
25  
26  import org.apache.hadoop.hbase.client.ConnectionFactory;
27  import org.apache.hadoop.hbase.client.Get;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.apache.omid.committable.CommitTable;
31  import org.apache.omid.committable.hbase.HBaseCommitTable;
32  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
33  import org.apache.omid.tools.hbase.HBaseLogin;
34  import org.apache.omid.tso.client.CellId;
35  import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
36  import org.apache.omid.tso.client.TSOClient;
37  import org.apache.omid.tso.client.TSOProtocol;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import com.google.common.base.Optional;
42  import com.google.common.util.concurrent.ListeningExecutorService;
43  import com.google.common.util.concurrent.MoreExecutors;
44  import com.google.common.util.concurrent.ThreadFactoryBuilder;
45  
46  public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
49  
50      private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
51  
52          @Override
53          public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {
54  
55              return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(),
56                      tm, tm.isLowLatency());
57  
58          }
59  
60      }
61  
62      // ----------------------------------------------------------------------------------------------------------------
63      // Construction
64      // ----------------------------------------------------------------------------------------------------------------
65  
66      public static TransactionManager newInstance() throws IOException, InterruptedException {
67          return newInstance(new HBaseOmidClientConfiguration());
68      }
69  
70      public static TransactionManager newInstance(HBaseOmidClientConfiguration configuration)
71              throws IOException, InterruptedException {
72          //Logging in to Secure HBase if required
73          HBaseLogin.loginIfNeeded(configuration);
74          return builder(configuration).build();
75      }
76  
77      public static class Builder {
78  
79          // Required parameters
80          private final HBaseOmidClientConfiguration hbaseOmidClientConf;
81  
82          // Optional parameters - initialized to default values
83          private Optional<TSOProtocol> tsoClient = Optional.absent();
84          private Optional<CommitTable.Client> commitTableClient = Optional.absent();
85          private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
86          private Optional<PostCommitActions> postCommitter = Optional.absent();
87  
88          public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
89              this.hbaseOmidClientConf = hbaseOmidClientConf;
90          }
91  
92          public Builder tsoClient(TSOProtocol tsoClient) {
93              this.tsoClient = Optional.of(tsoClient);
94              return this;
95          }
96  
97          public Builder commitTableClient(CommitTable.Client client) {
98              this.commitTableClient = Optional.of(client);
99              return this;
100         }
101 
102         public Builder commitTableWriter(CommitTable.Writer writer) {
103             this.commitTableWriter = Optional.of(writer);
104             return this;
105         }
106 
107         Builder postCommitter(PostCommitActions postCommitter) {
108             this.postCommitter = Optional.of(postCommitter);
109             return this;
110         }
111 
112         public HBaseTransactionManager build() throws IOException, InterruptedException {
113 
114             CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
115             CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
116             PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
117             TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
118 
119             return new HBaseTransactionManager(hbaseOmidClientConf,
120                                                postCommitter,
121                                                tsoClient,
122                                                commitTableClient,
123                                                commitTableWriter,
124                                                new HBaseTransactionFactory());
125         }
126 
127         private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
128             return Optional.of((TSOProtocol) TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
129         }
130 
131 
132         private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
133             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
134             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
135             CommitTable commitTable = new HBaseCommitTable(ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration()), commitTableConf);
136             return Optional.of(commitTable.getClient());
137         }
138 
139         private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
140             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
141             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
142             CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
143             return Optional.of(commitTable.getWriter());
144         }
145 
146         private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
147 
148             PostCommitActions postCommitter;
149             PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
150                                                                              commitTableClient);
151             switch(hbaseOmidClientConf.getPostCommitMode()) {
152                 case ASYNC:
153                     ListeningExecutorService postCommitExecutor =
154                             MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
155                                     new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
156                     postCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
157                     break;
158                 case SYNC:
159                 default:
160                     postCommitter = syncPostCommitter;
161                     break;
162             }
163 
164             return Optional.of(postCommitter);
165         }
166 
167     }
168 
169     public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
170         return new Builder(hbaseOmidClientConf);
171     }
172 
173     private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConfiguration,
174                                     PostCommitActions postCommitter,
175                                     TSOProtocol tsoClient,
176                                     CommitTable.Client commitTableClient,
177                                     CommitTable.Writer commitTableWriter,
178                                     HBaseTransactionFactory hBaseTransactionFactory) {
179 
180         super(hBaseOmidClientConfiguration.getMetrics(),
181                 postCommitter,
182                 tsoClient,
183                 commitTableClient,
184                 commitTableWriter,
185                 hBaseTransactionFactory);
186     }
187 
188     // ----------------------------------------------------------------------------------------------------------------
189     // AbstractTransactionManager overwritten methods
190     // ----------------------------------------------------------------------------------------------------------------
191 
192     @Override
193     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
194         try {
195             // Flush all pending writes
196             HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
197             hBaseTx.flushTables();
198         } catch (IOException e) {
199             throw new TransactionManagerException("Exception while flushing writes", e);
200         }
201     }
202 
203     @Override
204     public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
205         try {
206             // Flush all pending writes
207             HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
208             hBaseTx.flushTables();
209         } catch (IOException e) {
210             throw new TransactionManagerException("Exception while flushing writes", e);
211         }
212     }
213 
214     @Override
215     public long getHashForTable(byte[] tableName) {
216         return HBaseCellId.getHasher().putBytes(tableName).hash().asLong();
217     }
218 
219     @Override
220     public long getLowWatermark() throws TransactionException {
221         try {
222             return commitTableClient.readLowWatermark().get();
223         } catch (ExecutionException ee) {
224             throw new TransactionException("Error reading low watermark", ee.getCause());
225         } catch (InterruptedException ie) {
226             Thread.currentThread().interrupt();
227             throw new TransactionException("Interrupted reading low watermark", ie);
228         }
229     }
230 
231     // ----------------------------------------------------------------------------------------------------------------
232     // Helper methods
233     // ----------------------------------------------------------------------------------------------------------------
234 
235     static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? extends CellId> tx) {
236 
237         if (tx instanceof HBaseTransaction) {
238             return (HBaseTransaction) tx;
239         } else {
240             throw new IllegalArgumentException(
241                     "The transaction object passed is not an instance of HBaseTransaction");
242         }
243 
244     }
245 
246     public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
247         tsoClient.setConflictDetectionLevel(conflictDetectionLevel);
248     }
249 
250     public ConflictDetectionLevel getConflictDetectionLevel() {
251         return tsoClient.getConflictDetectionLevel();
252     }
253 
254     static class CommitTimestampLocatorImpl implements CommitTimestampLocator {
255 
256         private HBaseCellId hBaseCellId;
257         private final Map<Long, Long> commitCache;
258         private TableAccessWrapper tableAccessWrapper;
259 
260         CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache, TableAccessWrapper tableAccessWrapper) {
261             this.hBaseCellId = hBaseCellId;
262             this.commitCache = commitCache;
263             this.tableAccessWrapper = tableAccessWrapper;
264         }
265 
266         CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache) {
267             this.hBaseCellId = hBaseCellId;
268             this.commitCache = commitCache;
269             this.tableAccessWrapper = null;
270             this.tableAccessWrapper = new HTableAccessWrapper(hBaseCellId.getTable().getHTable(), hBaseCellId.getTable().getHTable());
271         }
272 
273         @Override
274         public Optional<Long> readCommitTimestampFromCache(long startTimestamp) {
275             if (commitCache.containsKey(startTimestamp)) {
276                 return Optional.of(commitCache.get(startTimestamp));
277             }
278             return Optional.absent();
279         }
280 
281         @Override
282         public Optional<Long> readCommitTimestampFromShadowCell(long startTimestamp) throws IOException {
283 
284             Get get = new Get(hBaseCellId.getRow());
285             byte[] family = hBaseCellId.getFamily();
286             byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(hBaseCellId.getQualifier());
287             get.addColumn(family, shadowCellQualifier);
288             get.setMaxVersions(1);
289             get.setTimeStamp(startTimestamp);
290             Result result = tableAccessWrapper.get(get);
291             if (result.containsColumn(family, shadowCellQualifier)) {
292                 return Optional.of(Bytes.toLong(result.getValue(family, shadowCellQualifier)));
293             }
294             return Optional.absent();
295         }
296 
297     }
298 
299 }