1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21
22 import org.apache.omid.committable.CommitTable;
23 import org.apache.omid.tso.client.CellId;
24
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30
31
32
33
34
35
36
37
38
39
40 public abstract class AbstractTransaction<T extends CellId> implements Transaction {
41
42 public enum VisibilityLevel {
43
44
45 SNAPSHOT,
46
47 SNAPSHOT_ALL,
48
49
50 SNAPSHOT_EXCLUDE_CURRENT;
51
52 public static VisibilityLevel fromInteger(int number) {
53 VisibilityLevel visibilityLevel = SNAPSHOT;
54
55 switch (number) {
56 case 0:
57 visibilityLevel = VisibilityLevel.SNAPSHOT;
58 break;
59 case 1:
60 visibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
61 break;
62 case 2:
63 visibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
64 break;
65 default:
66 assert(false);
67 }
68
69 return visibilityLevel;
70 }
71 }
72
73 private transient Map<String, Object> metadata = new HashMap<>();
74 private final AbstractTransactionManager transactionManager;
75 private final long startTimestamp;
76 protected long readTimestamp;
77 protected long writeTimestamp;
78 private final long epoch;
79 private long commitTimestamp;
80 private boolean isRollbackOnly;
81 private final Set<T> writeSet;
82 private final Set<T> conflictFreeWriteSet;
83 private Status status = Status.RUNNING;
84 private VisibilityLevel visibilityLevel;
85 private final boolean isLowLatency;
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public AbstractTransaction(long transactionId,
107 long epoch,
108 Set<T> writeSet,
109 Set<T> conflictFreeWriteSet,
110 AbstractTransactionManager transactionManager,
111 boolean isLowLatency) {
112 this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet,
113 transactionManager, isLowLatency);
114 }
115
116 public AbstractTransaction(long transactionId,
117 long readTimestamp,
118 VisibilityLevel visibilityLevel,
119 long epoch,
120 Set<T> writeSet,
121 Set<T> conflictFreeWriteSet,
122 AbstractTransactionManager transactionManager,
123 boolean isLowLatency) {
124
125 this.startTimestamp = this.writeTimestamp = transactionId;
126 this.readTimestamp = readTimestamp;
127 this.epoch = epoch;
128 this.writeSet = writeSet;
129 this.conflictFreeWriteSet = conflictFreeWriteSet;
130 this.transactionManager = transactionManager;
131 this.visibilityLevel = visibilityLevel;
132 this.isLowLatency = isLowLatency;
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public AbstractTransaction(long transactionId,
157 long epoch,
158 Set<T> writeSet,
159 Set<T> conflictFreeWriteSet,
160 AbstractTransactionManager transactionManager,
161 long readTimestamp,
162 long writeTimestamp,
163 boolean isLowLatency) {
164 this.startTimestamp = transactionId;
165 this.readTimestamp = readTimestamp;
166 this.writeTimestamp = writeTimestamp;
167 this.epoch = epoch;
168 this.writeSet = writeSet;
169 this.conflictFreeWriteSet = conflictFreeWriteSet;
170 this.transactionManager = transactionManager;
171 this.visibilityLevel = VisibilityLevel.SNAPSHOT;
172 this.isLowLatency = isLowLatency;
173 }
174
175
176
177
178
179
180 public void checkpoint() throws TransactionException {
181
182 setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
183 this.readTimestamp = this.writeTimestamp++;
184
185 if (this.writeTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN == 0) {
186 throw new TransactionException("Error: number of checkpoing cannot exceed " + (CommitTable.MAX_CHECKPOINTS_PER_TXN - 1));
187 }
188 }
189
190
191
192
193 public abstract void cleanup();
194
195
196
197
198 @Override
199 public long getTransactionId() {
200 return startTimestamp;
201 }
202
203
204
205
206 @Override
207 public long getEpoch() {
208 return epoch;
209 }
210
211
212
213
214 @Override
215 public Status getStatus() {
216 return status;
217 }
218
219
220
221
222 @Override
223 public void setRollbackOnly() {
224 isRollbackOnly = true;
225 }
226
227
228
229
230 @Override
231 public boolean isRollbackOnly() {
232 return isRollbackOnly;
233 }
234
235
236
237
238
239 public AbstractTransactionManager getTransactionManager() {
240 return transactionManager;
241 }
242
243
244
245
246
247 public long getStartTimestamp() {
248 return startTimestamp;
249 }
250
251
252
253
254
255 @Override
256 public long getReadTimestamp() {
257 return readTimestamp;
258 }
259
260
261
262
263
264 @Override
265 public long getWriteTimestamp() {
266 return writeTimestamp;
267 }
268
269
270
271
272
273 public long getCommitTimestamp() {
274 return commitTimestamp;
275 }
276
277
278
279
280
281 public VisibilityLevel getVisibilityLevel() {
282 return visibilityLevel;
283 }
284
285
286
287
288
289
290 public void setCommitTimestamp(long commitTimestamp) {
291 this.commitTimestamp = commitTimestamp;
292 }
293
294
295
296
297
298
299 public void setVisibilityLevel(VisibilityLevel visibilityLevel) {
300 this.visibilityLevel = visibilityLevel;
301
302
303
304 if (this.visibilityLevel == VisibilityLevel.SNAPSHOT ||
305 this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) {
306 this.readTimestamp = this.writeTimestamp;
307 }
308 }
309
310
311
312
313
314
315 public void setStatus(Status status) {
316 this.status = status;
317 }
318
319
320
321
322
323 public Set<T> getWriteSet() {
324 return writeSet;
325 }
326
327
328
329
330
331 public Set<T> getConflictFreeWriteSet() {
332 return conflictFreeWriteSet;
333 }
334
335
336
337
338
339
340 public void addWriteSetElement(T element) {
341 writeSet.add(element);
342 }
343
344
345
346
347
348
349 public void addConflictFreeWriteSetElement(T element) {
350 conflictFreeWriteSet.add(element);
351 }
352
353 @Override
354 public String toString() {
355 return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s ConflictFreeWriteSet %s",
356 Long.toHexString(getTransactionId()),
357 status,
358 startTimestamp,
359 readTimestamp,
360 writeTimestamp,
361 commitTimestamp,
362 epoch,
363 writeSet,
364 conflictFreeWriteSet);
365 }
366
367 @Override
368 public Optional<Object> getMetadata(String key) {
369 return Optional.fromNullable(metadata.get(key));
370 }
371
372
373
374
375
376 @Override
377 @SuppressWarnings("unchecked")
378 public void appendMetadata(String key, Object value) {
379 List existingValue = (List) metadata.get(key);
380 if (existingValue == null) {
381 List<Object> newList = new ArrayList<>();
382 newList.add(value);
383 metadata.put(key, newList);
384 } else {
385 existingValue.add(value);
386 }
387 }
388
389 @Override
390 public void setMetadata(String key, Object value) {
391 metadata.put(key, value);
392 }
393
394 @Override
395 public boolean isLowLatency() {
396 return isLowLatency;
397 }
398 }