View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
23  import com.google.inject.Inject;
24  import com.google.inject.name.Named;
25  import com.lmax.disruptor.EventFactory;
26  import com.lmax.disruptor.EventHandler;
27  import com.lmax.disruptor.RingBuffer;
28  import com.lmax.disruptor.WaitStrategy;
29  import com.lmax.disruptor.dsl.Disruptor;
30  
31  import org.apache.commons.pool2.ObjectPool;
32  import org.apache.omid.metrics.Meter;
33  import org.apache.omid.metrics.MetricsRegistry;
34  import org.apache.omid.proto.TSOProto;
35  import org.jboss.netty.channel.Channel;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import java.util.Comparator;
40  import java.util.PriorityQueue;
41  import java.util.concurrent.ExecutorService;
42  import java.util.concurrent.Executors;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import static com.codahale.metrics.MetricRegistry.name;
46  import static com.lmax.disruptor.dsl.ProducerType.MULTI;
47  import static java.util.concurrent.TimeUnit.SECONDS;
48  import static org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY;
49  
50  class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
51  
52      private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
53  
54      // Disruptor-related attributes
55      private final ExecutorService disruptorExec;
56      private final Disruptor<ReplyBatchEvent> disruptor;
57      private final RingBuffer<ReplyBatchEvent> replyRing;
58  
59      private final ObjectPool<Batch> batchPool;
60  
61      @VisibleForTesting
62      AtomicLong nextIDToHandle = new AtomicLong();
63  
64      @VisibleForTesting
65      PriorityQueue<ReplyBatchEvent> futureEvents;
66  
67      // Metrics
68      private final Meter abortMeter;
69      private final Meter commitMeter;
70      private final Meter timestampMeter;
71      private final Meter fenceMeter;
72  
73      private final LowWatermarkWriter lowWatermarkWriter;
74      private long highestLowWaterMarkSeen;
75  
76      @Inject
77      ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
78                         MetricsRegistry metrics,
79                         Panicker panicker,
80                         ObjectPool<Batch> batchPool,
81                         LowWatermarkWriter lowWatermarkWriter) {
82          this.lowWatermarkWriter = lowWatermarkWriter;
83  
84          // ------------------------------------------------------------------------------------------------------------
85          // Disruptor initialization
86          // ------------------------------------------------------------------------------------------------------------
87  
88          ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
89          this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build());
90  
91          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy);
92          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
93          disruptor.handleEventsWith(this);
94          this.replyRing = disruptor.start();
95  
96          // ------------------------------------------------------------------------------------------------------------
97          // Attribute initialization
98          // ------------------------------------------------------------------------------------------------------------
99  
100         this.batchPool = batchPool;
101         this.nextIDToHandle.set(0);
102         this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
103             public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
104                 return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
105             }
106         });
107 
108         // Metrics config
109         this.abortMeter = metrics.meter(name("tso", "aborts"));
110         this.commitMeter = metrics.meter(name("tso", "commits"));
111         this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
112         this.fenceMeter = metrics.meter(name("tso", "fences"));
113 
114         LOG.info("ReplyProcessor initialized");
115 
116         highestLowWaterMarkSeen = -1;
117     }
118 
119     @VisibleForTesting
120     void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
121 
122         Batch batch = replyBatchEvent.getBatch();
123         for (int i = 0; i < batch.getNumEvents(); i++) {
124             PersistEvent event = batch.get(i);
125 
126             switch (event.getType()) {
127                 case COMMIT:
128                     sendCommitResponse(event.getStartTimestamp(),
129                             event.getCommitTimestamp(),
130                             event.getChannel(),
131                             event.getMonCtx(),
132                             event.getNewLowWatermark());
133                     break;
134                 case ABORT:
135                     sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
136                     break;
137                 case TIMESTAMP:
138                     sendTimestampResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
139                     break;
140                 case FENCE:
141                     sendFenceResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
142                     break;
143                 case COMMIT_RETRY:
144                     throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
145                 default:
146                     throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
147             }
148             event.getMonCtx().publish();
149         }
150 
151         batchPool.returnObject(batch);
152     }
153 
154     private void processWaitingEvents() throws Exception {
155 
156         while (!futureEvents.isEmpty() && futureEvents.peek().getBatchSequence() == nextIDToHandle.get()) {
157             ReplyBatchEvent e = futureEvents.poll();
158             handleReplyBatchEvent(e);
159             nextIDToHandle.incrementAndGet();
160         }
161 
162     }
163 
164     public void onEvent(ReplyBatchEvent event, long sequence, boolean endOfBatch) throws Exception {
165 
166         // Order of event's reply need to be guaranteed in order to preserve snapshot isolation.
167         // This is done in order to present a scenario where a start id of N is returned
168         // while commit smaller than still does not appear in the commit table.
169 
170         // If previous events were not processed yet (events contain smaller id)
171         if (event.getBatchSequence() > nextIDToHandle.get()) {
172             futureEvents.add(event);
173             return;
174         }
175 
176         handleReplyBatchEvent(event);
177 
178         nextIDToHandle.incrementAndGet();
179 
180         // Process events that arrived before and kept in futureEvents.
181         processWaitingEvents();
182 
183     }
184 
185     @Override
186     public void manageResponsesBatch(long batchSequence, Batch batch) {
187 
188         long seq = replyRing.next();
189         ReplyBatchEvent e = replyRing.get(seq);
190         ReplyBatchEvent.makeReplyBatch(e, batch, batchSequence);
191         replyRing.publish(seq);
192 
193     }
194 
195     @VisibleForTesting
196     void updateLowWatermark(Optional<Long> newLowwatermark) {
197         if (newLowwatermark.isPresent() && newLowwatermark.get() > highestLowWaterMarkSeen) {
198             highestLowWaterMarkSeen = newLowwatermark.get();
199             lowWatermarkWriter.persistLowWatermark(highestLowWaterMarkSeen);
200         }
201     }
202 
203     @Override
204     public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx
205             , Optional<Long> newLowWatermark) {
206         updateLowWatermark(newLowWatermark);
207         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
208         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
209         commitBuilder.setAborted(false)
210                 .setStartTimestamp(startTimestamp)
211                 .setCommitTimestamp(commitTimestamp);
212         builder.setCommitResponse(commitBuilder.build());
213         c.write(builder.build());
214         commitMeter.mark();
215         monCtx.timerStop("reply.processor.commit.latency");
216     }
217 
218     @Override
219     public void sendAbortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
220 
221         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
222         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
223         commitBuilder.setAborted(true);
224         commitBuilder.setStartTimestamp(startTimestamp);
225         builder.setCommitResponse(commitBuilder.build());
226         c.write(builder.build());
227         abortMeter.mark();
228         monCtx.timerStop("reply.processor.abort.latency");
229     }
230 
231     @Override
232     public void sendTimestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
233 
234         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
235         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
236         respBuilder.setStartTimestamp(startTimestamp);
237         builder.setTimestampResponse(respBuilder.build());
238         c.write(builder.build());
239         timestampMeter.mark();
240         monCtx.timerStop("reply.processor.timestamp.latency");
241     }
242 
243     @Override
244     public void sendFenceResponse(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) {
245 
246         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
247         TSOProto.FenceResponse.Builder fenceBuilder = TSOProto.FenceResponse.newBuilder();
248         fenceBuilder.setTableId(tableID);
249         fenceBuilder.setFenceId(fenceTimestamp);
250         builder.setFenceResponse(fenceBuilder.build());
251         c.write(builder.build());
252         monCtx.timerStop("reply.processor.fence.latency");
253         fenceMeter.mark();
254     }
255 
256     @Override
257     public void close() {
258 
259         LOG.info("Terminating Reply Processor...");
260         disruptor.halt();
261         disruptor.shutdown();
262         LOG.info("\tReply Processor Disruptor shutdown");
263         disruptorExec.shutdownNow();
264         try {
265             disruptorExec.awaitTermination(3, SECONDS);
266             LOG.info("\tReply Processor Disruptor executor shutdown");
267         } catch (InterruptedException e) {
268             LOG.error("Interrupted whilst finishing Reply Processor Disruptor executor");
269             Thread.currentThread().interrupt();
270         }
271         LOG.info("Reply Processor terminated");
272 
273     }
274 
275     final static class ReplyBatchEvent {
276 
277         private Batch batch;
278         private long batchSequence;
279 
280         static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchSequence) {
281             e.batch = batch;
282             e.batchSequence = batchSequence;
283         }
284 
285         Batch getBatch() {
286             return batch;
287         }
288 
289         long getBatchSequence() {
290             return batchSequence;
291         }
292 
293         final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {
294             public ReplyBatchEvent newInstance() {
295                 return new ReplyBatchEvent();
296             }
297         };
298 
299     }
300 
301 }
302