1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22
23 import org.apache.omid.committable.CommitTable;
24 import org.apache.omid.metrics.Gauge;
25 import org.apache.omid.metrics.MetricsRegistry;
26 import org.apache.omid.timestamp.storage.TimestampStorage;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32
33 import java.io.IOException;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37
38 import static org.apache.omid.metrics.MetricsUtils.name;
39
40
41
42
43 @Singleton
44 public class WorldClockOracleImpl implements TimestampOracle {
45
46 private static final Logger LOG = LoggerFactory.getLogger(WorldClockOracleImpl.class);
47
48 static final long MAX_TX_PER_MS = 1_000_000;
49 static final long TIMESTAMP_INTERVAL_MS = 10_000;
50 private static final long TIMESTAMP_ALLOCATION_INTERVAL_MS = 7_000;
51
52 private long lastTimestamp;
53 private long maxTimestamp;
54
55 private TimestampStorage storage;
56 private Panicker panicker;
57
58 private volatile long maxAllocatedTime;
59
60 private final ScheduledExecutorService scheduler =
61 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
62
63 private Runnable allocateTimestampsBatchTask;
64
65 private class AllocateTimestampBatchTask implements Runnable {
66 long previousMaxTime;
67
68 AllocateTimestampBatchTask(long previousMaxTime) {
69 this.previousMaxTime = previousMaxTime;
70 }
71
72 @Override
73 public void run() {
74 long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
75 try {
76 storage.updateMaxTimestamp(previousMaxTime, newMaxTime);
77 maxAllocatedTime = newMaxTime;
78 previousMaxTime = newMaxTime;
79 } catch (Throwable e) {
80 panicker.panic("Can't store the new max timestamp", e);
81 }
82 }
83 }
84
85 @Inject
86 public WorldClockOracleImpl(MetricsRegistry metrics,
87 TimestampStorage tsStorage,
88 Panicker panicker) throws IOException {
89
90 this.storage = tsStorage;
91 this.panicker = panicker;
92
93 metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
94 @Override
95 public Long getValue() {
96 return maxTimestamp;
97 }
98 });
99
100 }
101
102 @Override
103 public void initialize() throws IOException {
104
105 this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
106
107 this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
108
109
110 scheduler.schedule(allocateTimestampsBatchTask, 0, TimeUnit.MILLISECONDS);
111
112
113 while ((System.currentTimeMillis() * MAX_TX_PER_MS) < this.lastTimestamp) {
114 try {
115 Thread.sleep(1000);
116 } catch (InterruptedException e) {
117 continue;
118 }
119 }
120
121
122
123 scheduler.scheduleAtFixedRate(allocateTimestampsBatchTask, TIMESTAMP_ALLOCATION_INTERVAL_MS, TIMESTAMP_ALLOCATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
124 }
125
126
127
128
129 @Override
130 public long next() {
131
132 long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS;
133
134 lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
135
136
137 if (lastTimestamp >= currentMsFirstTimestamp) {
138 return lastTimestamp;
139 }
140
141 if (currentMsFirstTimestamp >= maxTimestamp) {
142 while (maxAllocatedTime <= currentMsFirstTimestamp) {
143 try {
144 Thread.sleep(1000);
145 } catch (InterruptedException e) {
146 continue;
147 }
148 }
149 assert (maxAllocatedTime > maxTimestamp);
150 maxTimestamp = maxAllocatedTime;
151 }
152
153 lastTimestamp = currentMsFirstTimestamp;
154
155 return lastTimestamp;
156 }
157
158 @Override
159 public long getLast() {
160 return lastTimestamp;
161 }
162
163 @Override
164 public String toString() {
165 return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
166 }
167
168 @VisibleForTesting
169 static class InMemoryTimestampStorage implements TimestampStorage {
170
171 long maxTime = 0;
172
173 @Override
174 public void updateMaxTimestamp(long previousMaxTime, long nextMaxTime) {
175 maxTime = nextMaxTime;
176 LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTime, nextMaxTime);
177 }
178
179 @Override
180 public long getMaxTimestamp() {
181 return maxTime;
182 }
183
184 }
185 }