1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import com.lmax.disruptor.EventFactory;
23 import com.lmax.disruptor.EventHandler;
24 import com.lmax.disruptor.RingBuffer;
25 import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
26 import com.lmax.disruptor.TimeoutHandler;
27 import com.lmax.disruptor.dsl.Disruptor;
28
29 import org.apache.omid.metrics.MetricsRegistry;
30 import org.apache.omid.tso.TSOStateManager.TSOState;
31 import org.jboss.netty.channel.Channel;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import java.io.IOException;
36 import java.util.Collection;
37 import java.util.HashMap;
38 import java.util.Iterator;
39 import java.util.Map;
40 import java.util.NoSuchElementException;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.ThreadFactory;
44
45 import static com.lmax.disruptor.dsl.ProducerType.MULTI;
46 import static java.util.concurrent.TimeUnit.MILLISECONDS;
47 import static java.util.concurrent.TimeUnit.SECONDS;
48 import static org.apache.omid.tso.AbstractRequestProcessor.RequestEvent.EVENT_FACTORY;
49
50 abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestProcessor.RequestEvent>, RequestProcessor, TimeoutHandler {
51
52 private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class);
53
54
55 private final ExecutorService disruptorExec;
56 protected final Disruptor<RequestEvent> disruptor;
57 protected RingBuffer<RequestEvent> requestRing;
58
59 private final TimestampOracle timestampOracle;
60 private final CommitHashMap hashmap;
61 private final Map<Long, Long> tableFences;
62 private final MetricsRegistry metrics;
63 private final LowWatermarkWriter lowWatermarkWriter;
64 private long lowWatermark = -1L;
65
66
67 private final ReplyProcessor replyProcessor;
68
69 AbstractRequestProcessor(MetricsRegistry metrics,
70 TimestampOracle timestampOracle,
71 Panicker panicker,
72 TSOServerConfig config,
73 LowWatermarkWriter lowWatermarkWriter, ReplyProcessor replyProcessor)
74 throws IOException {
75
76
77
78
79
80
81 TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
82
83 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
84 this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
85
86 this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
87 disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
88 disruptor.handleEventsWith(this);
89
90
91
92
93
94
95 this.metrics = metrics;
96 this.timestampOracle = timestampOracle;
97 this.hashmap = new CommitHashMap(config.getConflictMapSize());
98 this.tableFences = new HashMap<Long, Long>();
99 this.lowWatermarkWriter = lowWatermarkWriter;
100
101 this.replyProcessor = replyProcessor;
102
103 LOG.info("RequestProcessor initialized");
104
105 }
106
107
108
109
110 @Override
111 public void update(TSOState state) throws Exception {
112 LOG.info("Initializing RequestProcessor state...");
113 this.lowWatermark = state.getLowWatermark();
114 lowWatermarkWriter.persistLowWatermark(lowWatermark).get();
115 LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
116 }
117
118 @Override
119 public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
120
121 switch (event.getType()) {
122 case TIMESTAMP:
123 handleTimestamp(event);
124 break;
125 case COMMIT:
126 handleCommit(event);
127 break;
128 case FENCE:
129 handleFence(event);
130 break;
131 default:
132 throw new IllegalStateException("Event not allowed in Request Processor: " + event);
133 }
134
135 }
136
137 @Override
138 public void onTimeout(long sequence) throws Exception {
139
140
141
142
143
144
145
146 onTimeout();
147 }
148
149 @Override
150 public void timestampRequest(Channel c, MonitoringContext monCtx) {
151
152 monCtx.timerStart("request.processor.timestamp.latency");
153 long seq = requestRing.next();
154 RequestEvent e = requestRing.get(seq);
155 RequestEvent.makeTimestampRequest(e, c, monCtx);
156 requestRing.publish(seq);
157
158 }
159
160 @Override
161 public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
162 MonitoringContext monCtx) {
163
164 monCtx.timerStart("request.processor.commit.latency");
165 long seq = requestRing.next();
166 RequestEvent e = requestRing.get(seq);
167 RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
168 requestRing.publish(seq);
169
170 }
171
172 @Override
173 public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
174
175 monCtx.timerStart("request.processor.fence.latency");
176 long seq = requestRing.next();
177 RequestEvent e = requestRing.get(seq);
178 RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
179 requestRing.publish(seq);
180
181 }
182
183 private void handleTimestamp(RequestEvent requestEvent) throws Exception {
184
185 long timestamp = timestampOracle.next();
186 requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
187 forwardTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
188 }
189
190
191 private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
192 if (!tableFences.isEmpty()) {
193 for (long tableId: tableIdSet) {
194 Long fence = tableFences.get(tableId);
195 if (fence != null && fence > startTimestamp) {
196 return true;
197 }
198 if (fence != null && fence < lowWatermark) {
199 tableFences.remove(tableId);
200 }
201 }
202 }
203
204 return false;
205 }
206
207
208 private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
209 for (long cellId : writeSet) {
210 long value = hashmap.getLatestWriteForCell(cellId);
211 if (value != 0 && value >= startTimestamp) {
212 return true;
213 }
214 }
215
216 return false;
217 }
218
219 private void handleCommit(RequestEvent event) throws Exception {
220
221 long startTimestamp = event.getStartTimestamp();
222 Iterable<Long> writeSet = event.writeSet();
223 Collection<Long> tableIdSet = event.getTableIdSet();
224 boolean isCommitRetry = event.isCommitRetry();
225 Channel c = event.getChannel();
226
227 boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
228
229
230
231
232
233 if (startTimestamp > lowWatermark &&
234 !hasConflictsWithFences(startTimestamp, tableIdSet) &&
235 !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
236
237 long commitTimestamp = timestampOracle.next();
238 Optional<Long> forwardNewWaterMark = Optional.absent();
239 if (nonEmptyWriteSet) {
240 long newLowWatermark = lowWatermark;
241
242 for (long r : writeSet) {
243 long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
244 newLowWatermark = Math.max(removed, newLowWatermark);
245 }
246
247 if (newLowWatermark != lowWatermark) {
248 LOG.trace("Setting new low Watermark to {}", newLowWatermark);
249 lowWatermark = newLowWatermark;
250 forwardNewWaterMark = Optional.of(lowWatermark);
251 }
252 }
253 event.getMonCtx().timerStop("request.processor.commit.latency");
254 forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);
255
256 } else {
257
258 event.getMonCtx().timerStop("request.processor.commit.latency");
259 if (isCommitRetry) {
260 forwardCommitRetry(startTimestamp, c, event.getMonCtx());
261 } else {
262 forwardAbort(startTimestamp, c, event.getMonCtx());
263 }
264
265 }
266
267 }
268
269 private void handleFence(RequestEvent event) throws Exception {
270 long tableID = event.getTableId();
271 Channel c = event.getChannel();
272
273 long fenceTimestamp = timestampOracle.next();
274
275 tableFences.put(tableID, fenceTimestamp);
276
277 event.monCtx.timerStart("reply.processor.fence.latency");
278 replyProcessor.sendFenceResponse(tableID, fenceTimestamp, c, event.monCtx);
279 }
280
281 @Override
282 public void close() throws IOException {
283
284 LOG.info("Terminating Request Processor...");
285 disruptor.halt();
286 disruptor.shutdown();
287 LOG.info("\tRequest Processor Disruptor shutdown");
288 disruptorExec.shutdownNow();
289 try {
290 disruptorExec.awaitTermination(3, SECONDS);
291 LOG.info("\tRequest Processor Disruptor executor shutdown");
292 } catch (InterruptedException e) {
293 LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
294 Thread.currentThread().interrupt();
295 }
296 LOG.info("Request Processor terminated");
297
298 }
299
300 protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception;
301 protected abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
302 protected abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
303 protected abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
304 protected abstract void onTimeout() throws Exception;
305
306
307
308 final static class RequestEvent implements Iterable<Long> {
309
310 enum Type {
311 TIMESTAMP, COMMIT, FENCE
312 }
313
314 private Type type = null;
315 private Channel channel = null;
316
317 private boolean isCommitRetry = false;
318 private long startTimestamp = 0;
319 private MonitoringContext monCtx;
320 private long numCells = 0;
321
322 private static final int MAX_INLINE = 40;
323 private Long writeSet[] = new Long[MAX_INLINE];
324 private Collection<Long> writeSetAsCollection = null;
325
326 private Collection<Long> tableIdSet = null;
327 private long tableID = 0;
328
329 static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
330 e.type = Type.TIMESTAMP;
331 e.channel = c;
332 e.monCtx = monCtx;
333 }
334
335 static void makeCommitRequest(RequestEvent e,
336 long startTimestamp,
337 MonitoringContext monCtx,
338 Collection<Long> writeSet,
339 Collection<Long> TableIdSet,
340 boolean isRetry,
341 Channel c) {
342 e.monCtx = monCtx;
343 e.type = Type.COMMIT;
344 e.channel = c;
345 e.startTimestamp = startTimestamp;
346 e.isCommitRetry = isRetry;
347 if (writeSet.size() > MAX_INLINE) {
348 e.numCells = writeSet.size();
349 e.writeSetAsCollection = writeSet;
350 } else {
351 e.writeSetAsCollection = null;
352 e.numCells = writeSet.size();
353 int i = 0;
354 for (Long cellId : writeSet) {
355 e.writeSet[i] = cellId;
356 ++i;
357 }
358 }
359 e.tableIdSet = TableIdSet;
360 }
361
362 static void makeFenceRequest(RequestEvent e,
363 long tableID,
364 Channel c,
365 MonitoringContext monCtx) {
366 e.type = Type.FENCE;
367 e.channel = c;
368 e.monCtx = monCtx;
369 e.tableID = tableID;
370 }
371
372 MonitoringContext getMonCtx() {
373 return monCtx;
374 }
375
376 Type getType() {
377 return type;
378 }
379
380 long getStartTimestamp() {
381 return startTimestamp;
382 }
383
384 Channel getChannel() {
385 return channel;
386 }
387
388 Collection<Long> getTableIdSet() {
389 return tableIdSet;
390 }
391
392 long getTableId() {
393 return tableID;
394 }
395
396 @Override
397 public Iterator<Long> iterator() {
398
399 if (writeSetAsCollection != null) {
400 return writeSetAsCollection.iterator();
401 }
402
403 return new Iterator<Long>() {
404 int i = 0;
405
406 @Override
407 public boolean hasNext() {
408 return i < numCells;
409 }
410
411 @Override
412 public Long next() {
413 if (!hasNext()) {
414 throw new NoSuchElementException();
415 }
416 return writeSet[i++];
417 }
418
419 @Override
420 public void remove() {
421 throw new UnsupportedOperationException();
422 }
423 };
424
425 }
426
427 Iterable<Long> writeSet() {
428
429 return this;
430
431 }
432
433 boolean isCommitRetry() {
434 return isCommitRetry;
435 }
436
437 final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
438 @Override
439 public RequestEvent newInstance() {
440 return new RequestEvent();
441 }
442 };
443
444 }
445
446 }