View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import com.google.inject.Inject;
23  import com.google.inject.name.Named;
24  import com.lmax.disruptor.EventFactory;
25  import com.lmax.disruptor.EventHandler;
26  import com.lmax.disruptor.RingBuffer;
27  import com.lmax.disruptor.WaitStrategy;
28  import com.lmax.disruptor.dsl.Disruptor;
29  
30  import org.apache.commons.pool2.ObjectPool;
31  import org.apache.omid.metrics.Meter;
32  import org.apache.omid.metrics.MetricsRegistry;
33  import org.apache.omid.proto.TSOProto;
34  import org.jboss.netty.channel.Channel;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import java.util.Comparator;
39  import java.util.PriorityQueue;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import static com.codahale.metrics.MetricRegistry.name;
45  import static com.lmax.disruptor.dsl.ProducerType.MULTI;
46  import static java.util.concurrent.TimeUnit.SECONDS;
47  import static org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY;
48  
49  class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
50  
51      private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
52  
53      // Disruptor-related attributes
54      private final ExecutorService disruptorExec;
55      private final Disruptor<ReplyBatchEvent> disruptor;
56      private final RingBuffer<ReplyBatchEvent> replyRing;
57  
58      private final ObjectPool<Batch> batchPool;
59  
60      @VisibleForTesting
61      AtomicLong nextIDToHandle = new AtomicLong();
62  
63      @VisibleForTesting
64      PriorityQueue<ReplyBatchEvent> futureEvents;
65  
66      // Metrics
67      private final Meter abortMeter;
68      private final Meter commitMeter;
69      private final Meter timestampMeter;
70      private final Meter fenceMeter;
71  
72      @Inject
73      ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
74              MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
75  
76          // ------------------------------------------------------------------------------------------------------------
77          // Disruptor initialization
78          // ------------------------------------------------------------------------------------------------------------
79  
80          ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
81          this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build());
82  
83          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy);
84          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
85          disruptor.handleEventsWith(this);
86          this.replyRing = disruptor.start();
87  
88          // ------------------------------------------------------------------------------------------------------------
89          // Attribute initialization
90          // ------------------------------------------------------------------------------------------------------------
91  
92          this.batchPool = batchPool;
93          this.nextIDToHandle.set(0);
94          this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
95              public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
96                  return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
97              }
98          });
99  
100         // Metrics config
101         this.abortMeter = metrics.meter(name("tso", "aborts"));
102         this.commitMeter = metrics.meter(name("tso", "commits"));
103         this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
104         this.fenceMeter = metrics.meter(name("tso", "fences"));
105 
106         LOG.info("ReplyProcessor initialized");
107 
108     }
109 
110     @VisibleForTesting
111     void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
112 
113         Batch batch = replyBatchEvent.getBatch();
114         for (int i = 0; i < batch.getNumEvents(); i++) {
115             PersistEvent event = batch.get(i);
116 
117             switch (event.getType()) {
118                 case COMMIT:
119                     sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
120                     break;
121                 case ABORT:
122                     sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
123                     break;
124                 case TIMESTAMP:
125                     sendTimestampResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
126                     break;
127                 case FENCE:
128                     sendFenceResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
129                     break;
130                 case COMMIT_RETRY:
131                     throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
132                 default:
133                     throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
134             }
135             event.getMonCtx().publish();
136         }
137 
138         batchPool.returnObject(batch);
139 
140     }
141 
142     private void processWaitingEvents() throws Exception {
143 
144         while (!futureEvents.isEmpty() && futureEvents.peek().getBatchSequence() == nextIDToHandle.get()) {
145             ReplyBatchEvent e = futureEvents.poll();
146             handleReplyBatchEvent(e);
147             nextIDToHandle.incrementAndGet();
148         }
149 
150     }
151 
152     public void onEvent(ReplyBatchEvent event, long sequence, boolean endOfBatch) throws Exception {
153 
154         // Order of event's reply need to be guaranteed in order to preserve snapshot isolation.
155         // This is done in order to present a scenario where a start id of N is returned
156         // while commit smaller than still does not appear in the commit table.
157 
158         // If previous events were not processed yet (events contain smaller id)
159         if (event.getBatchSequence() > nextIDToHandle.get()) {
160             futureEvents.add(event);
161             return;
162         }
163 
164         handleReplyBatchEvent(event);
165 
166         nextIDToHandle.incrementAndGet();
167 
168         // Process events that arrived before and kept in futureEvents.
169         processWaitingEvents();
170 
171     }
172 
173     @Override
174     public void manageResponsesBatch(long batchSequence, Batch batch) {
175 
176         long seq = replyRing.next();
177         ReplyBatchEvent e = replyRing.get(seq);
178         ReplyBatchEvent.makeReplyBatch(e, batch, batchSequence);
179         replyRing.publish(seq);
180 
181     }
182 
183     @Override
184     public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
185 
186         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
187         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
188         commitBuilder.setAborted(false)
189                 .setStartTimestamp(startTimestamp)
190                 .setCommitTimestamp(commitTimestamp);
191         builder.setCommitResponse(commitBuilder.build());
192         c.write(builder.build());
193         commitMeter.mark();
194         monCtx.timerStop("reply.processor.commit.latency");
195     }
196 
197     @Override
198     public void sendAbortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
199 
200         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
201         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
202         commitBuilder.setAborted(true);
203         commitBuilder.setStartTimestamp(startTimestamp);
204         builder.setCommitResponse(commitBuilder.build());
205         c.write(builder.build());
206         abortMeter.mark();
207         monCtx.timerStop("reply.processor.abort.latency");
208     }
209 
210     @Override
211     public void sendTimestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
212 
213         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
214         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
215         respBuilder.setStartTimestamp(startTimestamp);
216         builder.setTimestampResponse(respBuilder.build());
217         c.write(builder.build());
218         timestampMeter.mark();
219         monCtx.timerStop("reply.processor.timestamp.latency");
220     }
221 
222     @Override
223     public void sendFenceResponse(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) {
224 
225         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
226         TSOProto.FenceResponse.Builder fenceBuilder = TSOProto.FenceResponse.newBuilder();
227         fenceBuilder.setTableId(tableID);
228         fenceBuilder.setFenceId(fenceTimestamp);
229         builder.setFenceResponse(fenceBuilder.build());
230         c.write(builder.build());
231         monCtx.timerStop("reply.processor.fence.latency");
232         fenceMeter.mark();
233     }
234 
235     @Override
236     public void close() {
237 
238         LOG.info("Terminating Reply Processor...");
239         disruptor.halt();
240         disruptor.shutdown();
241         LOG.info("\tReply Processor Disruptor shutdown");
242         disruptorExec.shutdownNow();
243         try {
244             disruptorExec.awaitTermination(3, SECONDS);
245             LOG.info("\tReply Processor Disruptor executor shutdown");
246         } catch (InterruptedException e) {
247             LOG.error("Interrupted whilst finishing Reply Processor Disruptor executor");
248             Thread.currentThread().interrupt();
249         }
250         LOG.info("Reply Processor terminated");
251 
252     }
253 
254     final static class ReplyBatchEvent {
255 
256         private Batch batch;
257         private long batchSequence;
258 
259         static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchSequence) {
260             e.batch = batch;
261             e.batchSequence = batchSequence;
262         }
263 
264         Batch getBatch() {
265             return batch;
266         }
267 
268         long getBatchSequence() {
269             return batchSequence;
270         }
271 
272         final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {
273             public ReplyBatchEvent newInstance() {
274                 return new ReplyBatchEvent();
275             }
276         };
277 
278     }
279 
280 }
281