1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import com.lmax.disruptor.EventFactory;
22 import com.lmax.disruptor.EventHandler;
23 import com.lmax.disruptor.RingBuffer;
24 import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
25 import com.lmax.disruptor.TimeoutHandler;
26 import com.lmax.disruptor.dsl.Disruptor;
27 import org.apache.omid.metrics.MetricsRegistry;
28 import org.apache.omid.tso.TSOStateManager.TSOState;
29 import org.jboss.netty.channel.Channel;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import javax.inject.Inject;
34 import java.io.IOException;
35 import java.util.Collection;
36 import java.util.Iterator;
37 import java.util.NoSuchElementException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.ThreadFactory;
41
42 import static com.lmax.disruptor.dsl.ProducerType.MULTI;
43 import static java.util.concurrent.TimeUnit.MILLISECONDS;
44 import static java.util.concurrent.TimeUnit.SECONDS;
45 import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
46
47 class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
48
49 private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
50
51
52 private final ExecutorService disruptorExec;
53 private final Disruptor<RequestEvent> disruptor;
54 private final RingBuffer<RequestEvent> requestRing;
55
56 private final TimestampOracle timestampOracle;
57 private final CommitHashMap hashmap;
58 private final MetricsRegistry metrics;
59 private final PersistenceProcessor persistProc;
60
61 private long lowWatermark = -1L;
62
63 @Inject
64 RequestProcessorImpl(MetricsRegistry metrics,
65 TimestampOracle timestampOracle,
66 PersistenceProcessor persistProc,
67 Panicker panicker,
68 TSOServerConfig config)
69 throws IOException {
70
71
72
73
74
75 TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
76
77 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
78 this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
79
80 this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
81 disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
82 disruptor.handleEventsWith(this);
83 this.requestRing = disruptor.start();
84
85
86
87
88
89 this.metrics = metrics;
90 this.persistProc = persistProc;
91 this.timestampOracle = timestampOracle;
92 this.hashmap = new CommitHashMap(config.getConflictMapSize());
93
94 LOG.info("RequestProcessor initialized");
95
96 }
97
98
99
100
101 @Override
102 public void update(TSOState state) throws Exception {
103 LOG.info("Initializing RequestProcessor state...");
104 this.lowWatermark = state.getLowWatermark();
105 persistProc.persistLowWatermark(lowWatermark).get();
106 LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
107 }
108
109 @Override
110 public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
111
112 switch (event.getType()) {
113 case TIMESTAMP:
114 handleTimestamp(event);
115 break;
116 case COMMIT:
117 handleCommit(event);
118 break;
119 default:
120 throw new IllegalStateException("Event not allowed in Request Processor: " + event);
121 }
122
123 }
124
125 @Override
126 public void onTimeout(long sequence) throws Exception {
127
128
129
130
131
132
133
134 persistProc.triggerCurrentBatchFlush();
135
136 }
137
138 @Override
139 public void timestampRequest(Channel c, MonitoringContext monCtx) {
140
141 monCtx.timerStart("request.processor.timestamp.latency");
142 long seq = requestRing.next();
143 RequestEvent e = requestRing.get(seq);
144 RequestEvent.makeTimestampRequest(e, c, monCtx);
145 requestRing.publish(seq);
146
147 }
148
149 @Override
150 public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c,
151 MonitoringContext monCtx) {
152
153 monCtx.timerStart("request.processor.commit.latency");
154 long seq = requestRing.next();
155 RequestEvent e = requestRing.get(seq);
156 RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
157 requestRing.publish(seq);
158
159 }
160
161 private void handleTimestamp(RequestEvent requestEvent) throws Exception {
162
163 long timestamp = timestampOracle.next();
164 requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
165 persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
166
167 }
168
169 private void handleCommit(RequestEvent event) throws Exception {
170
171 long startTimestamp = event.getStartTimestamp();
172 Iterable<Long> writeSet = event.writeSet();
173 boolean isCommitRetry = event.isCommitRetry();
174 Channel c = event.getChannel();
175
176 boolean txCanCommit;
177
178 int numCellsInWriteset = 0;
179
180 if (startTimestamp <= lowWatermark) {
181 txCanCommit = false;
182 } else {
183
184 txCanCommit = true;
185 for (long cellId : writeSet) {
186 long value = hashmap.getLatestWriteForCell(cellId);
187 if (value != 0 && value >= startTimestamp) {
188 txCanCommit = false;
189 break;
190 }
191 numCellsInWriteset++;
192 }
193 }
194
195 if (txCanCommit) {
196
197
198 long commitTimestamp = timestampOracle.next();
199
200 if (numCellsInWriteset > 0) {
201 long newLowWatermark = lowWatermark;
202
203 for (long r : writeSet) {
204 long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
205 newLowWatermark = Math.max(removed, newLowWatermark);
206 }
207
208 if (newLowWatermark != lowWatermark) {
209 LOG.trace("Setting new low Watermark to {}", newLowWatermark);
210 lowWatermark = newLowWatermark;
211 persistProc.persistLowWatermark(newLowWatermark);
212 }
213 }
214 event.getMonCtx().timerStop("request.processor.commit.latency");
215 persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
216
217 } else {
218
219 event.getMonCtx().timerStop("request.processor.commit.latency");
220 if (isCommitRetry) {
221 persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
222 } else {
223 persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
224 }
225
226 }
227
228 }
229
230 @Override
231 public void close() throws IOException {
232
233 LOG.info("Terminating Request Processor...");
234 disruptor.halt();
235 disruptor.shutdown();
236 LOG.info("\tRequest Processor Disruptor shutdown");
237 disruptorExec.shutdownNow();
238 try {
239 disruptorExec.awaitTermination(3, SECONDS);
240 LOG.info("\tRequest Processor Disruptor executor shutdown");
241 } catch (InterruptedException e) {
242 LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
243 Thread.currentThread().interrupt();
244 }
245 LOG.info("Request Processor terminated");
246
247 }
248
249 final static class RequestEvent implements Iterable<Long> {
250
251 enum Type {
252 TIMESTAMP, COMMIT
253 }
254
255 private Type type = null;
256 private Channel channel = null;
257
258 private boolean isCommitRetry = false;
259 private long startTimestamp = 0;
260 private MonitoringContext monCtx;
261 private long numCells = 0;
262
263 private static final int MAX_INLINE = 40;
264 private Long writeSet[] = new Long[MAX_INLINE];
265 private Collection<Long> writeSetAsCollection = null;
266
267 static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
268 e.type = Type.TIMESTAMP;
269 e.channel = c;
270 e.monCtx = monCtx;
271 }
272
273 static void makeCommitRequest(RequestEvent e,
274 long startTimestamp,
275 MonitoringContext monCtx,
276 Collection<Long> writeSet,
277 boolean isRetry,
278 Channel c) {
279 e.monCtx = monCtx;
280 e.type = Type.COMMIT;
281 e.channel = c;
282 e.startTimestamp = startTimestamp;
283 e.isCommitRetry = isRetry;
284 if (writeSet.size() > MAX_INLINE) {
285 e.numCells = writeSet.size();
286 e.writeSetAsCollection = writeSet;
287 } else {
288 e.writeSetAsCollection = null;
289 e.numCells = writeSet.size();
290 int i = 0;
291 for (Long cellId : writeSet) {
292 e.writeSet[i] = cellId;
293 i++;
294 }
295 }
296
297 }
298
299 MonitoringContext getMonCtx() {
300 return monCtx;
301 }
302
303 Type getType() {
304 return type;
305 }
306
307 long getStartTimestamp() {
308 return startTimestamp;
309 }
310
311 Channel getChannel() {
312 return channel;
313 }
314
315 @Override
316 public Iterator<Long> iterator() {
317
318 if (writeSetAsCollection != null) {
319 return writeSetAsCollection.iterator();
320 }
321
322 return new Iterator<Long>() {
323 int i = 0;
324
325 @Override
326 public boolean hasNext() {
327 return i < numCells;
328 }
329
330 @Override
331 public Long next() {
332 if (!hasNext()) {
333 throw new NoSuchElementException();
334 }
335 return writeSet[i++];
336 }
337
338 @Override
339 public void remove() {
340 throw new UnsupportedOperationException();
341 }
342 };
343
344 }
345
346 Iterable<Long> writeSet() {
347
348 return this;
349
350 }
351
352 boolean isCommitRetry() {
353 return isCommitRetry;
354 }
355
356 final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
357 @Override
358 public RequestEvent newInstance() {
359 return new RequestEvent();
360 }
361 };
362
363 }
364
365 }