View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Optional;
21  
22  import org.apache.omid.tso.client.CellId;
23  
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  
30  /**
31   * Omid's base abstract implementation of the {@link Transaction} interface.
32   * Provides extra methods to access other basic transaction state required by
33   * {@link TransactionManager} implementations based on snapshot
34   * isolation.
35   *
36   * So, this abstract class must be extended by particular implementations of
37   * transaction managers related to different storage systems (HBase...)
38   */
39  public abstract class AbstractTransaction<T extends CellId> implements Transaction {
40  
41      public enum VisibilityLevel {
42          // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction
43          // Sets the readTimestamp to be the writeTimestamp
44          SNAPSHOT,
45          // Returns all the written version of a key X that written by the transaction and the key X from the provided snapshot.
46          SNAPSHOT_ALL,
47          // Returns the last key, either from the snapshot or from the current transaction that was written before the last checkpoint.
48          // Sets the readTimestamp to be the writeTimestamp - 1
49          SNAPSHOT_EXCLUDE_CURRENT;
50  
51          public static VisibilityLevel fromInteger(int number) {
52              VisibilityLevel visibilityLevel = SNAPSHOT;
53  
54              switch (number) {
55              case 0:
56                  visibilityLevel = VisibilityLevel.SNAPSHOT;
57                  break;
58              case 1:
59                  visibilityLevel =  VisibilityLevel.SNAPSHOT_ALL;
60                  break;
61              case 2:
62                  visibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
63                  break;
64                  default:
65                      assert(false);
66              }
67  
68              return visibilityLevel;
69          }
70      }
71  
72      private transient Map<String, Object> metadata = new HashMap<>();
73      private final AbstractTransactionManager transactionManager;
74      private final long startTimestamp;
75      protected long readTimestamp;
76      protected long writeTimestamp;
77      private final long epoch;
78      private long commitTimestamp;
79      private boolean isRollbackOnly;
80      private final Set<T> writeSet;
81      private final Set<T> conflictFreeWriteSet;
82      private Status status = Status.RUNNING;
83      private VisibilityLevel visibilityLevel;
84      private final boolean isLowLatency;
85  
86      /**
87       * Base constructor
88       *
89       * @param transactionId
90       *            transaction identifier to assign
91       * @param epoch
92       *            epoch of the TSOServer instance that created this transaction
93       *            Used in High Availability to guarantee data consistency
94       * @param writeSet
95       *            initial write set for the transaction.
96       *            Should be empty in most cases.
97       * @param conflictFreeWriteSet
98       *            initial conflict free write set for the transaction.
99       *            Should be empty in most cases.
100      * @param transactionManager
101      *            transaction manager associated to this transaction.
102      *            Usually, should be the one that created the transaction
103      *            instance.
104      */
105     public AbstractTransaction(long transactionId,
106                                long epoch,
107                                Set<T> writeSet,
108                                Set<T> conflictFreeWriteSet,
109                                AbstractTransactionManager transactionManager,
110                                boolean isLowLatency) {
111         this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet,
112                 transactionManager, isLowLatency);
113     }
114 
115     public AbstractTransaction(long transactionId,
116             long readTimestamp,
117             VisibilityLevel visibilityLevel,
118             long epoch,
119             Set<T> writeSet,
120             Set<T> conflictFreeWriteSet,
121             AbstractTransactionManager transactionManager,
122             boolean isLowLatency) {
123 
124         this.startTimestamp = this.writeTimestamp = transactionId;
125         this.readTimestamp = readTimestamp;
126         this.epoch = epoch;
127         this.writeSet = writeSet;
128         this.conflictFreeWriteSet = conflictFreeWriteSet;
129         this.transactionManager = transactionManager;
130         this.visibilityLevel = visibilityLevel;
131         this.isLowLatency = isLowLatency;
132     }
133 
134     /**
135      * Base constructor
136      *
137      * @param transactionId
138      *            transaction identifier to assign
139      * @param epoch
140      *            epoch of the TSOServer instance that created this transaction
141      *            Used in High Availability to guarantee data consistency
142      * @param writeSet
143      *            initial write set for the transaction.
144      *            Should be empty in most cases.
145      * @param transactionManager
146      *            transaction manager associated to this transaction.
147      *            Usually, should be the one that created the transaction
148      *            instance.
149      * @param readTimestamp
150      *            the snapshot to read from
151      * @param writeTimestamp
152      *            the timestamp to write to
153      *
154      */
155     public AbstractTransaction(long transactionId,
156                                long epoch,
157                                Set<T> writeSet,
158                                Set<T> conflictFreeWriteSet,
159                                AbstractTransactionManager transactionManager,
160                                long readTimestamp,
161                                long writeTimestamp,
162                                boolean isLowLatency) {
163         this.startTimestamp = transactionId;
164         this.readTimestamp = readTimestamp;
165         this.writeTimestamp = writeTimestamp;
166         this.epoch = epoch;
167         this.writeSet = writeSet;
168         this.conflictFreeWriteSet = conflictFreeWriteSet;
169         this.transactionManager = transactionManager;
170         this.visibilityLevel = VisibilityLevel.SNAPSHOT;
171         this.isLowLatency = isLowLatency;
172     }
173 
174     /**
175      * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT
176      * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation
177      * @return true if a checkpoint was created and false otherwise
178      * @throws TransactionException
179      */
180     public void checkpoint() throws TransactionException {
181 
182         setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
183         this.readTimestamp = this.writeTimestamp++;
184 
185         if (this.writeTimestamp % AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN == 0) {
186             throw new TransactionException("Error: number of checkpoing cannot exceed " + (AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1));
187         }
188     }
189 
190     /**
191      * Allows to define specific clean-up task for transaction implementations
192      */
193     public abstract void cleanup();
194 
195     /**
196      * @see org.apache.omid.transaction.Transaction#getTransactionId()
197      */
198     @Override
199     public long getTransactionId() {
200         return startTimestamp;
201     }
202 
203     /**
204      * @see org.apache.omid.transaction.Transaction#getEpoch()
205      */
206     @Override
207     public long getEpoch() {
208         return epoch;
209     }
210 
211     /**
212      * @see org.apache.omid.transaction.Transaction#getStatus()
213      */
214     @Override
215     public Status getStatus() {
216         return status;
217     }
218 
219     /**
220      * @see Transaction#isRollbackOnly()
221      */
222     @Override
223     public void setRollbackOnly() {
224         isRollbackOnly = true;
225     }
226 
227     /**
228      * @see org.apache.omid.transaction.Transaction#isRollbackOnly()
229      */
230     @Override
231     public boolean isRollbackOnly() {
232         return isRollbackOnly;
233     }
234 
235     /**
236      * Returns transaction manager associated to this transaction.
237      * @return transaction manager
238      */
239     public AbstractTransactionManager getTransactionManager() {
240         return transactionManager;
241     }
242 
243     /**
244      * Returns the start timestamp for this transaction.
245      * @return start timestamp
246      */
247     public long getStartTimestamp() {
248         return startTimestamp;
249     }
250 
251     /**
252      * Returns the read timestamp for this transaction.
253      * @return read timestamp
254      */
255     @Override
256     public long getReadTimestamp() {
257         return readTimestamp;
258     }
259 
260     /**
261      * Returns the write timestamp for this transaction.
262      * @return write timestamp
263      */
264     @Override
265     public long getWriteTimestamp() {
266         return writeTimestamp;
267     }
268 
269     /**
270      * Returns the commit timestamp for this transaction.
271      * @return commit timestamp
272      */
273     public long getCommitTimestamp() {
274         return commitTimestamp;
275     }
276 
277     /**
278      * Returns the visibility level for this transaction.
279      * @return visibility level
280      */
281     public VisibilityLevel getVisibilityLevel() {
282         return visibilityLevel;
283     }
284 
285     /**
286      * Sets the commit timestamp for this transaction.
287      * @param commitTimestamp
288      *            the commit timestamp to set
289      */
290     public void setCommitTimestamp(long commitTimestamp) {
291         this.commitTimestamp = commitTimestamp;
292     }
293 
294     /**
295      * Sets the visibility level for this transaction.
296      * @param visibilityLevel
297      *            the {@link VisibilityLevel} to set
298      */
299     public void setVisibilityLevel(VisibilityLevel visibilityLevel) {
300         this.visibilityLevel = visibilityLevel;
301 
302         // If we are setting visibility level to either SNAPSHOT or SNAPSHOT_ALL
303         // then we should let readTimestamp equals to writeTimestamp
304         if (this.visibilityLevel == VisibilityLevel.SNAPSHOT ||
305             this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) {
306             this.readTimestamp = this.writeTimestamp;
307         }
308     }
309 
310     /**
311      * Sets the status for this transaction.
312      * @param status
313      *            the {@link Status} to set
314      */
315     public void setStatus(Status status) {
316         this.status = status;
317     }
318 
319     /**
320      * Returns the current write-set for this transaction.
321      * @return write set
322      */
323     public Set<T> getWriteSet() {
324         return writeSet;
325     }
326 
327     /**
328      * Returns the current write-set for this transaction that its elements are not candidates for conflict analysis.
329      * @return conflictFreeWriteSet
330      */
331     public Set<T> getConflictFreeWriteSet() {
332         return conflictFreeWriteSet;
333     }
334 
335     /**
336      * Adds an element to the transaction write-set.
337      * @param element
338      *            the element to add
339      */
340     public void addWriteSetElement(T element) {
341         writeSet.add(element);
342     }
343 
344     /**
345      * Adds an element to the transaction conflict free write-set.
346      * @param element
347      *            the element to add
348      */
349     public void addConflictFreeWriteSetElement(T element) {
350         conflictFreeWriteSet.add(element);
351     }
352 
353     @Override
354     public String toString() {
355         return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s ConflictFreeWriteSet %s",
356                              Long.toHexString(getTransactionId()),
357                              status,
358                              startTimestamp,
359                              readTimestamp,
360                              writeTimestamp,
361                              commitTimestamp,
362                              epoch,
363                              writeSet,
364                              conflictFreeWriteSet);
365     }
366 
367     @Override
368     public Optional<Object> getMetadata(String key) {
369         return Optional.fromNullable(metadata.get(key));
370     }
371 
372     /**
373      * Expects they metadata stored under key "key" to be of the "Set" type,
374      * append "value" to the existing set or creates a new one
375      */
376     @Override
377     @SuppressWarnings("unchecked")
378     public void appendMetadata(String key, Object value) {
379         List existingValue = (List) metadata.get(key);
380         if (existingValue == null) {
381             List<Object> newList = new ArrayList<>();
382             newList.add(value);
383             metadata.put(key, newList);
384         } else {
385             existingValue.add(value);
386         }
387     }
388 
389     @Override
390     public void setMetadata(String key, Object value) {
391         metadata.put(key, value);
392     }
393 
394     @Override
395     public boolean isLowLatency() {
396         return isLowLatency;
397     }
398 }