1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import com.google.inject.Inject;
24 import com.google.inject.name.Named;
25 import com.lmax.disruptor.EventFactory;
26 import com.lmax.disruptor.EventHandler;
27 import com.lmax.disruptor.RingBuffer;
28 import com.lmax.disruptor.WaitStrategy;
29 import com.lmax.disruptor.dsl.Disruptor;
30
31 import org.apache.commons.pool2.ObjectPool;
32 import org.apache.omid.metrics.Meter;
33 import org.apache.omid.metrics.MetricsRegistry;
34 import org.apache.omid.proto.TSOProto;
35 import org.jboss.netty.channel.Channel;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import java.util.Comparator;
40 import java.util.PriorityQueue;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 import static com.codahale.metrics.MetricRegistry.name;
46 import static com.lmax.disruptor.dsl.ProducerType.MULTI;
47 import static java.util.concurrent.TimeUnit.SECONDS;
48 import static org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY;
49
50 class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
51
52 private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
53
54
55 private final ExecutorService disruptorExec;
56 private final Disruptor<ReplyBatchEvent> disruptor;
57 private final RingBuffer<ReplyBatchEvent> replyRing;
58
59 private final ObjectPool<Batch> batchPool;
60
61 @VisibleForTesting
62 AtomicLong nextIDToHandle = new AtomicLong();
63
64 @VisibleForTesting
65 PriorityQueue<ReplyBatchEvent> futureEvents;
66
67
68 private final Meter abortMeter;
69 private final Meter commitMeter;
70 private final Meter timestampMeter;
71 private final Meter fenceMeter;
72
73 private final LowWatermarkWriter lowWatermarkWriter;
74 private long highestLowWaterMarkSeen;
75
76 @Inject
77 ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
78 MetricsRegistry metrics,
79 Panicker panicker,
80 ObjectPool<Batch> batchPool,
81 LowWatermarkWriter lowWatermarkWriter) {
82 this.lowWatermarkWriter = lowWatermarkWriter;
83
84
85
86
87
88 ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
89 this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build());
90
91 this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy);
92 disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
93 disruptor.handleEventsWith(this);
94 this.replyRing = disruptor.start();
95
96
97
98
99
100 this.batchPool = batchPool;
101 this.nextIDToHandle.set(0);
102 this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
103 public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
104 return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
105 }
106 });
107
108
109 this.abortMeter = metrics.meter(name("tso", "aborts"));
110 this.commitMeter = metrics.meter(name("tso", "commits"));
111 this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
112 this.fenceMeter = metrics.meter(name("tso", "fences"));
113
114 LOG.info("ReplyProcessor initialized");
115
116 highestLowWaterMarkSeen = -1;
117 }
118
119 @VisibleForTesting
120 void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
121
122 Batch batch = replyBatchEvent.getBatch();
123 for (int i = 0; i < batch.getNumEvents(); i++) {
124 PersistEvent event = batch.get(i);
125
126 switch (event.getType()) {
127 case COMMIT:
128 sendCommitResponse(event.getStartTimestamp(),
129 event.getCommitTimestamp(),
130 event.getChannel(),
131 event.getMonCtx(),
132 event.getNewLowWatermark());
133 break;
134 case ABORT:
135 sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
136 break;
137 case TIMESTAMP:
138 sendTimestampResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
139 break;
140 case FENCE:
141 sendFenceResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
142 break;
143 case COMMIT_RETRY:
144 throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
145 default:
146 throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
147 }
148 event.getMonCtx().publish();
149 }
150
151 batchPool.returnObject(batch);
152 }
153
154 private void processWaitingEvents() throws Exception {
155
156 while (!futureEvents.isEmpty() && futureEvents.peek().getBatchSequence() == nextIDToHandle.get()) {
157 ReplyBatchEvent e = futureEvents.poll();
158 handleReplyBatchEvent(e);
159 nextIDToHandle.incrementAndGet();
160 }
161
162 }
163
164 public void onEvent(ReplyBatchEvent event, long sequence, boolean endOfBatch) throws Exception {
165
166
167
168
169
170
171 if (event.getBatchSequence() > nextIDToHandle.get()) {
172 futureEvents.add(event);
173 return;
174 }
175
176 handleReplyBatchEvent(event);
177
178 nextIDToHandle.incrementAndGet();
179
180
181 processWaitingEvents();
182
183 }
184
185 @Override
186 public void manageResponsesBatch(long batchSequence, Batch batch) {
187
188 long seq = replyRing.next();
189 ReplyBatchEvent e = replyRing.get(seq);
190 ReplyBatchEvent.makeReplyBatch(e, batch, batchSequence);
191 replyRing.publish(seq);
192
193 }
194
195 @VisibleForTesting
196 void updateLowWatermark(Optional<Long> newLowwatermark) {
197 if (newLowwatermark.isPresent() && newLowwatermark.get() > highestLowWaterMarkSeen) {
198 highestLowWaterMarkSeen = newLowwatermark.get();
199 lowWatermarkWriter.persistLowWatermark(highestLowWaterMarkSeen);
200 }
201 }
202
203 @Override
204 public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx
205 , Optional<Long> newLowWatermark) {
206 updateLowWatermark(newLowWatermark);
207 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
208 TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
209 commitBuilder.setAborted(false)
210 .setStartTimestamp(startTimestamp)
211 .setCommitTimestamp(commitTimestamp);
212 builder.setCommitResponse(commitBuilder.build());
213 c.write(builder.build());
214 commitMeter.mark();
215 monCtx.timerStop("reply.processor.commit.latency");
216 }
217
218 @Override
219 public void sendAbortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
220
221 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
222 TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
223 commitBuilder.setAborted(true);
224 commitBuilder.setStartTimestamp(startTimestamp);
225 builder.setCommitResponse(commitBuilder.build());
226 c.write(builder.build());
227 abortMeter.mark();
228 monCtx.timerStop("reply.processor.abort.latency");
229 }
230
231 @Override
232 public void sendTimestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
233
234 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
235 TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
236 respBuilder.setStartTimestamp(startTimestamp);
237 builder.setTimestampResponse(respBuilder.build());
238 c.write(builder.build());
239 timestampMeter.mark();
240 monCtx.timerStop("reply.processor.timestamp.latency");
241 }
242
243 @Override
244 public void sendFenceResponse(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) {
245
246 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
247 TSOProto.FenceResponse.Builder fenceBuilder = TSOProto.FenceResponse.newBuilder();
248 fenceBuilder.setTableId(tableID);
249 fenceBuilder.setFenceId(fenceTimestamp);
250 builder.setFenceResponse(fenceBuilder.build());
251 c.write(builder.build());
252 monCtx.timerStop("reply.processor.fence.latency");
253 fenceMeter.mark();
254 }
255
256 @Override
257 public void close() {
258
259 LOG.info("Terminating Reply Processor...");
260 disruptor.halt();
261 disruptor.shutdown();
262 LOG.info("\tReply Processor Disruptor shutdown");
263 disruptorExec.shutdownNow();
264 try {
265 disruptorExec.awaitTermination(3, SECONDS);
266 LOG.info("\tReply Processor Disruptor executor shutdown");
267 } catch (InterruptedException e) {
268 LOG.error("Interrupted whilst finishing Reply Processor Disruptor executor");
269 Thread.currentThread().interrupt();
270 }
271 LOG.info("Reply Processor terminated");
272
273 }
274
275 final static class ReplyBatchEvent {
276
277 private Batch batch;
278 private long batchSequence;
279
280 static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchSequence) {
281 e.batch = batch;
282 e.batchSequence = batchSequence;
283 }
284
285 Batch getBatch() {
286 return batch;
287 }
288
289 long getBatchSequence() {
290 return batchSequence;
291 }
292
293 final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {
294 public ReplyBatchEvent newInstance() {
295 return new ReplyBatchEvent();
296 }
297 };
298
299 }
300
301 }
302