View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import com.lmax.disruptor.BatchEventProcessor;
22  import com.lmax.disruptor.EventFactory;
23  import com.lmax.disruptor.EventHandler;
24  import com.lmax.disruptor.RingBuffer;
25  import com.lmax.disruptor.SequenceBarrier;
26  import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
27  import com.lmax.disruptor.TimeoutHandler;
28  import org.apache.omid.metrics.MetricsRegistry;
29  import org.apache.omid.tso.TSOStateManager.TSOState;
30  import org.jboss.netty.channel.Channel;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import javax.inject.Inject;
35  import java.io.IOException;
36  import java.util.Collection;
37  import java.util.Iterator;
38  import java.util.NoSuchElementException;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.TimeUnit;
43  
44  class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
45  
46      private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
47  
48      private final TimestampOracle timestampOracle;
49      private final CommitHashMap hashmap;
50      private final MetricsRegistry metrics;
51      private final PersistenceProcessor persistProc;
52      private final RingBuffer<RequestEvent> requestRing;
53      private long lowWatermark = -1L;
54  
55      @Inject
56      RequestProcessorImpl(MetricsRegistry metrics,
57                           TimestampOracle timestampOracle,
58                           PersistenceProcessor persistProc,
59                           Panicker panicker,
60                           TSOServerConfig config)
61              throws IOException {
62  
63          this.metrics = metrics;
64  
65          this.persistProc = persistProc;
66          this.timestampOracle = timestampOracle;
67  
68          this.hashmap = new CommitHashMap(config.getMaxItems());
69  
70          final TimeoutBlockingWaitStrategy timeoutStrategy
71                  = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
72  
73          // Set up the disruptor thread
74          requestRing = RingBuffer.createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12, timeoutStrategy);
75          SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
76          BatchEventProcessor<RequestEvent> requestProcessor =
77                  new BatchEventProcessor<>(requestRing, requestSequenceBarrier, this);
78          requestRing.addGatingSequences(requestProcessor.getSequence());
79          requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
80  
81          ExecutorService requestExec = Executors.newSingleThreadExecutor(
82                  new ThreadFactoryBuilder().setNameFormat("request-%d").build());
83          // Each processor runs on a separate thread
84          requestExec.submit(requestProcessor);
85  
86      }
87  
88      /**
89       * This should be called when the TSO gets leadership
90       */
91      @Override
92      public void update(TSOState state) throws Exception {
93          LOG.info("Initializing RequestProcessor...");
94          this.lowWatermark = state.getLowWatermark();
95          persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
96          LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
97      }
98  
99      @Override
100     public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
101 
102         String name = null;
103         try { // TODO this should be a switch. Re-check why it's NOT now
104             if (event.getType() == RequestEvent.Type.TIMESTAMP) {
105                 name = "timestampReqProcessor";
106                 event.getMonCtx().timerStart(name);
107                 handleTimestamp(event);
108             } else if (event.getType() == RequestEvent.Type.COMMIT) {
109                 name = "commitReqProcessor";
110                 event.getMonCtx().timerStart(name);
111                 handleCommit(event);
112             }
113         } finally {
114             if (null != name) {
115                 event.getMonCtx().timerStop(name);
116             }
117         }
118 
119     }
120 
121     @Override
122     public void onTimeout(long sequence) throws Exception {
123 
124         // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
125         // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
126         // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
127         // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
128         // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
129         // TODO (cont) in persistProc and it is guaranteed that access them serially.
130         persistProc.triggerCurrentBatchFlush();
131 
132     }
133 
134     @Override
135     public void timestampRequest(Channel c, MonitoringContext monCtx) {
136 
137         long seq = requestRing.next();
138         RequestEvent e = requestRing.get(seq);
139         RequestEvent.makeTimestampRequest(e, c, monCtx);
140         requestRing.publish(seq);
141 
142     }
143 
144     @Override
145     public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c,
146                               MonitoringContext monCtx) {
147 
148         long seq = requestRing.next();
149         RequestEvent e = requestRing.get(seq);
150         RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
151         requestRing.publish(seq);
152 
153     }
154 
155     private void handleTimestamp(RequestEvent requestEvent) throws Exception {
156 
157         long timestamp;
158 
159         try {
160             timestamp = timestampOracle.next();
161         } catch (IOException e) {
162             LOG.error("Error getting timestamp", e);
163             return;
164         }
165 
166         persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
167 
168     }
169 
170     private long handleCommit(RequestEvent event) throws Exception {
171 
172         long startTimestamp = event.getStartTimestamp();
173         Iterable<Long> writeSet = event.writeSet();
174         boolean isRetry = event.isRetry();
175         Channel c = event.getChannel();
176 
177         boolean committed;
178         long commitTimestamp = 0L;
179 
180         int numCellsInWriteset = 0;
181         // 0. check if it should abort
182         if (startTimestamp <= lowWatermark) {
183             committed = false;
184         } else {
185             // 1. check the write-write conflicts
186             committed = true;
187             for (long cellId : writeSet) {
188                 long value = hashmap.getLatestWriteForCell(cellId);
189                 if (value != 0 && value >= startTimestamp) {
190                     committed = false;
191                     break;
192                 }
193                 numCellsInWriteset++;
194             }
195         }
196 
197         if (committed) {
198             // 2. commit
199             try {
200                 commitTimestamp = timestampOracle.next();
201 
202                 if (numCellsInWriteset > 0) {
203                     long newLowWatermark = lowWatermark;
204 
205                     for (long r : writeSet) {
206                         long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
207                         newLowWatermark = Math.max(removed, newLowWatermark);
208                     }
209 
210                     if (newLowWatermark != lowWatermark) {
211                         LOG.trace("Setting new low Watermark to {}", newLowWatermark);
212                         lowWatermark = newLowWatermark;
213                         persistProc.persistLowWatermark(newLowWatermark); // Async persist
214                     }
215                 }
216                 persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
217             } catch (IOException e) {
218                 LOG.error("Error committing", e);
219             }
220         } else { // add it to the aborted list
221             persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
222         }
223 
224         return commitTimestamp;
225 
226     }
227 
228     final static class RequestEvent implements Iterable<Long> {
229 
230         enum Type {
231             TIMESTAMP, COMMIT
232         }
233 
234         private Type type = null;
235         private Channel channel = null;
236 
237         private boolean isRetry = false;
238         private long startTimestamp = 0;
239         private MonitoringContext monCtx;
240         private long numCells = 0;
241 
242         private static final int MAX_INLINE = 40;
243         private Long writeSet[] = new Long[MAX_INLINE];
244         private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
245 
246         static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
247             e.type = Type.TIMESTAMP;
248             e.channel = c;
249             e.monCtx = monCtx;
250         }
251 
252         static void makeCommitRequest(RequestEvent e,
253                                       long startTimestamp,
254                                       MonitoringContext monCtx,
255                                       Collection<Long> writeSet,
256                                       boolean isRetry,
257                                       Channel c) {
258             e.monCtx = monCtx;
259             e.type = Type.COMMIT;
260             e.channel = c;
261             e.startTimestamp = startTimestamp;
262             e.isRetry = isRetry;
263             if (writeSet.size() > MAX_INLINE) {
264                 e.numCells = writeSet.size();
265                 e.writeSetAsCollection = writeSet;
266             } else {
267                 e.writeSetAsCollection = null;
268                 e.numCells = writeSet.size();
269                 int i = 0;
270                 for (Long cellId : writeSet) {
271                     e.writeSet[i] = cellId;
272                     i++;
273                 }
274             }
275 
276         }
277 
278         MonitoringContext getMonCtx() {
279             return monCtx;
280         }
281 
282         Type getType() {
283             return type;
284         }
285 
286         long getStartTimestamp() {
287             return startTimestamp;
288         }
289 
290         Channel getChannel() {
291             return channel;
292         }
293 
294         @Override
295         public Iterator<Long> iterator() {
296 
297             if (writeSetAsCollection != null) {
298                 return writeSetAsCollection.iterator();
299             }
300 
301             return new Iterator<Long>() {
302                 int i = 0;
303 
304                 @Override
305                 public boolean hasNext() {
306                     return i < numCells;
307                 }
308 
309                 @Override
310                 public Long next() {
311                     if (!hasNext()) {
312                         throw new NoSuchElementException();
313                     }
314                     return writeSet[i++];
315                 }
316 
317                 @Override
318                 public void remove() {
319                     throw new UnsupportedOperationException();
320                 }
321             };
322 
323         }
324 
325         Iterable<Long> writeSet() {
326 
327             return this;
328 
329         }
330 
331         boolean isRetry() {
332             return isRetry;
333         }
334 
335         final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
336             @Override
337             public RequestEvent newInstance() {
338                 return new RequestEvent();
339             }
340         };
341 
342     }
343 
344 }