View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import com.google.inject.Inject;
22  import com.lmax.disruptor.BatchEventProcessor;
23  import com.lmax.disruptor.BusySpinWaitStrategy;
24  import com.lmax.disruptor.EventFactory;
25  import com.lmax.disruptor.EventHandler;
26  import com.lmax.disruptor.RingBuffer;
27  import com.lmax.disruptor.SequenceBarrier;
28  import org.apache.commons.pool2.ObjectPool;
29  import org.apache.omid.metrics.Meter;
30  import org.apache.omid.metrics.MetricsRegistry;
31  import org.apache.omid.proto.TSOProto;
32  import org.jboss.netty.channel.Channel;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import java.util.Comparator;
37  import java.util.PriorityQueue;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  import static com.codahale.metrics.MetricRegistry.name;
44  
45  class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
46  
47      private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
48  
49      private final ObjectPool<Batch> batchPool;
50  
51      private final RingBuffer<ReplyBatchEvent> replyRing;
52  
53      private AtomicLong nextIDToHandle = new AtomicLong();
54  
55      private PriorityQueue<ReplyBatchEvent> futureEvents;
56  
57      // Metrics
58      private final Meter abortMeter;
59      private final Meter commitMeter;
60      private final Meter timestampMeter;
61  
62      @Inject
63      ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
64  
65          this.batchPool = batchPool;
66  
67          this.nextIDToHandle.set(0);
68  
69          this.replyRing = RingBuffer.createMultiProducer(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
70  
71          SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
72          BatchEventProcessor<ReplyBatchEvent> replyProcessor = new BatchEventProcessor<>(replyRing, replySequenceBarrier, this);
73          replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
74  
75          replyRing.addGatingSequences(replyProcessor.getSequence());
76  
77          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d").build();
78          ExecutorService replyExec = Executors.newSingleThreadExecutor(threadFactory);
79          replyExec.submit(replyProcessor);
80  
81          this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
82              public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
83                  return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
84              }
85          });
86  
87          this.abortMeter = metrics.meter(name("tso", "aborts"));
88          this.commitMeter = metrics.meter(name("tso", "commits"));
89          this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
90  
91      }
92  
93      private void handleReplyBatchEvent(ReplyBatchEvent event) throws Exception {
94  
95          String name;
96          Batch batch = event.getBatch();
97          for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
98              PersistEvent localEvent = batch.get(i);
99  
100             switch (localEvent.getType()) {
101             case COMMIT:
102                 name = "commitReplyProcessor";
103                 localEvent.getMonCtx().timerStart(name);
104                 sendCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
105                 localEvent.getMonCtx().timerStop(name);
106                  break;
107             case ABORT:
108                 name = "abortReplyProcessor";
109                 localEvent.getMonCtx().timerStart(name);
110                 sendAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
111                 localEvent.getMonCtx().timerStop(name);
112                 break;
113             case TIMESTAMP:
114                 name = "timestampReplyProcessor";
115                 localEvent.getMonCtx().timerStart(name);
116                 sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
117                 localEvent.getMonCtx().timerStop(name);
118                 break;
119             default:
120                 LOG.error("Unknown event {}", localEvent.getType());
121                 break;
122             }
123             localEvent.getMonCtx().publish();
124         }
125 
126         if (batch != null) {
127             batchPool.returnObject(batch);
128         }
129 
130     }
131 
132     private void processWaitingEvents() throws Exception {
133 
134         while (!futureEvents.isEmpty() && futureEvents.peek().getBatchSequence() == nextIDToHandle.get()) {
135             ReplyBatchEvent e = futureEvents.poll();
136             handleReplyBatchEvent(e);
137             nextIDToHandle.incrementAndGet();
138         }
139 
140     }
141 
142     public void onEvent(ReplyBatchEvent event, long sequence, boolean endOfBatch) throws Exception {
143 
144         // Order of event's reply need to be guaranteed in order to preserve snapshot isolation.
145         // This is done in order to present a scenario where a start id of N is returned
146         // while commit smaller than still does not appear in the commit table.
147 
148         // If previous events were not processed yet (events contain smaller id)
149         if (event.getBatchSequence() > nextIDToHandle.get()) {
150             futureEvents.add(event);
151             return;
152         }
153 
154         handleReplyBatchEvent(event);
155 
156         nextIDToHandle.incrementAndGet();
157 
158         // Process events that arrived before and kept in futureEvents.
159         processWaitingEvents();
160 
161     }
162 
163     @Override
164     public void manageResponsesBatch(long batchSequence, Batch batch) {
165 
166         long seq = replyRing.next();
167         ReplyBatchEvent e = replyRing.get(seq);
168         ReplyBatchEvent.makeReplyBatch(e, batch, batchSequence);
169         replyRing.publish(seq);
170 
171     }
172 
173     @Override
174     public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
175 
176         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
177         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
178         commitBuilder.setAborted(false)
179                 .setStartTimestamp(startTimestamp)
180                 .setCommitTimestamp(commitTimestamp);
181         builder.setCommitResponse(commitBuilder.build());
182         c.write(builder.build());
183 
184         commitMeter.mark();
185 
186     }
187 
188     @Override
189     public void sendAbortResponse(long startTimestamp, Channel c) {
190 
191         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
192         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
193         commitBuilder.setAborted(true);
194         commitBuilder.setStartTimestamp(startTimestamp);
195         builder.setCommitResponse(commitBuilder.build());
196         c.write(builder.build());
197 
198         abortMeter.mark();
199 
200     }
201 
202     @Override
203     public void sendTimestampResponse(long startTimestamp, Channel c) {
204 
205         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
206         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
207         respBuilder.setStartTimestamp(startTimestamp);
208         builder.setTimestampResponse(respBuilder.build());
209         c.write(builder.build());
210 
211         timestampMeter.mark();
212 
213     }
214 
215     final static class ReplyBatchEvent {
216 
217         private Batch batch;
218         private long batchSequence;
219 
220         static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchSequence) {
221             e.batch = batch;
222             e.batchSequence = batchSequence;
223         }
224 
225         Batch getBatch() {
226             return batch;
227         }
228 
229         long getBatchSequence() {
230             return batchSequence;
231         }
232 
233         final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {
234             public ReplyBatchEvent newInstance() {
235                 return new ReplyBatchEvent();
236             }
237         };
238 
239     }
240 
241 }
242