View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Function;
21  import com.google.common.base.Optional;
22  import com.google.common.util.concurrent.Futures;
23  import org.apache.omid.committable.CommitTable;
24  import org.apache.omid.committable.CommitTable.CommitTimestamp;
25  import org.apache.omid.metrics.Counter;
26  import org.apache.omid.metrics.MetricsRegistry;
27  import org.apache.omid.metrics.Timer;
28  import org.apache.omid.transaction.Transaction.Status;
29  import org.apache.omid.tso.client.AbortException;
30  import org.apache.omid.tso.client.CellId;
31  import org.apache.omid.tso.client.ConnectionException;
32  import org.apache.omid.tso.client.ServiceUnavailableException;
33  import org.apache.omid.tso.client.TSOClient;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import java.io.IOException;
38  import java.util.concurrent.ExecutionException;
39  
40  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
41  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
42  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
43  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
44  import static org.apache.omid.metrics.MetricsUtils.name;
45  
46  /**
47   * Omid's base abstract implementation of the {@link TransactionManager} interface.
48   *
49   * Provides extra methods to allow transaction manager developers to perform
50   * different actions before/after the methods exposed by the {@link TransactionManager} interface.
51   *
52   * So, this abstract class must be extended by particular implementations of
53   * transaction managers related to different storage systems (HBase...)
54   */
55  public abstract class AbstractTransactionManager implements TransactionManager {
56  
57      private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionManager.class);
58  
59      public interface TransactionFactory<T extends CellId> {
60  
61          AbstractTransaction<T> createTransaction(long transactionId, long epoch, AbstractTransactionManager tm);
62  
63      }
64  
65      private final PostCommitActions postCommitter;
66      protected final TSOClient tsoClient;
67      protected final CommitTable.Client commitTableClient;
68      private final TransactionFactory<? extends CellId> transactionFactory;
69  
70      // Metrics
71      private final Timer startTimestampTimer;
72      private final Timer commitTimer;
73      private final Counter committedTxsCounter;
74      private final Counter rolledbackTxsCounter;
75      private final Counter errorTxsCounter;
76      private final Counter invalidatedTxsCounter;
77  
78      /**
79       * Base constructor
80       *
81       * @param metrics
82       *            instrumentation metrics
83       * @param postCommitter
84       *            post commit action executor
85       * @param tsoClient
86       *            a client for accessing functionality of the status oracle
87       * @param commitTableClient
88       *            a client for accessing functionality of the commit table
89       * @param transactionFactory
90       *            a transaction factory to create the specific transaction
91       *            objects required by the transaction manager being implemented.
92       */
93      public AbstractTransactionManager(MetricsRegistry metrics,
94                                        PostCommitActions postCommitter,
95                                        TSOClient tsoClient,
96                                        CommitTable.Client commitTableClient,
97                                        TransactionFactory<? extends CellId> transactionFactory) {
98  
99          this.tsoClient = tsoClient;
100         this.postCommitter = postCommitter;
101         this.commitTableClient = commitTableClient;
102         this.transactionFactory = transactionFactory;
103 
104         // Metrics configuration
105         this.startTimestampTimer = metrics.timer(name("omid", "tm", "hbase", "startTimestamp", "latency"));
106         this.commitTimer = metrics.timer(name("omid", "tm", "hbase", "commit", "latency"));
107         this.committedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "committedTxs"));
108         this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
109         this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
110         this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs"));
111 
112     }
113 
114     /**
115      * Allows transaction manager developers to perform actions before creating a transaction.
116      * @throws TransactionManagerException
117      */
118     public void preBegin() throws TransactionManagerException {}
119 
120     /**
121      * @see org.apache.omid.transaction.TransactionManager#begin()
122      */
123     @Override
124     public final Transaction begin() throws TransactionException {
125 
126         try {
127             preBegin();
128 
129             long startTimestamp, epoch;
130 
131             // The loop is required for HA scenarios where we get the timestamp
132             // but when getting the epoch, the client is connected to a new TSOServer
133             // When this happen, the epoch will be larger than the startTimestamp,
134             // so we need to start the transaction again. We use the fact that epoch
135             // is always smaller or equal to a timestamp, and therefore, we first need
136             // to get the timestamp and then the epoch.
137             startTimestampTimer.start();
138             try {
139                 do {
140                     startTimestamp = tsoClient.getNewStartTimestamp().get();
141                     epoch = tsoClient.getEpoch();
142                 } while (epoch > startTimestamp);
143             } finally {
144                 startTimestampTimer.stop();
145             }
146 
147             AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(startTimestamp, epoch, this);
148 
149             postBegin(tx);
150 
151             return tx;
152         } catch (TransactionManagerException e) {
153             throw new TransactionException("An error has occured during PreBegin/PostBegin", e);
154         } catch (ExecutionException e) {
155             throw new TransactionException("Could not get new timestamp", e);
156         } catch (InterruptedException ie) {
157             Thread.currentThread().interrupt();
158             throw new TransactionException("Interrupted getting timestamp", ie);
159         }
160     }
161 
162     /**
163      * Allows transaction manager developers to perform actions after having started a transaction.
164      * @param transaction
165      *            the transaction that was just created.
166      * @throws TransactionManagerException
167      */
168     public void postBegin(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
169 
170     /**
171      * Allows transaction manager developers to perform actions before committing a transaction.
172      * @param transaction
173      *            the transaction that is going to be committed.
174      * @throws TransactionManagerException
175      */
176     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
177 
178     /**
179      * @see org.apache.omid.transaction.TransactionManager#commit(Transaction)
180      */
181     @Override
182     public final void commit(Transaction transaction) throws RollbackException, TransactionException {
183 
184         AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
185         enforceTransactionIsInRunningState(tx);
186 
187         if (tx.isRollbackOnly()) { // Manage explicit user rollback
188             rollback(tx);
189             throw new RollbackException(tx + ": Tx was set to rollback explicitly");
190         }
191 
192         try {
193 
194             preCommit(tx);
195 
196             commitTimer.start();
197             try {
198                 if (tx.getWriteSet().isEmpty()) {
199                     markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server
200                 } else {
201                     commitRegularTransaction(tx);
202                 }
203                 committedTxsCounter.inc();
204             } finally {
205                 commitTimer.stop();
206             }
207 
208             postCommit(tx);
209 
210         } catch (TransactionManagerException e) {
211             throw new TransactionException(e.getMessage(), e);
212         }
213 
214     }
215 
216     /**
217      * Allows transaction manager developers to perform actions after committing a transaction.
218      * @param transaction
219      *            the transaction that was committed.
220      * @throws TransactionManagerException
221      */
222     public void postCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
223 
224     /**
225      * Allows transaction manager developers to perform actions before rolling-back a transaction.
226      * @param transaction
227      *            the transaction that is going to be rolled-back.
228      * @throws TransactionManagerException
229      */
230     public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
231 
232     /**
233      * @see org.apache.omid.transaction.TransactionManager#rollback(Transaction)
234      */
235     @Override
236     public final void rollback(Transaction transaction) throws TransactionException {
237 
238         AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
239         enforceTransactionIsInRunningState(tx);
240 
241         try {
242 
243             preRollback(tx);
244 
245             // Make sure its commit timestamp is 0, so the cleanup does the right job
246             tx.setCommitTimestamp(0);
247             tx.setStatus(Status.ROLLEDBACK);
248 
249             postRollback(tx);
250 
251         } catch (TransactionManagerException e) {
252             throw new TransactionException(e.getMessage(), e);
253         } finally {
254             tx.cleanup();
255         }
256 
257     }
258 
259     /**
260      * Allows transaction manager developers to perform actions after rolling-back a transaction.
261      * @param transaction
262      *            the transaction that was rolled-back.
263      * @throws TransactionManagerException
264      */
265     public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
266 
267     /**
268      * Check if the transaction commit data is in the shadow cell
269      * @param cellStartTimestamp
270      *            the transaction start timestamp
271      *        locator
272      *            the timestamp locator
273      * @throws IOException
274      */
275     Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
276             throws IOException
277     {
278 
279         Optional<CommitTimestamp> commitTS = Optional.absent();
280 
281         Optional<Long> commitTimestamp = locator.readCommitTimestampFromShadowCell(cellStartTimestamp);
282         if (commitTimestamp.isPresent()) {
283             commitTS = Optional.of(new CommitTimestamp(SHADOW_CELL, commitTimestamp.get(), true)); // Valid commit TS
284         }
285 
286         return commitTS;
287     }
288 
289     /**
290      * This function returns the commit timestamp for a particular cell if the transaction was already committed in
291      * the system. In case the transaction was not committed and the cell was written by transaction initialized by a
292      * previous TSO server, an invalidation try occurs.
293      * Otherwise the function returns a value that indicates that the commit timestamp was not found.
294      * @param cellStartTimestamp
295      *          start timestamp of the cell to locate the commit timestamp for.
296      * @param epoch
297      *          the epoch of the TSO server the current tso client is working with.
298      * @param locator
299      *          a locator to find the commit timestamp in the system.
300      * @return the commit timestamp joint with the location where it was found
301      *         or an object indicating that it was not found in the system
302      * @throws IOException
303      */
304     public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
305                                                      CommitTimestampLocator locator) throws IOException {
306 
307         try {
308             // 1) First check the cache
309             Optional<Long> commitTimestamp = locator.readCommitTimestampFromCache(cellStartTimestamp);
310             if (commitTimestamp.isPresent()) { // Valid commit timestamp
311                 return new CommitTimestamp(CACHE, commitTimestamp.get(), true);
312             }
313 
314             // 2) Then check the commit table
315             // If the data was written at a previous epoch, check whether the transaction was invalidated
316             Optional<CommitTimestamp> commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
317             if (commitTimeStamp.isPresent()) {
318                 return commitTimeStamp.get();
319             }
320 
321             // 3) Read from shadow cell
322             commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
323             if (commitTimeStamp.isPresent()) {
324                 return commitTimeStamp.get();
325             }
326 
327             // 4) Check the epoch and invalidate the entry
328             // if the data was written by a transaction from a previous epoch (previous TSO)
329             if (cellStartTimestamp < epoch) {
330                 boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
331                 if (invalidated) { // Invalid commit timestamp
332                     return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
333                 }
334             }
335 
336             // 5) We did not manage to invalidate the transactions then check the commit table
337             commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
338             if (commitTimeStamp.isPresent()) {
339                 return commitTimeStamp.get();
340             }
341 
342             // 6) Read from shadow cell
343             commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
344             if (commitTimeStamp.isPresent()) {
345                 return commitTimeStamp.get();
346             }
347 
348             // *) Otherwise return not found
349             return new CommitTimestamp(NOT_PRESENT, -1L /** TODO Check if we should return this */, true);
350         } catch (InterruptedException e) {
351             Thread.currentThread().interrupt();
352             throw new IOException("Interrupted while finding commit timestamp", e);
353         } catch (ExecutionException e) {
354             throw new IOException("Problem finding commit timestamp", e);
355         }
356 
357     }
358 
359     /**
360      * @see java.io.Closeable#close()
361      */
362     @Override
363     public final void close() throws IOException {
364 
365         tsoClient.close();
366         commitTableClient.close();
367 
368     }
369 
370     // ----------------------------------------------------------------------------------------------------------------
371     // Helper methods
372     // ----------------------------------------------------------------------------------------------------------------
373 
374     private void enforceTransactionIsInRunningState(Transaction transaction) {
375 
376         if (transaction.getStatus() != Status.RUNNING) {
377             throw new IllegalArgumentException("Transaction was already " + transaction.getStatus());
378         }
379 
380     }
381 
382     @SuppressWarnings("unchecked")
383     // NOTE: We are sure that tx is not parametrized
384     private AbstractTransaction<? extends CellId> enforceAbstractTransactionAsParam(Transaction tx) {
385 
386         if (tx instanceof AbstractTransaction) {
387             return (AbstractTransaction<? extends CellId>) tx;
388         } else {
389             throw new IllegalArgumentException(
390                     "The transaction object passed is not an instance of AbstractTransaction");
391         }
392 
393     }
394 
395     private void markReadOnlyTransaction(AbstractTransaction<? extends CellId> readOnlyTx) {
396 
397         readOnlyTx.setStatus(Status.COMMITTED_RO);
398 
399     }
400 
401     private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
402             throws RollbackException, TransactionException
403     {
404 
405         try {
406 
407             long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet()).get();
408             certifyCommitForTx(tx, commitTs);
409             updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
410 
411         } catch (ExecutionException e) {
412 
413             if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
414                 rollback(tx);
415                 rolledbackTxsCounter.inc();
416                 throw new RollbackException("Conflicts detected in tx writeset", e.getCause());
417             }
418 
419             if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
420 
421                 errorTxsCounter.inc();
422                 try {
423                     LOG.warn("Can't contact the TSO for receiving outcome for Tx {}. Checking Commit Table...", tx);
424                     // Check the commit table to find if the target TSO woke up in the meantime and added the commit
425                     // TODO: Decide what we should we do if we can not contact the commit table
426                     Optional<CommitTimestamp> commitTimestamp =
427                             commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
428                     if (commitTimestamp.isPresent()) {
429                         if (commitTimestamp.get().isValid()) {
430                             LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx);
431                             certifyCommitForTx(tx, commitTimestamp.get().getValue());
432                             postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
433                         } else { // Probably another Tx in a new TSO Server invalidated this transaction
434                             LOG.warn("{}: Invalidated commit TS found in Commit Table. Rolling-back...", tx);
435                             rollback(tx);
436                             throw new RollbackException(tx + " invalidated by other Tx started", e.getCause());
437                         }
438                     } else {
439                         LOG.warn("{}: Trying to invalidate Tx proactively in Commit Table...", tx);
440                         boolean invalidated = commitTableClient.tryInvalidateTransaction(tx.getStartTimestamp()).get();
441                         if (invalidated) {
442                             LOG.warn("{}: Invalidated proactively in Commit Table. Rolling-back Tx...", tx);
443                             invalidatedTxsCounter.inc();
444                             rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
445                             throw new RollbackException(tx + " rolled-back precautionary", e.getCause());
446                         } else {
447                             LOG.warn("{}: Invalidation could NOT be completed. Re-checking Commit Table...", tx);
448                             // TODO: Decide what we should we do if we can not contact the commit table
449                             commitTimestamp = commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
450                             if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
451                                 LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx);
452                                 certifyCommitForTx(tx, commitTimestamp.get().getValue());
453                                 postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
454                             } else {
455                                 LOG.error("{}: Can't determine Transaction outcome", tx);
456                                 throw new TransactionException(tx + ": cannot determine Tx outcome");
457                             }
458                         }
459                     }
460                 } catch (ExecutionException e1) {
461                     throw new TransactionException(tx + ": problem reading commitTS from Commit Table", e1);
462                 } catch (InterruptedException e1) {
463                     Thread.currentThread().interrupt();
464                     throw new TransactionException(tx + ": interrupted while reading commitTS from Commit Table", e1);
465                 }
466             } else {
467                 throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause());
468             }
469         } catch (InterruptedException ie) {
470             Thread.currentThread().interrupt();
471             throw new TransactionException(tx + ": interrupted during commit", ie);
472 
473         }
474 
475     }
476 
477     private void updateShadowCellsAndRemoveCommitTableEntry(final AbstractTransaction<? extends CellId> tx,
478                                                             final PostCommitActions postCommitter) {
479 
480         Futures.transform(postCommitter.updateShadowCells(tx), new Function<Void, Void>() {
481             @Override
482             public Void apply(Void aVoid) {
483                 postCommitter.removeCommitTableEntry(tx);
484                 return null;
485             }
486         });
487 
488     }
489 
490     private void certifyCommitForTx(AbstractTransaction<? extends CellId> txToSetup, long commitTS) {
491 
492         txToSetup.setStatus(Status.COMMITTED);
493         txToSetup.setCommitTimestamp(commitTS);
494 
495     }
496 
497 }