View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.base.Optional;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import com.lmax.disruptor.BatchEventProcessor;
23  import com.lmax.disruptor.EventFactory;
24  import com.lmax.disruptor.EventHandler;
25  import com.lmax.disruptor.RingBuffer;
26  import com.lmax.disruptor.SequenceBarrier;
27  import com.lmax.disruptor.YieldingWaitStrategy;
28  import org.apache.commons.pool2.ObjectPool;
29  import org.apache.omid.committable.CommitTable;
30  import org.apache.omid.committable.CommitTable.CommitTimestamp;
31  import org.apache.omid.metrics.Meter;
32  import org.apache.omid.metrics.MetricsRegistry;
33  
34  import org.jboss.netty.channel.Channel;
35  
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import javax.inject.Inject;
40  
41  import java.io.IOException;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.ExecutorService;
44  import java.util.concurrent.Executors;
45  import java.util.concurrent.ThreadFactory;
46  
47  import static com.codahale.metrics.MetricRegistry.name;
48  
49  /**
50   * Manages the disambiguation of the retry requests that clients send when they did not received a response in the
51   * specified timeout. It replies directly to the client with the outcome identified.
52   */
53  class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(RetryProcessor.class);
56  
57      // Disruptor chain stuff
58      final ReplyProcessor replyProc;
59      final RingBuffer<RetryEvent> retryRing;
60  
61      final CommitTable.Client commitTableClient;
62      final ObjectPool<Batch> batchPool;
63  
64      // Metrics
65      final Meter retriesMeter;
66  
67      @Inject
68      RetryProcessorImpl(MetricsRegistry metrics,
69                         CommitTable commitTable,
70                         ReplyProcessor replyProc,
71                         Panicker panicker,
72                         ObjectPool<Batch> batchPool)
73              throws InterruptedException, ExecutionException, IOException {
74  
75          this.commitTableClient = commitTable.getClient();
76          this.replyProc = replyProc;
77          this.batchPool = batchPool;
78  
79          retryRing = RingBuffer.createSingleProducer(RetryEvent.EVENT_FACTORY, 1 << 12, new YieldingWaitStrategy());
80          SequenceBarrier retrySequenceBarrier = retryRing.newBarrier();
81          BatchEventProcessor<RetryEvent> retryProcessor = new BatchEventProcessor<>(retryRing, retrySequenceBarrier, this);
82          retryProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
83          retryRing.addGatingSequences(retryProcessor.getSequence());
84  
85          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
86          ExecutorService retryExec = Executors.newSingleThreadExecutor(threadFactory);
87          retryExec.submit(retryProcessor);
88  
89          // Metrics configuration
90          retriesMeter = metrics.meter(name("tso", "retries"));
91  
92      }
93  
94      @Override
95      public void onEvent(final RetryEvent event, final long sequence, final boolean endOfBatch)
96              throws Exception {
97  
98          switch (event.getType()) {
99              case COMMIT:
100                 // TODO: What happens when the IOException is thrown?
101                 handleCommitRetry(event);
102                 break;
103             default:
104                 assert (false);
105                 break;
106         }
107 
108     }
109 
110     private void handleCommitRetry(RetryEvent event) throws Exception {
111 
112         long startTimestamp = event.getStartTimestamp();
113         try {
114             Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
115             if(commitTimestamp.isPresent()) {
116                 if (commitTimestamp.get().isValid()) {
117                     LOG.trace("Valid commit TS found in Commit Table. Replying Commit to the client...");
118                     replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
119                 } else {
120                     LOG.trace("Invalid commit TS found in Commit Table. Replying Abort to the client...");
121                     replyProc.sendAbortResponse(startTimestamp, event.getChannel());
122                 }
123             } else {
124                 LOG.trace("No commit TS found in Commit Table. Replying Abort to the client..");
125                 replyProc.sendAbortResponse(startTimestamp, event.getChannel());
126             }
127         } catch (InterruptedException e) {
128             LOG.error("Interrupted reading from commit table");
129             Thread.currentThread().interrupt();
130         } catch (ExecutionException e) {
131             LOG.error("Error reading from commit table", e);
132         }
133 
134         retriesMeter.mark();
135     }
136 
137     @Override
138     public void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx) {
139         long seq = retryRing.next();
140         RetryEvent e = retryRing.get(seq);
141         RetryEvent.makeCommitRetry(e, startTimestamp, c, monCtx);
142         retryRing.publish(seq);
143     }
144 
145     public final static class RetryEvent {
146 
147         enum Type {
148             COMMIT
149         }
150 
151         private Type type = null;
152 
153         private long startTimestamp = 0;
154         private Channel channel = null;
155         private MonitoringContext monCtx;
156 
157         static void makeCommitRetry(RetryEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
158             e.monCtx = monCtx;
159             e.type = Type.COMMIT;
160             e.startTimestamp = startTimestamp;
161             e.channel = c;
162         }
163 
164         MonitoringContext getMonCtx() {
165             return monCtx;
166         }
167 
168         Type getType() {
169             return type;
170         }
171 
172         Channel getChannel() {
173             return channel;
174         }
175 
176         long getStartTimestamp() {
177             return startTimestamp;
178         }
179 
180         public final static EventFactory<RetryEvent> EVENT_FACTORY = new EventFactory<RetryEvent>() {
181             @Override
182             public RetryEvent newInstance() {
183                 return new RetryEvent();
184             }
185         };
186 
187     }
188 
189 }