View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  
22  import org.apache.omid.committable.CommitTable;
23  import org.apache.omid.tso.client.CellId;
24  
25  import java.util.ArrayList;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  
31  /**
32   * Omid's base abstract implementation of the {@link Transaction} interface.
33   * Provides extra methods to access other basic transaction state required by
34   * {@link TransactionManager} implementations based on snapshot
35   * isolation.
36   *
37   * So, this abstract class must be extended by particular implementations of
38   * transaction managers related to different storage systems (HBase...)
39   */
40  public abstract class AbstractTransaction<T extends CellId> implements Transaction {
41  
42      public enum VisibilityLevel {
43          // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction
44          // Sets the readTimestamp to be the writeTimestamp
45          SNAPSHOT,
46          // Returns all the written version of a key X that written by the transaction and the key X from the provided snapshot.
47          SNAPSHOT_ALL,
48          // Returns the last key, either from the snapshot or from the current transaction that was written before the last checkpoint.
49          // Sets the readTimestamp to be the writeTimestamp - 1
50          SNAPSHOT_EXCLUDE_CURRENT;
51  
52          public static VisibilityLevel fromInteger(int number) {
53              VisibilityLevel visibilityLevel = SNAPSHOT;
54  
55              switch (number) {
56              case 0:
57                  visibilityLevel = VisibilityLevel.SNAPSHOT;
58                  break;
59              case 1:
60                  visibilityLevel =  VisibilityLevel.SNAPSHOT_ALL;
61                  break;
62              case 2:
63                  visibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
64                  break;
65                  default:
66                      assert(false);
67              }
68  
69              return visibilityLevel;
70          }
71      }
72  
73      private transient Map<String, Object> metadata = new HashMap<>();
74      private final AbstractTransactionManager transactionManager;
75      private final long startTimestamp;
76      protected long readTimestamp;
77      protected long writeTimestamp;
78      private final long epoch;
79      private long commitTimestamp;
80      private boolean isRollbackOnly;
81      private final Set<T> writeSet;
82      private final Set<T> conflictFreeWriteSet;
83      private Status status = Status.RUNNING;
84      private VisibilityLevel visibilityLevel;
85      private final boolean isLowLatency;
86  
87      /**
88       * Base constructor
89       *
90       * @param transactionId
91       *            transaction identifier to assign
92       * @param epoch
93       *            epoch of the TSOServer instance that created this transaction
94       *            Used in High Availability to guarantee data consistency
95       * @param writeSet
96       *            initial write set for the transaction.
97       *            Should be empty in most cases.
98       * @param conflictFreeWriteSet
99       *            initial conflict free write set for the transaction.
100      *            Should be empty in most cases.
101      * @param transactionManager
102      *            transaction manager associated to this transaction.
103      *            Usually, should be the one that created the transaction
104      *            instance.
105      */
106     public AbstractTransaction(long transactionId,
107                                long epoch,
108                                Set<T> writeSet,
109                                Set<T> conflictFreeWriteSet,
110                                AbstractTransactionManager transactionManager,
111                                boolean isLowLatency) {
112         this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet,
113                 transactionManager, isLowLatency);
114     }
115 
116     public AbstractTransaction(long transactionId,
117             long readTimestamp,
118             VisibilityLevel visibilityLevel,
119             long epoch,
120             Set<T> writeSet,
121             Set<T> conflictFreeWriteSet,
122             AbstractTransactionManager transactionManager,
123             boolean isLowLatency) {
124 
125         this.startTimestamp = this.writeTimestamp = transactionId;
126         this.readTimestamp = readTimestamp;
127         this.epoch = epoch;
128         this.writeSet = writeSet;
129         this.conflictFreeWriteSet = conflictFreeWriteSet;
130         this.transactionManager = transactionManager;
131         this.visibilityLevel = visibilityLevel;
132         this.isLowLatency = isLowLatency;
133     }
134 
135     /**
136      * Base constructor
137      *
138      * @param transactionId
139      *            transaction identifier to assign
140      * @param epoch
141      *            epoch of the TSOServer instance that created this transaction
142      *            Used in High Availability to guarantee data consistency
143      * @param writeSet
144      *            initial write set for the transaction.
145      *            Should be empty in most cases.
146      * @param transactionManager
147      *            transaction manager associated to this transaction.
148      *            Usually, should be the one that created the transaction
149      *            instance.
150      * @param readTimestamp
151      *            the snapshot to read from
152      * @param writeTimestamp
153      *            the timestamp to write to
154      *
155      */
156     public AbstractTransaction(long transactionId,
157                                long epoch,
158                                Set<T> writeSet,
159                                Set<T> conflictFreeWriteSet,
160                                AbstractTransactionManager transactionManager,
161                                long readTimestamp,
162                                long writeTimestamp,
163                                boolean isLowLatency) {
164         this.startTimestamp = transactionId;
165         this.readTimestamp = readTimestamp;
166         this.writeTimestamp = writeTimestamp;
167         this.epoch = epoch;
168         this.writeSet = writeSet;
169         this.conflictFreeWriteSet = conflictFreeWriteSet;
170         this.transactionManager = transactionManager;
171         this.visibilityLevel = VisibilityLevel.SNAPSHOT;
172         this.isLowLatency = isLowLatency;
173     }
174 
175     /**
176      * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT
177      * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation
178      * @throws TransactionException
179      */
180     public void checkpoint() throws TransactionException {
181 
182         setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
183         this.readTimestamp = this.writeTimestamp++;
184 
185         if (this.writeTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN == 0) {
186             throw new TransactionException("Error: number of checkpoing cannot exceed " + (CommitTable.MAX_CHECKPOINTS_PER_TXN - 1));
187         }
188     }
189 
190     /**
191      * Allows to define specific clean-up task for transaction implementations
192      */
193     public abstract void cleanup();
194 
195     /**
196      * @see org.apache.omid.transaction.Transaction#getTransactionId()
197      */
198     @Override
199     public long getTransactionId() {
200         return startTimestamp;
201     }
202 
203     /**
204      * @see org.apache.omid.transaction.Transaction#getEpoch()
205      */
206     @Override
207     public long getEpoch() {
208         return epoch;
209     }
210 
211     /**
212      * @see org.apache.omid.transaction.Transaction#getStatus()
213      */
214     @Override
215     public Status getStatus() {
216         return status;
217     }
218 
219     /**
220      * @see Transaction#isRollbackOnly()
221      */
222     @Override
223     public void setRollbackOnly() {
224         isRollbackOnly = true;
225     }
226 
227     /**
228      * @see org.apache.omid.transaction.Transaction#isRollbackOnly()
229      */
230     @Override
231     public boolean isRollbackOnly() {
232         return isRollbackOnly;
233     }
234 
235     /**
236      * Returns transaction manager associated to this transaction.
237      * @return transaction manager
238      */
239     public AbstractTransactionManager getTransactionManager() {
240         return transactionManager;
241     }
242 
243     /**
244      * Returns the start timestamp for this transaction.
245      * @return start timestamp
246      */
247     public long getStartTimestamp() {
248         return startTimestamp;
249     }
250 
251     /**
252      * Returns the read timestamp for this transaction.
253      * @return read timestamp
254      */
255     @Override
256     public long getReadTimestamp() {
257         return readTimestamp;
258     }
259 
260     /**
261      * Returns the write timestamp for this transaction.
262      * @return write timestamp
263      */
264     @Override
265     public long getWriteTimestamp() {
266         return writeTimestamp;
267     }
268 
269     /**
270      * Returns the commit timestamp for this transaction.
271      * @return commit timestamp
272      */
273     public long getCommitTimestamp() {
274         return commitTimestamp;
275     }
276 
277     /**
278      * Returns the visibility level for this transaction.
279      * @return visibility level
280      */
281     public VisibilityLevel getVisibilityLevel() {
282         return visibilityLevel;
283     }
284 
285     /**
286      * Sets the commit timestamp for this transaction.
287      * @param commitTimestamp
288      *            the commit timestamp to set
289      */
290     public void setCommitTimestamp(long commitTimestamp) {
291         this.commitTimestamp = commitTimestamp;
292     }
293 
294     /**
295      * Sets the visibility level for this transaction.
296      * @param visibilityLevel
297      *            the {@link VisibilityLevel} to set
298      */
299     public void setVisibilityLevel(VisibilityLevel visibilityLevel) {
300         this.visibilityLevel = visibilityLevel;
301 
302         // If we are setting visibility level to either SNAPSHOT or SNAPSHOT_ALL
303         // then we should let readTimestamp equals to writeTimestamp
304         if (this.visibilityLevel == VisibilityLevel.SNAPSHOT ||
305             this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) {
306             this.readTimestamp = this.writeTimestamp;
307         }
308     }
309 
310     /**
311      * Sets the status for this transaction.
312      * @param status
313      *            the {@link Status} to set
314      */
315     public void setStatus(Status status) {
316         this.status = status;
317     }
318 
319     /**
320      * Returns the current write-set for this transaction.
321      * @return write set
322      */
323     public Set<T> getWriteSet() {
324         return writeSet;
325     }
326 
327     /**
328      * Returns the current write-set for this transaction that its elements are not candidates for conflict analysis.
329      * @return conflictFreeWriteSet
330      */
331     public Set<T> getConflictFreeWriteSet() {
332         return conflictFreeWriteSet;
333     }
334 
335     /**
336      * Adds an element to the transaction write-set.
337      * @param element
338      *            the element to add
339      */
340     public void addWriteSetElement(T element) {
341         writeSet.add(element);
342     }
343 
344     /**
345      * Adds an element to the transaction conflict free write-set.
346      * @param element
347      *            the element to add
348      */
349     public void addConflictFreeWriteSetElement(T element) {
350         conflictFreeWriteSet.add(element);
351     }
352 
353     @Override
354     public String toString() {
355         return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s ConflictFreeWriteSet %s",
356                              Long.toHexString(getTransactionId()),
357                              status,
358                              startTimestamp,
359                              readTimestamp,
360                              writeTimestamp,
361                              commitTimestamp,
362                              epoch,
363                              writeSet,
364                              conflictFreeWriteSet);
365     }
366 
367     @Override
368     public Optional<Object> getMetadata(String key) {
369         return Optional.fromNullable(metadata.get(key));
370     }
371 
372     /**
373      * Expects they metadata stored under key "key" to be of the "Set" type,
374      * append "value" to the existing set or creates a new one
375      */
376     @Override
377     @SuppressWarnings("unchecked")
378     public void appendMetadata(String key, Object value) {
379         List existingValue = (List) metadata.get(key);
380         if (existingValue == null) {
381             List<Object> newList = new ArrayList<>();
382             newList.add(value);
383             metadata.put(key, newList);
384         } else {
385             existingValue.add(value);
386         }
387     }
388 
389     @Override
390     public void setMetadata(String key, Object value) {
391         metadata.put(key, value);
392     }
393 
394     @Override
395     public boolean isLowLatency() {
396         return isLowLatency;
397     }
398 }