View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import com.google.inject.name.Named;
23  import com.lmax.disruptor.EventFactory;
24  import com.lmax.disruptor.EventHandler;
25  import com.lmax.disruptor.RingBuffer;
26  import com.lmax.disruptor.WaitStrategy;
27  import com.lmax.disruptor.dsl.Disruptor;
28  
29  import org.apache.commons.pool2.ObjectPool;
30  import org.apache.omid.committable.CommitTable;
31  import org.apache.omid.committable.CommitTable.CommitTimestamp;
32  import org.apache.omid.metrics.Meter;
33  import org.apache.omid.metrics.MetricsRegistry;
34  import org.jboss.netty.channel.Channel;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import javax.inject.Inject;
39  
40  import java.io.IOException;
41  import java.util.concurrent.ExecutionException;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Executors;
44  import java.util.concurrent.ThreadFactory;
45  
46  import static com.codahale.metrics.MetricRegistry.name;
47  import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
48  import static java.util.concurrent.TimeUnit.SECONDS;
49  import static org.apache.omid.tso.RetryProcessorImpl.RetryEvent.EVENT_FACTORY;
50  
51  /**
52   * Manages the disambiguation of the retry requests that clients send when they did not received a response in the
53   * specified timeout. It replies directly to the client with the outcome identified.
54   */
55  class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
56  
57      private static final Logger LOG = LoggerFactory.getLogger(RetryProcessor.class);
58  
59      // Disruptor-related attributes
60      private final ExecutorService disruptorExec;
61      private final Disruptor<RetryEvent> disruptor;
62      private final RingBuffer<RetryEvent> retryRing;
63  
64      final ReplyProcessor replyProc;
65  
66      final CommitTable.Client commitTableClient;
67      final ObjectPool<Batch> batchPool;
68  
69      // Metrics
70      private final Meter txAlreadyCommittedMeter;
71      private final Meter invalidTxMeter;
72      private final Meter noCTFoundMeter;
73  
74      @Inject
75      RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy strategy,
76                         MetricsRegistry metrics,
77                         CommitTable commitTable,
78                         ReplyProcessor replyProc,
79                         Panicker panicker,
80                         ObjectPool<Batch> batchPool)
81              throws InterruptedException, ExecutionException, IOException {
82  
83          // ------------------------------------------------------------------------------------------------------------
84          // Disruptor initialization
85          // ------------------------------------------------------------------------------------------------------------
86  
87          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
88          this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
89  
90          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, strategy);
91          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
92          disruptor.handleEventsWith(this);
93          this.retryRing = disruptor.start();
94  
95          // ------------------------------------------------------------------------------------------------------------
96          // Attribute initialization
97          // ------------------------------------------------------------------------------------------------------------
98  
99          this.commitTableClient = commitTable.getClient();
100         this.replyProc = replyProc;
101         this.batchPool = batchPool;
102 
103         // Metrics configuration
104         this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
105         this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
106         this.noCTFoundMeter = metrics.meter(name("tso", "retries", "aborts", "tx-without-commit-timestamp"));
107 
108         LOG.info("RetryProcessor initialized");
109 
110     }
111 
112     @Override
113     public void onEvent(final RetryEvent event, final long sequence, final boolean endOfBatch) throws Exception {
114 
115         switch (event.getType()) {
116             case COMMIT:
117                 handleCommitRetry(event);
118                 event.getMonCtx().timerStop("retry.processor.commit-retry.latency");
119                 break;
120             default:
121                 assert (false);
122                 break;
123         }
124         event.getMonCtx().publish();
125 
126     }
127 
128     private void handleCommitRetry(RetryEvent event) {
129 
130         long startTimestamp = event.getStartTimestamp();
131         try {
132             Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
133             if (commitTimestamp.isPresent()) {
134                 if (commitTimestamp.get().isValid()) {
135                     LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
136                     replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx(), Optional.<Long>absent());
137                     txAlreadyCommittedMeter.mark();
138                 } else {
139                     LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
140                     replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
141                     invalidTxMeter.mark();
142                 }
143             } else {
144                 LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
145                 replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
146                 noCTFoundMeter.mark();
147             }
148         } catch (InterruptedException e) {
149             LOG.error("Interrupted reading from commit table");
150             Thread.currentThread().interrupt();
151         } catch (ExecutionException e) {
152             LOG.error("Error reading from commit table", e);
153         }
154 
155     }
156 
157     @Override
158     public void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx) {
159         long seq = retryRing.next();
160         RetryEvent e = retryRing.get(seq);
161         monCtx.timerStart("retry.processor.commit-retry.latency");
162         RetryEvent.makeCommitRetry(e, startTimestamp, c, monCtx);
163         retryRing.publish(seq);
164     }
165 
166     @Override
167     public void close() throws IOException {
168 
169         LOG.info("Terminating Retry Processor...");
170         disruptor.halt();
171         disruptor.shutdown();
172         LOG.info("\tRetry Processor Disruptor shutdown");
173         disruptorExec.shutdownNow();
174         try {
175             disruptorExec.awaitTermination(3, SECONDS);
176             LOG.info("\tRetry Processor Disruptor executor shutdown");
177         } catch (InterruptedException e) {
178             LOG.error("Interrupted whilst finishing Retry Processor Disruptor executor");
179             Thread.currentThread().interrupt();
180         }
181         LOG.info("Retry Processor terminated");
182 
183     }
184 
185     public final static class RetryEvent {
186 
187         enum Type {
188             COMMIT
189         }
190 
191         private Type type = null;
192 
193         private long startTimestamp = 0;
194         private Channel channel = null;
195         private MonitoringContext monCtx;
196 
197         static void makeCommitRetry(RetryEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
198             e.monCtx = monCtx;
199             e.type = Type.COMMIT;
200             e.startTimestamp = startTimestamp;
201             e.channel = c;
202         }
203 
204         MonitoringContext getMonCtx() {
205             return monCtx;
206         }
207 
208         Type getType() {
209             return type;
210         }
211 
212         Channel getChannel() {
213             return channel;
214         }
215 
216         long getStartTimestamp() {
217             return startTimestamp;
218         }
219 
220         public final static EventFactory<RetryEvent> EVENT_FACTORY = new EventFactory<RetryEvent>() {
221             @Override
222             public RetryEvent newInstance() {
223                 return new RetryEvent();
224             }
225         };
226 
227     }
228 
229 }