View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Function;
21  import com.google.common.base.Optional;
22  import com.google.common.hash.Hashing;
23  import com.google.common.util.concurrent.Futures;
24  
25  import org.apache.omid.committable.CommitTable;
26  import org.apache.omid.committable.CommitTable.CommitTimestamp;
27  import org.apache.omid.metrics.Counter;
28  import org.apache.omid.metrics.MetricsRegistry;
29  import org.apache.omid.metrics.Timer;
30  import org.apache.omid.transaction.Transaction.Status;
31  import org.apache.omid.tso.client.AbortException;
32  import org.apache.omid.tso.client.CellId;
33  import org.apache.omid.tso.client.ConnectionException;
34  import org.apache.omid.tso.client.ServiceUnavailableException;
35  import org.apache.omid.tso.client.TSOProtocol;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import java.io.IOException;
40  import java.util.concurrent.ExecutionException;
41  
42  
43  import static org.apache.omid.metrics.MetricsUtils.name;
44  
45  /**
46   * Omid's base abstract implementation of the {@link TransactionManager} interface.
47   *
48   * Provides extra methods to allow transaction manager developers to perform
49   * different actions before/after the methods exposed by the {@link TransactionManager} interface.
50   *
51   * So, this abstract class must be extended by particular implementations of
52   * transaction managers related to different storage systems (HBase...)
53   */
54  public abstract class AbstractTransactionManager implements TransactionManager {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionManager.class);
57  
58      public final static int MAX_CHECKPOINTS_PER_TXN = 50;
59  
60      public interface TransactionFactory<T extends CellId> {
61  
62          AbstractTransaction<T> createTransaction(long transactionId, long epoch, AbstractTransactionManager tm);
63  
64      }
65  
66      private final PostCommitActions postCommitter;
67      protected final TSOProtocol tsoClient;
68      protected final CommitTable.Client commitTableClient;
69      private final CommitTable.Writer commitTableWriter;
70      private final TransactionFactory<? extends CellId> transactionFactory;
71  
72      // Metrics
73      private final Timer startTimestampTimer;
74      private final Timer commitTimer;
75      private final Timer fenceTimer;
76      private final Counter committedTxsCounter;
77      private final Counter rolledbackTxsCounter;
78      private final Counter errorTxsCounter;
79      private final Counter invalidatedTxsCounter;
80  
81      /**
82       * Base constructor
83       *
84       * @param metrics
85       *            instrumentation metrics
86       * @param postCommitter
87       *            post commit action executor
88       * @param tsoClient
89       *            a client for accessing functionality of the status oracle
90       * @param commitTableClient
91       *            a client for accessing functionality of the commit table
92       * @param transactionFactory
93       *            a transaction factory to create the specific transaction
94       *            objects required by the transaction manager being implemented.
95       */
96      public AbstractTransactionManager(MetricsRegistry metrics,
97                                        PostCommitActions postCommitter,
98                                        TSOProtocol tsoClient,
99                                        CommitTable.Client commitTableClient,
100                                       CommitTable.Writer commitTableWriter,
101                                       TransactionFactory<? extends CellId> transactionFactory) {
102 
103         this.tsoClient = tsoClient;
104         this.postCommitter = postCommitter;
105         this.commitTableClient = commitTableClient;
106         this.commitTableWriter = commitTableWriter;
107         this.transactionFactory = transactionFactory;
108 
109         // Metrics configuration
110         this.startTimestampTimer = metrics.timer(name("omid", "tm", "hbase", "startTimestamp", "latency"));
111         this.commitTimer = metrics.timer(name("omid", "tm", "hbase", "commit", "latency"));
112         this.fenceTimer = metrics.timer(name("omid", "tm", "hbase", "fence", "latency"));
113         this.committedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "committedTxs"));
114         this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
115         this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
116         this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs"));
117 
118     }
119 
120     /**
121      * Allows transaction manager developers to perform actions before creating a transaction.
122      * @throws TransactionManagerException in case of any issues
123      */
124     public void preBegin() throws TransactionManagerException {}
125 
126     /**
127      * @see org.apache.omid.transaction.TransactionManager#begin()
128      */
129     @Override
130     public final Transaction begin() throws TransactionException {
131 
132         try {
133             preBegin();
134 
135             long startTimestamp, epoch;
136 
137             // The loop is required for HA scenarios where we get the timestamp
138             // but when getting the epoch, the client is connected to a new TSOServer
139             // When this happen, the epoch will be larger than the startTimestamp,
140             // so we need to start the transaction again. We use the fact that epoch
141             // is always smaller or equal to a timestamp, and therefore, we first need
142             // to get the timestamp and then the epoch.
143             startTimestampTimer.start();
144             try {
145                 do {
146                     startTimestamp = tsoClient.getNewStartTimestamp().get();
147                     epoch = tsoClient.getEpoch();
148                 } while (epoch > startTimestamp);
149             } finally {
150                 startTimestampTimer.stop();
151             }
152 
153             AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(startTimestamp, epoch, this);
154 
155             postBegin(tx);
156 
157             return tx;
158         } catch (TransactionManagerException e) {
159             throw new TransactionException("An error has occured during PreBegin/PostBegin", e);
160         } catch (ExecutionException e) {
161             throw new TransactionException("Could not get new timestamp", e);
162         } catch (InterruptedException ie) {
163             Thread.currentThread().interrupt();
164             throw new TransactionException("Interrupted getting timestamp", ie);
165         }
166     }
167 
168     /**
169      * Generates hash ID for table name, this hash is later-on sent to the TSO and used for fencing
170      * @param tableName - the table name
171      * @return
172      */
173     abstract public long getHashForTable(byte[] tableName);
174 
175     /**
176      * Return the commit table client
177      * @return commitTableClient
178      */
179     public CommitTable.Client getCommitTableClient() {
180         return commitTableClient;
181     }
182 
183     /**
184      * @see org.apache.omid.transaction.TransactionManager#fence(byte[])
185      */
186     @Override
187     public final Transaction fence(byte[] tableName) throws TransactionException {
188         long fenceTimestamp;
189         long tableID = getHashForTable(tableName); Hashing.murmur3_128().newHasher().putBytes(tableName).hash().asLong();
190 
191         try {
192             fenceTimer.start();
193             try {
194                 fenceTimestamp = tsoClient.getFence(tableID).get();
195             } finally {
196                 fenceTimer.stop();
197             }
198 
199             AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(fenceTimestamp, fenceTimestamp, this);
200 
201             return tx;
202         } catch (ExecutionException e) {
203             throw new TransactionException("Could not get fence", e);
204         } catch (InterruptedException ie) {
205             Thread.currentThread().interrupt();
206             throw new TransactionException("Interrupted creating a fence", ie);
207         }
208     }
209 
210     /**
211      * Allows transaction manager developers to perform actions after having started a transaction.
212      * @param transaction
213      *            the transaction that was just created.
214      * @throws TransactionManagerException  in case of any issues
215      */
216     public void postBegin(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
217 
218     /**
219      * Allows transaction manager developers to perform actions before committing a transaction.
220      * @param transaction
221      *            the transaction that is going to be committed.
222      * @throws TransactionManagerException  in case of any issues
223      */
224     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
225 
226     /**
227      * @see org.apache.omid.transaction.TransactionManager#commit(Transaction)
228      */
229     @Override
230     public final void commit(Transaction transaction) throws RollbackException, TransactionException {
231 
232         AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
233         enforceTransactionIsInRunningState(tx);
234 
235         if (tx.isRollbackOnly()) { // Manage explicit user rollback
236             rollback(tx);
237             throw new RollbackException(tx + ": Tx was set to rollback explicitly");
238         }
239 
240         try {
241 
242             preCommit(tx);
243 
244             commitTimer.start();
245             try {
246                 if (tx.getWriteSet().isEmpty() && tx.getConflictFreeWriteSet().isEmpty()) {
247                     markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server
248                 } else {
249                     if (tsoClient.isLowLatency())
250                         commitLowLatencyTransaction(tx);
251                     else
252                         commitRegularTransaction(tx);
253                 }
254                 committedTxsCounter.inc();
255             } finally {
256                 commitTimer.stop();
257             }
258 
259             postCommit(tx);
260 
261         } catch (TransactionManagerException e) {
262             throw new TransactionException(e.getMessage(), e);
263         }
264 
265     }
266 
267     /**
268      * Allows transaction manager developers to perform actions after committing a transaction.
269      * @param transaction
270      *            the transaction that was committed.
271      * @throws TransactionManagerException in case of any issues
272      */
273     public void postCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
274 
275     /**
276      * Allows transaction manager developers to perform actions before rolling-back a transaction.
277      * @param transaction the transaction that is going to be rolled-back.
278      * @throws TransactionManagerException in case of any issues
279      */
280     public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
281 
282     /**
283      * @see org.apache.omid.transaction.TransactionManager#rollback(Transaction)
284      */
285     @Override
286     public final void rollback(Transaction transaction) throws TransactionException {
287 
288         AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
289         enforceTransactionIsInRunningState(tx);
290 
291         try {
292 
293             preRollback(tx);
294 
295             // Make sure its commit timestamp is 0, so the cleanup does the right job
296             tx.setCommitTimestamp(0);
297             tx.setStatus(Status.ROLLEDBACK);
298 
299             postRollback(tx);
300 
301         } catch (TransactionManagerException e) {
302             throw new TransactionException(e.getMessage(), e);
303         } finally {
304             tx.cleanup();
305         }
306 
307     }
308 
309     /**
310      * Allows transaction manager developers to perform actions after rolling-back a transaction.
311      * @param transaction
312      *            the transaction that was rolled-back.
313      * @throws TransactionManagerException in case of any issues
314      */
315     public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
316 
317     /**
318      * @see java.io.Closeable#close()
319      */
320     @Override
321     public final void close() throws IOException {
322 
323         tsoClient.close();
324         commitTableClient.close();
325 
326     }
327 
328     // ----------------------------------------------------------------------------------------------------------------
329     // Helper methods
330     // ----------------------------------------------------------------------------------------------------------------
331 
332     private void enforceTransactionIsInRunningState(Transaction transaction) {
333 
334         if (transaction.getStatus() != Status.RUNNING) {
335             throw new IllegalArgumentException("Transaction was already " + transaction.getStatus());
336         }
337 
338     }
339 
340     @SuppressWarnings("unchecked")
341     // NOTE: We are sure that tx is not parametrized
342     private AbstractTransaction<? extends CellId> enforceAbstractTransactionAsParam(Transaction tx) {
343 
344         if (tx instanceof AbstractTransaction) {
345             return (AbstractTransaction<? extends CellId>) tx;
346         } else {
347             throw new IllegalArgumentException(
348                     "The transaction object passed is not an instance of AbstractTransaction");
349         }
350 
351     }
352 
353     private void markReadOnlyTransaction(AbstractTransaction<? extends CellId> readOnlyTx) {
354 
355         readOnlyTx.setStatus(Status.COMMITTED_RO);
356 
357     }
358 
359     private void commitLowLatencyTransaction(AbstractTransaction<? extends CellId> tx)
360             throws RollbackException, TransactionException {
361         try {
362 
363             long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
364             boolean committed = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(),commitTs);
365             if (!committed) {
366                 // Transaction has been invalidated by other client
367                 rollback(tx);
368                 commitTableClient.completeTransaction(tx.getStartTimestamp());
369                 rolledbackTxsCounter.inc();
370                 throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
371             }
372             certifyCommitForTx(tx, commitTs);
373             updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
374 
375         } catch (ExecutionException e) {
376             if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
377                 rollback(tx);
378                 rolledbackTxsCounter.inc();
379                 throw new RollbackException("Conflicts detected in tx writeset", e.getCause());
380             }
381 
382             if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
383                 errorTxsCounter.inc();
384                 rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
385                 throw new RollbackException(tx + " rolled-back precautionary", e.getCause());
386             } else {
387                 throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause());
388             }
389         } catch (InterruptedException e) {
390             e.printStackTrace();
391         } catch (IOException e) {
392             e.printStackTrace();
393         }
394     }
395 
396     private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
397             throws RollbackException, TransactionException
398     {
399 
400         try {
401 
402             long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
403             certifyCommitForTx(tx, commitTs);
404             updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
405 
406         } catch (ExecutionException e) {
407 
408             if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
409                 rollback(tx);
410                 rolledbackTxsCounter.inc();
411                 throw new RollbackException(tx + ": Conflicts detected in writeset", e.getCause());
412             }
413 
414             if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
415 
416                 errorTxsCounter.inc();
417                 try {
418                     LOG.warn("Can't contact the TSO for receiving outcome for Tx {}. Checking Commit Table...", tx);
419                     // Check the commit table to find if the target TSO woke up in the meantime and added the commit
420                     // TODO: Decide what we should we do if we can not contact the commit table
421                     Optional<CommitTimestamp> commitTimestamp =
422                             commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
423                     if (commitTimestamp.isPresent()) {
424                         if (commitTimestamp.get().isValid()) {
425                             LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx);
426                             certifyCommitForTx(tx, commitTimestamp.get().getValue());
427                             postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
428                         } else { // Probably another Tx in a new TSO Server invalidated this transaction
429                             LOG.warn("{}: Invalidated commit TS found in Commit Table. Rolling-back...", tx);
430                             rollback(tx);
431                             throw new RollbackException(tx + " invalidated by other Tx started", e.getCause());
432                         }
433                     } else {
434                         LOG.warn("{}: Trying to invalidate Tx proactively in Commit Table...", tx);
435                         boolean invalidated = commitTableClient.tryInvalidateTransaction(tx.getStartTimestamp()).get();
436                         if (invalidated) {
437                             LOG.warn("{}: Invalidated proactively in Commit Table. Rolling-back Tx...", tx);
438                             invalidatedTxsCounter.inc();
439                             rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
440                             throw new RollbackException(tx + " rolled-back precautionary", e.getCause());
441                         } else {
442                             LOG.warn("{}: Invalidation could NOT be completed. Re-checking Commit Table...", tx);
443                             // TODO: Decide what we should we do if we can not contact the commit table
444                             commitTimestamp = commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
445                             if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
446                                 LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx);
447                                 certifyCommitForTx(tx, commitTimestamp.get().getValue());
448                                 postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
449                             } else {
450                                 LOG.error("{}: Can't determine Transaction outcome", tx);
451                                 throw new TransactionException(tx + ": cannot determine Tx outcome");
452                             }
453                         }
454                     }
455                 } catch (ExecutionException e1) {
456                     throw new TransactionException(tx + ": problem reading commitTS from Commit Table", e1);
457                 } catch (InterruptedException e1) {
458                     Thread.currentThread().interrupt();
459                     throw new TransactionException(tx + ": interrupted while reading commitTS from Commit Table", e1);
460                 }
461             } else {
462                 throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause());
463             }
464         } catch (InterruptedException ie) {
465             Thread.currentThread().interrupt();
466             throw new TransactionException(tx + ": interrupted during commit", ie);
467 
468         }
469 
470     }
471 
472     private void updateShadowCellsAndRemoveCommitTableEntry(final AbstractTransaction<? extends CellId> tx,
473                                                             final PostCommitActions postCommitter) {
474 
475         Futures.transform(postCommitter.updateShadowCells(tx), new Function<Void, Void>() {
476             @Override
477             public Void apply(Void aVoid) {
478                 postCommitter.removeCommitTableEntry(tx);
479                 return null;
480             }
481         });
482 
483     }
484 
485     private void certifyCommitForTx(AbstractTransaction<? extends CellId> txToSetup, long commitTS) {
486 
487         txToSetup.setStatus(Status.COMMITTED);
488         txToSetup.setCommitTimestamp(commitTS);
489 
490     }
491 
492     public boolean isLowLatency() {
493         return tsoClient.isLowLatency();
494     }
495 }