1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Function;
21 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22 import org.apache.phoenix.thirdparty.com.google.common.hash.Hashing;
23 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.Futures;
24 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
25 import org.apache.omid.committable.CommitTable;
26 import org.apache.omid.committable.CommitTable.CommitTimestamp;
27 import org.apache.omid.metrics.Counter;
28 import org.apache.omid.metrics.MetricsRegistry;
29 import org.apache.omid.metrics.Timer;
30 import org.apache.omid.transaction.Transaction.Status;
31 import org.apache.omid.tso.client.AbortException;
32 import org.apache.omid.tso.client.CellId;
33 import org.apache.omid.tso.client.ConnectionException;
34 import org.apache.omid.tso.client.ServiceUnavailableException;
35 import org.apache.omid.tso.client.TSOProtocol;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import java.io.IOException;
40 import java.util.concurrent.ExecutionException;
41
42
43 import static org.apache.omid.metrics.MetricsUtils.name;
44
45
46
47
48
49
50
51
52
53
54 public abstract class AbstractTransactionManager implements TransactionManager {
55
56 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionManager.class);
57
58 public interface TransactionFactory<T extends CellId> {
59
60 AbstractTransaction<T> createTransaction(long transactionId, long epoch, AbstractTransactionManager tm);
61
62 }
63
64 private final PostCommitActions postCommitter;
65 protected final TSOProtocol tsoClient;
66 protected final CommitTable.Client commitTableClient;
67 private final CommitTable.Writer commitTableWriter;
68 private final TransactionFactory<? extends CellId> transactionFactory;
69
70
71 private final Timer startTimestampTimer;
72 private final Timer commitTimer;
73 private final Timer fenceTimer;
74 private final Counter committedTxsCounter;
75 private final Counter rolledbackTxsCounter;
76 private final Counter errorTxsCounter;
77 private final Counter invalidatedTxsCounter;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 public AbstractTransactionManager(MetricsRegistry metrics,
95 PostCommitActions postCommitter,
96 TSOProtocol tsoClient,
97 CommitTable.Client commitTableClient,
98 CommitTable.Writer commitTableWriter,
99 TransactionFactory<? extends CellId> transactionFactory) {
100
101 this.tsoClient = tsoClient;
102 this.postCommitter = postCommitter;
103 this.commitTableClient = commitTableClient;
104 this.commitTableWriter = commitTableWriter;
105 this.transactionFactory = transactionFactory;
106
107
108 this.startTimestampTimer = metrics.timer(name("omid", "tm", "hbase", "startTimestamp", "latency"));
109 this.commitTimer = metrics.timer(name("omid", "tm", "hbase", "commit", "latency"));
110 this.fenceTimer = metrics.timer(name("omid", "tm", "hbase", "fence", "latency"));
111 this.committedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "committedTxs"));
112 this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
113 this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
114 this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs"));
115
116 }
117
118
119
120
121
122 public void preBegin() throws TransactionManagerException {}
123
124
125
126
127 @Override
128 public final Transaction begin() throws TransactionException {
129
130 try {
131 preBegin();
132
133 long startTimestamp, epoch;
134
135
136
137
138
139
140
141 startTimestampTimer.start();
142 try {
143 do {
144 startTimestamp = tsoClient.getNewStartTimestamp().get();
145 epoch = tsoClient.getEpoch();
146 } while (epoch > startTimestamp);
147 } finally {
148 startTimestampTimer.stop();
149 }
150
151 AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(startTimestamp, epoch, this);
152
153 postBegin(tx);
154
155 return tx;
156 } catch (TransactionManagerException e) {
157 throw new TransactionException("An error has occured during PreBegin/PostBegin", e);
158 } catch (ExecutionException e) {
159 throw new TransactionException("Could not get new timestamp", e);
160 } catch (InterruptedException ie) {
161 Thread.currentThread().interrupt();
162 throw new TransactionException("Interrupted getting timestamp", ie);
163 }
164 }
165
166
167
168
169
170
171 abstract public long getHashForTable(byte[] tableName);
172
173
174
175
176
177 public CommitTable.Client getCommitTableClient() {
178 return commitTableClient;
179 }
180
181
182
183
184 @Override
185 public final Transaction fence(byte[] tableName) throws TransactionException {
186 long fenceTimestamp;
187 long tableID = getHashForTable(tableName); Hashing.murmur3_128().newHasher().putBytes(tableName).hash().asLong();
188
189 try {
190 fenceTimer.start();
191 try {
192 fenceTimestamp = tsoClient.getFence(tableID).get();
193 } finally {
194 fenceTimer.stop();
195 }
196
197 AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(fenceTimestamp, fenceTimestamp, this);
198
199 return tx;
200 } catch (ExecutionException e) {
201 throw new TransactionException("Could not get fence", e);
202 } catch (InterruptedException ie) {
203 Thread.currentThread().interrupt();
204 throw new TransactionException("Interrupted creating a fence", ie);
205 }
206 }
207
208
209
210
211
212
213
214 public void postBegin(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
215
216
217
218
219
220
221
222 public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
223
224
225
226
227 @Override
228 public final void commit(Transaction transaction) throws RollbackException, TransactionException {
229
230 AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
231 enforceTransactionIsInRunningState(tx);
232
233 if (tx.isRollbackOnly()) {
234 rollback(tx);
235 throw new RollbackException(tx + ": Tx was set to rollback explicitly");
236 }
237
238 try {
239
240 preCommit(tx);
241
242 commitTimer.start();
243 try {
244 if (tx.getWriteSet().isEmpty() && tx.getConflictFreeWriteSet().isEmpty()) {
245 markReadOnlyTransaction(tx);
246 } else {
247 if (tsoClient.isLowLatency())
248 commitLowLatencyTransaction(tx);
249 else
250 commitRegularTransaction(tx);
251 }
252 committedTxsCounter.inc();
253 } finally {
254 commitTimer.stop();
255 }
256
257 postCommit(tx);
258
259 } catch (TransactionManagerException e) {
260 throw new TransactionException(e.getMessage(), e);
261 }
262
263 }
264
265
266
267
268
269
270
271 public void postCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
272
273
274
275
276
277
278 public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
279
280
281
282
283 @Override
284 public final void rollback(Transaction transaction) throws TransactionException {
285
286 AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
287 enforceTransactionIsInRunningState(tx);
288
289 try {
290
291 preRollback(tx);
292
293
294 tx.setCommitTimestamp(0);
295 tx.setStatus(Status.ROLLEDBACK);
296
297 postRollback(tx);
298
299 } catch (TransactionManagerException e) {
300 throw new TransactionException(e.getMessage(), e);
301 } finally {
302 tx.cleanup();
303 }
304
305 }
306
307
308
309
310
311
312
313 public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
314
315
316 protected abstract void closeResources() throws IOException;
317
318
319
320
321 @Override
322 public final void close() throws IOException {
323 tsoClient.close();
324 closeResources();
325 }
326
327
328
329
330
331 private void enforceTransactionIsInRunningState(Transaction transaction) {
332
333 if (transaction.getStatus() != Status.RUNNING) {
334 throw new IllegalArgumentException("Transaction was already " + transaction.getStatus());
335 }
336
337 }
338
339 @SuppressWarnings("unchecked")
340
341 private AbstractTransaction<? extends CellId> enforceAbstractTransactionAsParam(Transaction tx) {
342
343 if (tx instanceof AbstractTransaction) {
344 return (AbstractTransaction<? extends CellId>) tx;
345 } else {
346 throw new IllegalArgumentException(
347 "The transaction object passed is not an instance of AbstractTransaction");
348 }
349
350 }
351
352 private void markReadOnlyTransaction(AbstractTransaction<? extends CellId> readOnlyTx) {
353
354 readOnlyTx.setStatus(Status.COMMITTED_RO);
355
356 }
357
358 private void commitLowLatencyTransaction(AbstractTransaction<? extends CellId> tx)
359 throws RollbackException, TransactionException {
360 try {
361
362 long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
363 boolean committed = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(),commitTs);
364 if (!committed) {
365
366 rollback(tx);
367 commitTableClient.deleteCommitEntry(tx.getStartTimestamp());
368 rolledbackTxsCounter.inc();
369 throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
370 }
371 certifyCommitForTx(tx, commitTs);
372 updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
373
374 } catch (ExecutionException e) {
375 if (e.getCause() instanceof AbortException) {
376 rollback(tx);
377 rolledbackTxsCounter.inc();
378 throw new RollbackException(tx.getStartTimestamp() + ": Conflicts detected in writeset", e.getCause());
379 }
380
381 if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
382 errorTxsCounter.inc();
383 rollback(tx);
384 throw new RollbackException(tx.getStartTimestamp() + " rolled-back precautionary", e.getCause());
385 } else {
386 throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome", e.getCause());
387 }
388 } catch (InterruptedException e) {
389 e.printStackTrace();
390 } catch (IOException e) {
391 e.printStackTrace();
392 }
393 }
394
395 private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
396 throws RollbackException, TransactionException
397 {
398
399 try {
400
401 long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
402 certifyCommitForTx(tx, commitTs);
403 updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
404
405 } catch (ExecutionException e) {
406
407 if (e.getCause() instanceof AbortException) {
408 rollback(tx);
409 rolledbackTxsCounter.inc();
410 throw new RollbackException(tx.getStartTimestamp() + ": Conflicts detected in writeset", e.getCause());
411 }
412
413 if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
414
415 errorTxsCounter.inc();
416 try {
417 LOG.warn("Can't contact the TSO for receiving outcome for Tx {}. Checking Commit Table...", tx.getStartTimestamp());
418
419
420 Optional<CommitTimestamp> commitTimestamp =
421 commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
422 if (commitTimestamp.isPresent()) {
423 if (commitTimestamp.get().isValid()) {
424 LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx.getStartTimestamp());
425 certifyCommitForTx(tx, commitTimestamp.get().getValue());
426 postCommitter.updateShadowCells(tx);
427 } else {
428 LOG.warn("{}: Invalidated commit TS found in Commit Table. Rolling-back...", tx.getStartTimestamp());
429 rollback(tx);
430 throw new RollbackException(tx.getStartTimestamp() + " invalidated by other Tx started", e.getCause());
431 }
432 } else {
433 LOG.warn("{}: Trying to invalidate Tx proactively in Commit Table...", tx.getStartTimestamp());
434 boolean invalidated = commitTableClient.tryInvalidateTransaction(tx.getStartTimestamp()).get();
435 if (invalidated) {
436 LOG.warn("{}: Invalidated proactively in Commit Table. Rolling-back Tx...", tx.getStartTimestamp());
437 invalidatedTxsCounter.inc();
438 rollback(tx);
439 throw new RollbackException(tx.getStartTimestamp() + " rolled-back precautionary", e.getCause());
440 } else {
441 LOG.warn("{}: Invalidation could NOT be completed. Re-checking Commit Table...", tx.getStartTimestamp());
442
443 commitTimestamp = commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
444 if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
445 LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx.getStartTimestamp());
446 certifyCommitForTx(tx, commitTimestamp.get().getValue());
447 postCommitter.updateShadowCells(tx);
448 } else {
449 LOG.error("{}: Can't determine Transaction outcome", tx.getStartTimestamp());
450 throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome");
451 }
452 }
453 }
454 } catch (ExecutionException e1) {
455 throw new TransactionException(tx.getStartTimestamp() + ": problem reading commitTS from Commit Table", e1);
456 } catch (InterruptedException e1) {
457 Thread.currentThread().interrupt();
458 throw new TransactionException(tx.getStartTimestamp() + ": interrupted while reading commitTS from Commit Table", e1);
459 }
460 } else {
461 throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome", e.getCause());
462 }
463 } catch (InterruptedException ie) {
464 Thread.currentThread().interrupt();
465 throw new TransactionException(tx.getStartTimestamp() + ": interrupted during commit", ie);
466
467 }
468
469 }
470
471 private void updateShadowCellsAndRemoveCommitTableEntry(final AbstractTransaction<? extends CellId> tx,
472 final PostCommitActions postCommitter) {
473
474 Futures.transform(postCommitter.updateShadowCells(tx), new Function<Void, Void>() {
475 @Override
476 public Void apply(Void aVoid) {
477 postCommitter.removeCommitTableEntry(tx);
478 return null;
479 }
480 }, MoreExecutors.directExecutor());
481
482 }
483
484 private void certifyCommitForTx(AbstractTransaction<? extends CellId> txToSetup, long commitTS) {
485
486 txToSetup.setStatus(Status.COMMITTED);
487 txToSetup.setCommitTimestamp(commitTS);
488
489 }
490
491 public boolean isLowLatency() {
492 return tsoClient.isLowLatency();
493 }
494 }