1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22
23 import org.apache.omid.committable.CommitTable;
24 import org.apache.omid.metrics.Gauge;
25 import org.apache.omid.metrics.MetricsRegistry;
26 import org.apache.omid.timestamp.storage.TimestampStorage;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32
33 import java.io.IOException;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36
37 import static org.apache.omid.metrics.MetricsUtils.name;
38
39
40
41
42 @Singleton
43 public class TimestampOracleImpl implements TimestampOracle {
44
45 private static final Logger LOG = LoggerFactory.getLogger(TimestampOracleImpl.class);
46
47 @VisibleForTesting
48 static class InMemoryTimestampStorage implements TimestampStorage {
49
50 long maxTimestamp = 0;
51
52 @Override
53 public void updateMaxTimestamp(long previousMaxTimestamp, long nextMaxTimestamp) {
54 maxTimestamp = nextMaxTimestamp;
55 LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTimestamp, nextMaxTimestamp);
56 }
57
58 @Override
59 public long getMaxTimestamp() {
60 return maxTimestamp;
61 }
62
63 }
64
65 private class AllocateTimestampBatchTask implements Runnable {
66 long previousMaxTimestamp;
67
68 AllocateTimestampBatchTask(long previousMaxTimestamp) {
69 this.previousMaxTimestamp = previousMaxTimestamp;
70 }
71
72 @Override
73 public void run() {
74 long newMaxTimestamp = previousMaxTimestamp + TIMESTAMP_BATCH;
75 try {
76 storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
77 maxAllocatedTimestamp = newMaxTimestamp;
78 previousMaxTimestamp = newMaxTimestamp;
79 } catch (Throwable e) {
80 panicker.panic("Can't store the new max timestamp", e);
81 }
82 }
83
84 }
85
86 static final long TIMESTAMP_BATCH = 10_000_000 * CommitTable.MAX_CHECKPOINTS_PER_TXN;
87 private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000 * CommitTable.MAX_CHECKPOINTS_PER_TXN;
88
89 private long lastTimestamp;
90
91 private long maxTimestamp;
92
93 private TimestampStorage storage;
94 private Panicker panicker;
95
96 private long nextAllocationThreshold;
97 private volatile long maxAllocatedTimestamp;
98
99 private Executor executor = Executors.newSingleThreadExecutor(
100 new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
101
102 private Runnable allocateTimestampsBatchTask;
103
104 @Inject
105 public TimestampOracleImpl(MetricsRegistry metrics,
106 TimestampStorage tsStorage,
107 Panicker panicker) throws IOException {
108
109 this.storage = tsStorage;
110 this.panicker = panicker;
111
112 metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
113 @Override
114 public Long getValue() {
115 return maxTimestamp;
116 }
117 });
118
119 }
120
121 @Override
122 public void initialize() throws IOException {
123
124 this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
125
126 this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
127
128
129 executor.execute(allocateTimestampsBatchTask);
130
131 LOG.info("Initializing timestamp oracle with timestamp {}", this.lastTimestamp);
132 }
133
134
135
136
137 @SuppressWarnings("StatementWithEmptyBody")
138 @Override
139 public long next() {
140 lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
141
142 if (lastTimestamp >= nextAllocationThreshold) {
143
144
145 nextAllocationThreshold = Long.MAX_VALUE;
146 executor.execute(allocateTimestampsBatchTask);
147 }
148
149 if (lastTimestamp >= maxTimestamp) {
150 assert (maxTimestamp <= maxAllocatedTimestamp);
151 while (maxAllocatedTimestamp == maxTimestamp) {
152
153 }
154 assert (maxAllocatedTimestamp > maxTimestamp);
155 maxTimestamp = maxAllocatedTimestamp;
156 nextAllocationThreshold = maxTimestamp - TIMESTAMP_REMAINING_THRESHOLD;
157 assert (nextAllocationThreshold > lastTimestamp && nextAllocationThreshold < maxTimestamp);
158 assert (lastTimestamp < maxTimestamp);
159 }
160
161 return lastTimestamp;
162 }
163
164 @Override
165 public long getLast() {
166 return lastTimestamp;
167 }
168
169 @Override
170 public String toString() {
171 return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
172 }
173
174 }