View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.base.Objects;
22  import com.lmax.disruptor.WorkHandler;
23  import org.apache.omid.committable.CommitTable;
24  import org.apache.omid.metrics.Histogram;
25  import org.apache.omid.metrics.MetricsRegistry;
26  import org.apache.omid.metrics.Timer;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  import javax.inject.Inject;
31  import java.io.IOException;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import static com.codahale.metrics.MetricRegistry.name;
36  import static org.apache.omid.tso.PersistEvent.Type.COMMIT_RETRY;
37  
38  
39  public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
40  
41      private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorHandler.class);
42  
43      @VisibleForTesting
44      static final AtomicInteger consecutiveSequenceCreator = new AtomicInteger(0);
45  
46      private final String id;
47  
48      private final String tsoHostAndPort;
49      private final LeaseManagement leaseManager;
50  
51      private final ReplyProcessor replyProcessor;
52      private final RetryProcessor retryProcessor;
53      private final CommitTable.Writer writer;
54      final Panicker panicker;
55  
56      // Metrics in this component
57      private final Timer flushTimer;
58      private final Histogram batchSizeHistogram;
59      private final Histogram flushedCommitEventsHistogram;
60  
61      @Inject
62      PersistenceProcessorHandler(MetricsRegistry metrics,
63                                  String tsoHostAndPort, // TODO This should not be passed here. Should be part of panicker
64                                  LeaseManagement leaseManager,
65                                  CommitTable commitTable,
66                                  ReplyProcessor replyProcessor,
67                                  RetryProcessor retryProcessor,
68                                  Panicker panicker)
69      throws InterruptedException, ExecutionException, IOException {
70  
71          this.id = String.valueOf(consecutiveSequenceCreator.getAndIncrement());
72          this.tsoHostAndPort = tsoHostAndPort;
73          this.leaseManager = leaseManager;
74          this.writer = commitTable.getWriter();
75          this.replyProcessor = replyProcessor;
76          this.retryProcessor = retryProcessor;
77          this.panicker = panicker;
78  
79          // Metrics setup
80          String flushTimerName = name("tso", "persistence-processor-handler", id, "flush", "latency");
81          flushTimer = metrics.timer(flushTimerName);
82          String flushedCommitEventsName = name("tso", "persistence-processor-handler", id, "flushed", "commits", "size");
83          flushedCommitEventsHistogram = metrics.histogram(flushedCommitEventsName);
84          String batchSizeMetricsName = name("tso", "persistence-processor-handler", id, "batch", "size");
85          batchSizeHistogram = metrics.histogram(batchSizeMetricsName);
86  
87      }
88  
89      public String getId() {
90          return id;
91      }
92  
93      @Override
94      public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
95  
96          int commitEventsToFlush = 0;
97          Batch batch = batchEvent.getBatch();
98          int numOfBatchedEvents = batch.getNumEvents();
99          batchSizeHistogram.update(numOfBatchedEvents);
100         for (int i=0; i < numOfBatchedEvents; i++) {
101             PersistEvent event = batch.get(i);
102             switch (event.getType()) {
103                 case TIMESTAMP:
104                     event.getMonCtx().timerStop("persistence.processor.timestamp.latency");
105                     break;
106                 case COMMIT:
107                     writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
108                     commitEventsToFlush++;
109                     break;
110                 case COMMIT_RETRY:
111                     event.getMonCtx().timerStop("persistence.processor.commit-retry.latency");
112                     break;
113                 case ABORT:
114                     event.getMonCtx().timerStop("persistence.processor.abort.latency");
115                     break;
116                 case FENCE:
117                     // Persist the fence by using the fence identifier as both the start and commit timestamp.
118                     writer.addCommittedTransaction(event.getCommitTimestamp(), event.getCommitTimestamp());
119                     commitEventsToFlush++;
120                     break;
121                 default:
122                     throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
123             }
124         }
125 
126         // Flush and send the responses back to the client. WARNING: Before sending the responses, first we need
127         // to filter commit retries in the batch to disambiguate them.
128         flush(commitEventsToFlush);
129         filterAndDissambiguateClientRetries(batch);
130         for (int i=0; i < batch.getNumEvents(); i++) { // Just for statistics
131             PersistEvent event = batch.get(i);
132             switch (event.getType()) {
133                 case TIMESTAMP:
134                     event.getMonCtx().timerStart("reply.processor.timestamp.latency");
135                     break;
136                 case COMMIT:
137                     event.getMonCtx().timerStop("persistence.processor.commit.latency");
138                     event.getMonCtx().timerStart("reply.processor.commit.latency");
139                     break;
140                 case COMMIT_RETRY:
141                     throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
142                 case ABORT:
143                     event.getMonCtx().timerStart("reply.processor.abort.latency");
144                     break;
145                 case FENCE:
146                     event.getMonCtx().timerStop("persistence.processor.fence.latency");
147                     event.getMonCtx().timerStart("reply.processor.fence.latency");
148                     break;
149                 default:
150                     throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
151             }
152         }
153         replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);
154 
155     }
156 
157     void flush(int commitEventsToFlush) {
158 
159         commitSuicideIfNotMaster();
160         try {
161             long startFlushTimeInNs = System.nanoTime();
162             if(commitEventsToFlush > 0) {
163                 writer.flush();
164             }
165             flushTimer.update(System.nanoTime() - startFlushTimeInNs);
166             flushedCommitEventsHistogram.update(commitEventsToFlush);
167         } catch (IOException e) {
168             panicker.panic("Error persisting commit batch", e);
169         }
170         commitSuicideIfNotMaster();
171 
172     }
173 
174     private void commitSuicideIfNotMaster() {
175         if (!leaseManager.stillInLeasePeriod()) {
176             panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
177         }
178     }
179 
180     void filterAndDissambiguateClientRetries(Batch batch) {
181 
182         int currentEventIdx = 0;
183         while (currentEventIdx <= batch.getLastEventIdx()) {
184             PersistEvent event = batch.get(currentEventIdx);
185             if (event.getType() == COMMIT_RETRY) {
186                 retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
187                 // Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
188                 swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
189                 batch.decreaseNumEvents();
190                 if (batch.isEmpty()) {
191                     break; // We're OK to call now the reply processor
192                 } else {
193                     continue; // Otherwise we continue checking for retries from the new event in the current position
194                 }
195             } else {
196                 currentEventIdx++; // Let's check if the next event was a retry
197             }
198         }
199 
200     }
201 
202     private void swapBatchElements(Batch batch, int firstIdx, int lastIdx) {
203         PersistEvent tmpEvent = batch.get(firstIdx);
204         PersistEvent lastEventInBatch = batch.get(lastIdx);
205         batch.set(firstIdx, lastEventInBatch);
206         batch.set(lastIdx, tmpEvent);
207     }
208 
209     @Override
210     public String toString() {
211 
212         return Objects.toStringHelper(this).add("id", id).toString();
213 
214     }
215 
216 }