View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import java.io.IOException;
21  import java.util.HashSet;
22  import java.util.Map;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Executors;
25  
26  import org.apache.hadoop.hbase.client.Connection;
27  import org.apache.hadoop.hbase.client.ConnectionFactory;
28  import org.apache.hadoop.hbase.client.Get;
29  import org.apache.hadoop.hbase.client.Result;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.omid.committable.CommitTable;
32  import org.apache.omid.committable.hbase.HBaseCommitTable;
33  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
34  import org.apache.omid.tools.hbase.HBaseLogin;
35  import org.apache.omid.tso.client.CellId;
36  import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
37  import org.apache.omid.tso.client.TSOClient;
38  import org.apache.omid.tso.client.TSOProtocol;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
43  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
44  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
45  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
46  
47  public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
48  
49      private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
50      private final Connection connection;
51  
52      private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
53  
54          @Override
55          public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {
56  
57              return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(),
58                      tm, tm.isLowLatency());
59  
60          }
61  
62      }
63  
64      // ----------------------------------------------------------------------------------------------------------------
65      // Construction
66      // ----------------------------------------------------------------------------------------------------------------
67  
68      public static TransactionManager newInstance() throws IOException, InterruptedException {
69          return newInstance(new HBaseOmidClientConfiguration());
70      }
71  
72      public static TransactionManager newInstance(HBaseOmidClientConfiguration configuration)
73              throws IOException, InterruptedException {
74          //Logging in to Secure HBase if required
75          HBaseLogin.loginIfNeeded(configuration);
76          return builder(configuration).build();
77      }
78  
79      public static class Builder {
80  
81          // Required parameters
82          private final HBaseOmidClientConfiguration hbaseOmidClientConf;
83  
84          // Optional parameters - initialized to default values
85          private Optional<TSOProtocol> tsoClient = Optional.absent();
86          private Optional<CommitTable.Client> commitTableClient = Optional.absent();
87          private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
88          private Optional<PostCommitActions> postCommitter = Optional.absent();
89  
90          public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
91              this.hbaseOmidClientConf = hbaseOmidClientConf;
92          }
93  
94          public Builder tsoClient(TSOProtocol tsoClient) {
95              this.tsoClient = Optional.of(tsoClient);
96              return this;
97          }
98  
99          public Builder commitTableClient(CommitTable.Client client) {
100             this.commitTableClient = Optional.of(client);
101             return this;
102         }
103 
104         public Builder commitTableWriter(CommitTable.Writer writer) {
105             this.commitTableWriter = Optional.of(writer);
106             return this;
107         }
108 
109         Builder postCommitter(PostCommitActions postCommitter) {
110             this.postCommitter = Optional.of(postCommitter);
111             return this;
112         }
113 
114         public HBaseTransactionManager build() throws IOException, InterruptedException {
115 
116             Connection connection = ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration());
117 
118             CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient(connection)).get();
119             CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter(connection)).get();
120             PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient, connection)).get();
121             TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
122 
123             return new HBaseTransactionManager(hbaseOmidClientConf,
124                                                postCommitter,
125                                                tsoClient,
126                                                commitTableClient,
127                                                commitTableWriter,
128                                                new HBaseTransactionFactory(),
129                                                connection);
130         }
131 
132         private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
133             return Optional.of((TSOProtocol) TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
134         }
135 
136 
137         private Optional<CommitTable.Client> buildCommitTableClient(Connection connection) throws IOException {
138             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
139             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
140             CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
141             return Optional.of(commitTable.getClient());
142         }
143 
144         private Optional<CommitTable.Writer> buildCommitTableWriter(Connection connection) throws IOException {
145             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
146             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
147             CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
148             return Optional.of(commitTable.getWriter());
149         }
150 
151         private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient, Connection connection) {
152 
153             PostCommitActions postCommitter;
154             PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
155                                                                              commitTableClient, connection);
156             switch(hbaseOmidClientConf.getPostCommitMode()) {
157                 case ASYNC:
158                     ListeningExecutorService postCommitExecutor =
159                             MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
160                                     new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
161                     postCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
162                     break;
163                 case SYNC:
164                 default:
165                     postCommitter = syncPostCommitter;
166                     break;
167             }
168 
169             return Optional.of(postCommitter);
170         }
171 
172     }
173 
174     public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
175         return new Builder(hbaseOmidClientConf);
176     }
177 
178     private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConfiguration,
179                                     PostCommitActions postCommitter,
180                                     TSOProtocol tsoClient,
181                                     CommitTable.Client commitTableClient,
182                                     CommitTable.Writer commitTableWriter,
183                                     HBaseTransactionFactory hBaseTransactionFactory, Connection connection) {
184 
185         super(hBaseOmidClientConfiguration.getMetrics(),
186                 postCommitter,
187                 tsoClient,
188                 commitTableClient,
189                 commitTableWriter,
190                 hBaseTransactionFactory);
191         this.connection = connection;
192     }
193 
194     // ----------------------------------------------------------------------------------------------------------------
195     // AbstractTransactionManager overwritten methods
196     // ----------------------------------------------------------------------------------------------------------------
197     @Override
198     public void closeResources() throws IOException {
199         connection.close();
200     }
201 
202     @Override
203     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
204         try {
205             // Flush all pending writes
206             HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
207             hBaseTx.flushTables();
208         } catch (IOException e) {
209             throw new TransactionManagerException("Exception while flushing writes", e);
210         }
211     }
212 
213     @Override
214     public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
215         try {
216             // Flush all pending writes
217             HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
218             hBaseTx.flushTables();
219         } catch (IOException e) {
220             throw new TransactionManagerException("Exception while flushing writes", e);
221         }
222     }
223 
224     @Override
225     public long getHashForTable(byte[] tableName) {
226         return HBaseCellId.getHasher().putBytes(tableName).hash().asLong();
227     }
228 
229     @Override
230     public long getLowWatermark() throws TransactionException {
231         try {
232             return commitTableClient.readLowWatermark().get();
233         } catch (ExecutionException ee) {
234             throw new TransactionException("Error reading low watermark", ee.getCause());
235         } catch (InterruptedException ie) {
236             Thread.currentThread().interrupt();
237             throw new TransactionException("Interrupted reading low watermark", ie);
238         }
239     }
240 
241     // ----------------------------------------------------------------------------------------------------------------
242     // Helper methods
243     // ----------------------------------------------------------------------------------------------------------------
244 
245     static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? extends CellId> tx) {
246 
247         if (tx instanceof HBaseTransaction) {
248             return (HBaseTransaction) tx;
249         } else {
250             throw new IllegalArgumentException(
251                     "The transaction object passed is not an instance of HBaseTransaction");
252         }
253 
254     }
255 
256     public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
257         tsoClient.setConflictDetectionLevel(conflictDetectionLevel);
258     }
259 
260     public ConflictDetectionLevel getConflictDetectionLevel() {
261         return tsoClient.getConflictDetectionLevel();
262     }
263 
264     static class CommitTimestampLocatorImpl implements CommitTimestampLocator {
265 
266         private HBaseCellId hBaseCellId;
267         private final Map<Long, Long> commitCache;
268         private TableAccessWrapper tableAccessWrapper;
269 
270         CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache, TableAccessWrapper tableAccessWrapper) {
271             this.hBaseCellId = hBaseCellId;
272             this.commitCache = commitCache;
273             this.tableAccessWrapper = tableAccessWrapper;
274         }
275 
276         CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache) {
277             this.hBaseCellId = hBaseCellId;
278             this.commitCache = commitCache;
279             this.tableAccessWrapper = null;
280             this.tableAccessWrapper = new HTableAccessWrapper(hBaseCellId.getTable().getHTable(), hBaseCellId.getTable().getHTable());
281         }
282 
283         @Override
284         public Optional<Long> readCommitTimestampFromCache(long startTimestamp) {
285             if (commitCache.containsKey(startTimestamp)) {
286                 return Optional.of(commitCache.get(startTimestamp));
287             }
288             return Optional.absent();
289         }
290 
291         @Override
292         public Optional<Long> readCommitTimestampFromShadowCell(long startTimestamp) throws IOException {
293 
294             Get get = new Get(hBaseCellId.getRow());
295             byte[] family = hBaseCellId.getFamily();
296             byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(hBaseCellId.getQualifier());
297             get.addColumn(family, shadowCellQualifier);
298             get.setMaxVersions(1);
299             get.setTimeStamp(startTimestamp);
300             Result result = tableAccessWrapper.get(get);
301             if (result.containsColumn(family, shadowCellQualifier)) {
302                 return Optional.of(Bytes.toLong(result.getValue(family, shadowCellQualifier)));
303             }
304             return Optional.absent();
305         }
306 
307     }
308 
309 }