View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Optional;
21  
22  import org.apache.omid.committable.CommitTable;
23  import org.apache.omid.tso.client.CellId;
24  
25  import java.util.ArrayList;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  
31  /**
32   * Omid's base abstract implementation of the {@link Transaction} interface.
33   * Provides extra methods to access other basic transaction state required by
34   * {@link TransactionManager} implementations based on snapshot
35   * isolation.
36   *
37   * So, this abstract class must be extended by particular implementations of
38   * transaction managers related to different storage systems (HBase...)
39   */
40  public abstract class AbstractTransaction<T extends CellId> implements Transaction {
41  
42      public enum VisibilityLevel {
43          // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction
44          // Sets the readTimestamp to be the writeTimestamp
45          SNAPSHOT,
46          // Returns all the written version of a key X that written by the transaction and the key X from the provided snapshot.
47          SNAPSHOT_ALL,
48          // Returns the last key, either from the snapshot or from the current transaction that was written before the last checkpoint.
49          // Sets the readTimestamp to be the writeTimestamp - 1
50          SNAPSHOT_EXCLUDE_CURRENT;
51  
52          public static VisibilityLevel fromInteger(int number) {
53              VisibilityLevel visibilityLevel = SNAPSHOT;
54  
55              switch (number) {
56              case 0:
57                  visibilityLevel = VisibilityLevel.SNAPSHOT;
58                  break;
59              case 1:
60                  visibilityLevel =  VisibilityLevel.SNAPSHOT_ALL;
61                  break;
62              case 2:
63                  visibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
64                  break;
65                  default:
66                      assert(false);
67              }
68  
69              return visibilityLevel;
70          }
71      }
72  
73      private transient Map<String, Object> metadata = new HashMap<>();
74      private final AbstractTransactionManager transactionManager;
75      private final long startTimestamp;
76      protected long readTimestamp;
77      protected long writeTimestamp;
78      private final long epoch;
79      private long commitTimestamp;
80      private boolean isRollbackOnly;
81      private final Set<T> writeSet;
82      private final Set<T> conflictFreeWriteSet;
83      private Status status = Status.RUNNING;
84      private VisibilityLevel visibilityLevel;
85      private final boolean isLowLatency;
86  
87      /**
88       * Base constructor
89       *
90       * @param transactionId
91       *            transaction identifier to assign
92       * @param epoch
93       *            epoch of the TSOServer instance that created this transaction
94       *            Used in High Availability to guarantee data consistency
95       * @param writeSet
96       *            initial write set for the transaction.
97       *            Should be empty in most cases.
98       * @param conflictFreeWriteSet
99       *            initial conflict free write set for the transaction.
100      *            Should be empty in most cases.
101      * @param transactionManager
102      *            transaction manager associated to this transaction.
103      *            Usually, should be the one that created the transaction
104      *            instance.
105      */
106     public AbstractTransaction(long transactionId,
107                                long epoch,
108                                Set<T> writeSet,
109                                Set<T> conflictFreeWriteSet,
110                                AbstractTransactionManager transactionManager,
111                                boolean isLowLatency) {
112         this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet,
113                 transactionManager, isLowLatency);
114     }
115 
116     public AbstractTransaction(long transactionId,
117             long readTimestamp,
118             VisibilityLevel visibilityLevel,
119             long epoch,
120             Set<T> writeSet,
121             Set<T> conflictFreeWriteSet,
122             AbstractTransactionManager transactionManager,
123             boolean isLowLatency) {
124 
125         this.startTimestamp = this.writeTimestamp = transactionId;
126         this.readTimestamp = readTimestamp;
127         this.epoch = epoch;
128         this.writeSet = writeSet;
129         this.conflictFreeWriteSet = conflictFreeWriteSet;
130         this.transactionManager = transactionManager;
131         this.visibilityLevel = visibilityLevel;
132         this.isLowLatency = isLowLatency;
133     }
134 
135     /**
136      * Base constructor
137      *
138      * @param transactionId
139      *            transaction identifier to assign
140      * @param epoch
141      *            epoch of the TSOServer instance that created this transaction
142      *            Used in High Availability to guarantee data consistency
143      * @param writeSet
144      *            initial write set for the transaction.
145      *            Should be empty in most cases.
146      * @param transactionManager
147      *            transaction manager associated to this transaction.
148      *            Usually, should be the one that created the transaction
149      *            instance.
150      * @param readTimestamp
151      *            the snapshot to read from
152      * @param writeTimestamp
153      *            the timestamp to write to
154      *
155      */
156     public AbstractTransaction(long transactionId,
157                                long epoch,
158                                Set<T> writeSet,
159                                Set<T> conflictFreeWriteSet,
160                                AbstractTransactionManager transactionManager,
161                                long readTimestamp,
162                                long writeTimestamp,
163                                boolean isLowLatency) {
164         this.startTimestamp = transactionId;
165         this.readTimestamp = readTimestamp;
166         this.writeTimestamp = writeTimestamp;
167         this.epoch = epoch;
168         this.writeSet = writeSet;
169         this.conflictFreeWriteSet = conflictFreeWriteSet;
170         this.transactionManager = transactionManager;
171         this.visibilityLevel = VisibilityLevel.SNAPSHOT;
172         this.isLowLatency = isLowLatency;
173     }
174 
175     /**
176      * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT
177      * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation
178      * @return true if a checkpoint was created and false otherwise
179      * @throws TransactionException
180      */
181     public void checkpoint() throws TransactionException {
182 
183         setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
184         this.readTimestamp = this.writeTimestamp++;
185 
186         if (this.writeTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN == 0) {
187             throw new TransactionException("Error: number of checkpoing cannot exceed " + (CommitTable.MAX_CHECKPOINTS_PER_TXN - 1));
188         }
189     }
190 
191     /**
192      * Allows to define specific clean-up task for transaction implementations
193      */
194     public abstract void cleanup();
195 
196     /**
197      * @see org.apache.omid.transaction.Transaction#getTransactionId()
198      */
199     @Override
200     public long getTransactionId() {
201         return startTimestamp;
202     }
203 
204     /**
205      * @see org.apache.omid.transaction.Transaction#getEpoch()
206      */
207     @Override
208     public long getEpoch() {
209         return epoch;
210     }
211 
212     /**
213      * @see org.apache.omid.transaction.Transaction#getStatus()
214      */
215     @Override
216     public Status getStatus() {
217         return status;
218     }
219 
220     /**
221      * @see Transaction#isRollbackOnly()
222      */
223     @Override
224     public void setRollbackOnly() {
225         isRollbackOnly = true;
226     }
227 
228     /**
229      * @see org.apache.omid.transaction.Transaction#isRollbackOnly()
230      */
231     @Override
232     public boolean isRollbackOnly() {
233         return isRollbackOnly;
234     }
235 
236     /**
237      * Returns transaction manager associated to this transaction.
238      * @return transaction manager
239      */
240     public AbstractTransactionManager getTransactionManager() {
241         return transactionManager;
242     }
243 
244     /**
245      * Returns the start timestamp for this transaction.
246      * @return start timestamp
247      */
248     public long getStartTimestamp() {
249         return startTimestamp;
250     }
251 
252     /**
253      * Returns the read timestamp for this transaction.
254      * @return read timestamp
255      */
256     @Override
257     public long getReadTimestamp() {
258         return readTimestamp;
259     }
260 
261     /**
262      * Returns the write timestamp for this transaction.
263      * @return write timestamp
264      */
265     @Override
266     public long getWriteTimestamp() {
267         return writeTimestamp;
268     }
269 
270     /**
271      * Returns the commit timestamp for this transaction.
272      * @return commit timestamp
273      */
274     public long getCommitTimestamp() {
275         return commitTimestamp;
276     }
277 
278     /**
279      * Returns the visibility level for this transaction.
280      * @return visibility level
281      */
282     public VisibilityLevel getVisibilityLevel() {
283         return visibilityLevel;
284     }
285 
286     /**
287      * Sets the commit timestamp for this transaction.
288      * @param commitTimestamp
289      *            the commit timestamp to set
290      */
291     public void setCommitTimestamp(long commitTimestamp) {
292         this.commitTimestamp = commitTimestamp;
293     }
294 
295     /**
296      * Sets the visibility level for this transaction.
297      * @param visibilityLevel
298      *            the {@link VisibilityLevel} to set
299      */
300     public void setVisibilityLevel(VisibilityLevel visibilityLevel) {
301         this.visibilityLevel = visibilityLevel;
302 
303         // If we are setting visibility level to either SNAPSHOT or SNAPSHOT_ALL
304         // then we should let readTimestamp equals to writeTimestamp
305         if (this.visibilityLevel == VisibilityLevel.SNAPSHOT ||
306             this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) {
307             this.readTimestamp = this.writeTimestamp;
308         }
309     }
310 
311     /**
312      * Sets the status for this transaction.
313      * @param status
314      *            the {@link Status} to set
315      */
316     public void setStatus(Status status) {
317         this.status = status;
318     }
319 
320     /**
321      * Returns the current write-set for this transaction.
322      * @return write set
323      */
324     public Set<T> getWriteSet() {
325         return writeSet;
326     }
327 
328     /**
329      * Returns the current write-set for this transaction that its elements are not candidates for conflict analysis.
330      * @return conflictFreeWriteSet
331      */
332     public Set<T> getConflictFreeWriteSet() {
333         return conflictFreeWriteSet;
334     }
335 
336     /**
337      * Adds an element to the transaction write-set.
338      * @param element
339      *            the element to add
340      */
341     public void addWriteSetElement(T element) {
342         writeSet.add(element);
343     }
344 
345     /**
346      * Adds an element to the transaction conflict free write-set.
347      * @param element
348      *            the element to add
349      */
350     public void addConflictFreeWriteSetElement(T element) {
351         conflictFreeWriteSet.add(element);
352     }
353 
354     @Override
355     public String toString() {
356         return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s ConflictFreeWriteSet %s",
357                              Long.toHexString(getTransactionId()),
358                              status,
359                              startTimestamp,
360                              readTimestamp,
361                              writeTimestamp,
362                              commitTimestamp,
363                              epoch,
364                              writeSet,
365                              conflictFreeWriteSet);
366     }
367 
368     @Override
369     public Optional<Object> getMetadata(String key) {
370         return Optional.fromNullable(metadata.get(key));
371     }
372 
373     /**
374      * Expects they metadata stored under key "key" to be of the "Set" type,
375      * append "value" to the existing set or creates a new one
376      */
377     @Override
378     @SuppressWarnings("unchecked")
379     public void appendMetadata(String key, Object value) {
380         List existingValue = (List) metadata.get(key);
381         if (existingValue == null) {
382             List<Object> newList = new ArrayList<>();
383             newList.add(value);
384             metadata.put(key, newList);
385         } else {
386             existingValue.add(value);
387         }
388     }
389 
390     @Override
391     public void setMetadata(String key, Object value) {
392         metadata.put(key, value);
393     }
394 
395     @Override
396     public boolean isLowLatency() {
397         return isLowLatency;
398     }
399 }