View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.lmax.disruptor.WorkHandler;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.metrics.Histogram;
23  import org.apache.omid.metrics.MetricsRegistry;
24  import org.apache.omid.metrics.Timer;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import javax.inject.Inject;
29  import java.io.IOException;
30  import java.util.concurrent.ExecutionException;
31  
32  import static com.codahale.metrics.MetricRegistry.name;
33  
34  public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
35  
36      private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorHandler.class);
37  
38      private final String tsoHostAndPort;
39      private final LeaseManagement leaseManager;
40  
41      private final ReplyProcessor replyProcessor;
42      private final RetryProcessor retryProc;
43      private final CommitTable.Writer writer;
44      final Panicker panicker;
45  
46      private final Timer flushTimer;
47      private final Histogram batchSizeHistogram;
48  
49      @Inject
50      PersistenceProcessorHandler(MetricsRegistry metrics,
51                                  String tsoHostAndPort,
52                                  LeaseManagement leaseManager,
53                                  CommitTable commitTable,
54                                  ReplyProcessor replyProcessor,
55                                  RetryProcessor retryProc,
56                                  Panicker panicker)
57      throws InterruptedException, ExecutionException, IOException {
58  
59          this.tsoHostAndPort = tsoHostAndPort;
60          this.leaseManager = leaseManager;
61          this.writer = commitTable.getWriter();
62          this.replyProcessor = replyProcessor;
63          this.retryProc = retryProc;
64          this.panicker = panicker;
65  
66          flushTimer = metrics.timer(name("tso", "persist", "flush"));
67          batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
68  
69      }
70  
71      @Override
72      public void onEvent(PersistenceProcessorImpl.PersistBatchEvent event) throws Exception {
73  
74          Batch batch = event.getBatch();
75          for (int i=0; i < batch.getNumEvents(); ++i) {
76              PersistEvent localEvent = batch.get(i);
77  
78              switch (localEvent.getType()) {
79              case COMMIT:
80                  localEvent.getMonCtx().timerStart("commitPersistProcessor");
81                  // TODO: What happens when the IOException is thrown?
82                  writer.addCommittedTransaction(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp());
83                  break;
84              case ABORT:
85                  break;
86              case TIMESTAMP:
87                  localEvent.getMonCtx().timerStart("timestampPersistProcessor");
88                  break;
89              default:
90                  throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
91              }
92          }
93          if (batch.getNumEvents() > 0) {
94              flush(batch.getNumEvents());
95              sendReplies(batch, event.getBatchSequence());
96          }
97      }
98  
99      private void flush(int numBatchedEvents) {
100 
101             commitSuicideIfNotMaster();
102             try {
103                 long startFlushTimeInNs = System.nanoTime();
104                 writer.flush();
105                 flushTimer.update(System.nanoTime() - startFlushTimeInNs);
106                 batchSizeHistogram.update(numBatchedEvents);
107             } catch (IOException e) {
108                 panicker.panic("Error persisting commit batch", e);
109             }
110             commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
111 
112     }
113 
114     private void commitSuicideIfNotMaster() {
115         if (!leaseManager.stillInLeasePeriod()) {
116             panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
117         }
118     }
119 
120     private void sendReplies(Batch batch, long batchSequence) {
121 
122         int i = 0;
123         while (i < batch.getNumEvents()) {
124             PersistEvent e = batch.get(i);
125             if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) {
126                 retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
127                 PersistEvent tmp = batch.get(i);
128                 //TODO: why assign it?
129                 batch.set(i, batch.get(batch.getNumEvents() - 1));
130                 batch.set(batch.getNumEvents()  - 1, tmp);
131                 if (batch.getNumEvents()  == 1) {
132                     batch.clear();
133                     replyProcessor.manageResponsesBatch(batchSequence, null);
134                     return;
135                 }
136                 batch.decreaseNumEvents();
137                 continue;
138             }
139             i++;
140         }
141 
142         replyProcessor.manageResponsesBatch(batchSequence, batch);
143 
144     }
145 
146 }