View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import com.lmax.disruptor.EventFactory;
22  import com.lmax.disruptor.EventHandler;
23  import com.lmax.disruptor.RingBuffer;
24  import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
25  import com.lmax.disruptor.TimeoutHandler;
26  import com.lmax.disruptor.dsl.Disruptor;
27  import org.apache.omid.metrics.MetricsRegistry;
28  import org.apache.omid.tso.TSOStateManager.TSOState;
29  import org.jboss.netty.channel.Channel;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import javax.inject.Inject;
34  import java.io.IOException;
35  import java.util.Collection;
36  import java.util.Iterator;
37  import java.util.NoSuchElementException;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.ThreadFactory;
41  
42  import static com.lmax.disruptor.dsl.ProducerType.MULTI;
43  import static java.util.concurrent.TimeUnit.MILLISECONDS;
44  import static java.util.concurrent.TimeUnit.SECONDS;
45  import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
46  
47  class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
48  
49      private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
50  
51      // Disruptor-related attributes
52      private final ExecutorService disruptorExec;
53      private final Disruptor<RequestEvent> disruptor;
54      private final RingBuffer<RequestEvent> requestRing;
55  
56      private final TimestampOracle timestampOracle;
57      private final CommitHashMap hashmap;
58      private final MetricsRegistry metrics;
59      private final PersistenceProcessor persistProc;
60  
61      private long lowWatermark = -1L;
62  
63      @Inject
64      RequestProcessorImpl(MetricsRegistry metrics,
65                           TimestampOracle timestampOracle,
66                           PersistenceProcessor persistProc,
67                           Panicker panicker,
68                           TSOServerConfig config)
69              throws IOException {
70  
71          // ------------------------------------------------------------------------------------------------------------
72          // Disruptor initialization
73          // ------------------------------------------------------------------------------------------------------------
74  
75          TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
76  
77          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
78          this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
79  
80          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
81          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
82          disruptor.handleEventsWith(this);
83          this.requestRing = disruptor.start();
84  
85          // ------------------------------------------------------------------------------------------------------------
86          // Attribute initialization
87          // ------------------------------------------------------------------------------------------------------------
88  
89          this.metrics = metrics;
90          this.persistProc = persistProc;
91          this.timestampOracle = timestampOracle;
92          this.hashmap = new CommitHashMap(config.getConflictMapSize());
93  
94          LOG.info("RequestProcessor initialized");
95  
96      }
97  
98      /**
99       * This should be called when the TSO gets leadership
100      */
101     @Override
102     public void update(TSOState state) throws Exception {
103         LOG.info("Initializing RequestProcessor state...");
104         this.lowWatermark = state.getLowWatermark();
105         persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
106         LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
107     }
108 
109     @Override
110     public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
111 
112         switch (event.getType()) {
113             case TIMESTAMP:
114                 handleTimestamp(event);
115                 break;
116             case COMMIT:
117                 handleCommit(event);
118                 break;
119             default:
120                 throw new IllegalStateException("Event not allowed in Request Processor: " + event);
121         }
122 
123     }
124 
125     @Override
126     public void onTimeout(long sequence) throws Exception {
127 
128         // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
129         // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
130         // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
131         // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
132         // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
133         // TODO (cont) in persistProc and it is guaranteed that access them serially.
134         persistProc.triggerCurrentBatchFlush();
135 
136     }
137 
138     @Override
139     public void timestampRequest(Channel c, MonitoringContext monCtx) {
140 
141         monCtx.timerStart("request.processor.timestamp.latency");
142         long seq = requestRing.next();
143         RequestEvent e = requestRing.get(seq);
144         RequestEvent.makeTimestampRequest(e, c, monCtx);
145         requestRing.publish(seq);
146 
147     }
148 
149     @Override
150     public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c,
151                               MonitoringContext monCtx) {
152 
153         monCtx.timerStart("request.processor.commit.latency");
154         long seq = requestRing.next();
155         RequestEvent e = requestRing.get(seq);
156         RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
157         requestRing.publish(seq);
158 
159     }
160 
161     private void handleTimestamp(RequestEvent requestEvent) throws Exception {
162 
163         long timestamp = timestampOracle.next();
164         requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
165         persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
166 
167     }
168 
169     private void handleCommit(RequestEvent event) throws Exception {
170 
171         long startTimestamp = event.getStartTimestamp();
172         Iterable<Long> writeSet = event.writeSet();
173         boolean isCommitRetry = event.isCommitRetry();
174         Channel c = event.getChannel();
175 
176         boolean txCanCommit;
177 
178         int numCellsInWriteset = 0;
179         // 0. check if it should abort
180         if (startTimestamp <= lowWatermark) {
181             txCanCommit = false;
182         } else {
183             // 1. check the write-write conflicts
184             txCanCommit = true;
185             for (long cellId : writeSet) {
186                 long value = hashmap.getLatestWriteForCell(cellId);
187                 if (value != 0 && value >= startTimestamp) {
188                     txCanCommit = false;
189                     break;
190                 }
191                 numCellsInWriteset++;
192             }
193         }
194 
195         if (txCanCommit) {
196             // 2. commit
197 
198             long commitTimestamp = timestampOracle.next();
199 
200             if (numCellsInWriteset > 0) {
201                 long newLowWatermark = lowWatermark;
202 
203                 for (long r : writeSet) {
204                     long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
205                     newLowWatermark = Math.max(removed, newLowWatermark);
206                 }
207 
208                 if (newLowWatermark != lowWatermark) {
209                     LOG.trace("Setting new low Watermark to {}", newLowWatermark);
210                     lowWatermark = newLowWatermark;
211                     persistProc.persistLowWatermark(newLowWatermark); // Async persist
212                 }
213             }
214             event.getMonCtx().timerStop("request.processor.commit.latency");
215             persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
216 
217         } else {
218 
219             event.getMonCtx().timerStop("request.processor.commit.latency");
220             if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
221                 persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
222             } else {
223                 persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
224             }
225 
226         }
227 
228     }
229 
230     @Override
231     public void close() throws IOException {
232 
233         LOG.info("Terminating Request Processor...");
234         disruptor.halt();
235         disruptor.shutdown();
236         LOG.info("\tRequest Processor Disruptor shutdown");
237         disruptorExec.shutdownNow();
238         try {
239             disruptorExec.awaitTermination(3, SECONDS);
240             LOG.info("\tRequest Processor Disruptor executor shutdown");
241         } catch (InterruptedException e) {
242             LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
243             Thread.currentThread().interrupt();
244         }
245         LOG.info("Request Processor terminated");
246 
247     }
248 
249     final static class RequestEvent implements Iterable<Long> {
250 
251         enum Type {
252             TIMESTAMP, COMMIT
253         }
254 
255         private Type type = null;
256         private Channel channel = null;
257 
258         private boolean isCommitRetry = false;
259         private long startTimestamp = 0;
260         private MonitoringContext monCtx;
261         private long numCells = 0;
262 
263         private static final int MAX_INLINE = 40;
264         private Long writeSet[] = new Long[MAX_INLINE];
265         private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
266 
267         static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
268             e.type = Type.TIMESTAMP;
269             e.channel = c;
270             e.monCtx = monCtx;
271         }
272 
273         static void makeCommitRequest(RequestEvent e,
274                                       long startTimestamp,
275                                       MonitoringContext monCtx,
276                                       Collection<Long> writeSet,
277                                       boolean isRetry,
278                                       Channel c) {
279             e.monCtx = monCtx;
280             e.type = Type.COMMIT;
281             e.channel = c;
282             e.startTimestamp = startTimestamp;
283             e.isCommitRetry = isRetry;
284             if (writeSet.size() > MAX_INLINE) {
285                 e.numCells = writeSet.size();
286                 e.writeSetAsCollection = writeSet;
287             } else {
288                 e.writeSetAsCollection = null;
289                 e.numCells = writeSet.size();
290                 int i = 0;
291                 for (Long cellId : writeSet) {
292                     e.writeSet[i] = cellId;
293                     i++;
294                 }
295             }
296 
297         }
298 
299         MonitoringContext getMonCtx() {
300             return monCtx;
301         }
302 
303         Type getType() {
304             return type;
305         }
306 
307         long getStartTimestamp() {
308             return startTimestamp;
309         }
310 
311         Channel getChannel() {
312             return channel;
313         }
314 
315         @Override
316         public Iterator<Long> iterator() {
317 
318             if (writeSetAsCollection != null) {
319                 return writeSetAsCollection.iterator();
320             }
321 
322             return new Iterator<Long>() {
323                 int i = 0;
324 
325                 @Override
326                 public boolean hasNext() {
327                     return i < numCells;
328                 }
329 
330                 @Override
331                 public Long next() {
332                     if (!hasNext()) {
333                         throw new NoSuchElementException();
334                     }
335                     return writeSet[i++];
336                 }
337 
338                 @Override
339                 public void remove() {
340                     throw new UnsupportedOperationException();
341                 }
342             };
343 
344         }
345 
346         Iterable<Long> writeSet() {
347 
348             return this;
349 
350         }
351 
352         boolean isCommitRetry() {
353             return isCommitRetry;
354         }
355 
356         final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
357             @Override
358             public RequestEvent newInstance() {
359                 return new RequestEvent();
360             }
361         };
362 
363     }
364 
365 }