1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21  import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
22  import com.lmax.disruptor.WorkHandler;
23  import org.apache.omid.committable.CommitTable;
24  import org.apache.omid.metrics.Histogram;
25  import org.apache.omid.metrics.MetricsRegistry;
26  import org.apache.omid.metrics.Timer;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  import javax.inject.Inject;
31  import java.io.IOException;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import static com.codahale.metrics.MetricRegistry.name;
36  import static org.apache.omid.tso.PersistEvent.Type.COMMIT_RETRY;
37  
38  
39  public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
40  
41      private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorHandler.class);
42  
43      @VisibleForTesting
44      static final AtomicInteger consecutiveSequenceCreator = new AtomicInteger(0);
45  
46      private final String id;
47  
48      private final String tsoHostAndPort;
49      private final LeaseManagement leaseManager;
50  
51      private final ReplyProcessor replyProcessor;
52      private final RetryProcessor retryProcessor;
53      private final CommitTable.Writer writer;
54      final Panicker panicker;
55  
56      
57      private final Timer flushTimer;
58      private final Histogram batchSizeHistogram;
59      private final Histogram flushedCommitEventsHistogram;
60  
61      @Inject
62      PersistenceProcessorHandler(MetricsRegistry metrics,
63                                  String tsoHostAndPort, 
64                                  LeaseManagement leaseManager,
65                                  CommitTable commitTable,
66                                  ReplyProcessor replyProcessor,
67                                  RetryProcessor retryProcessor,
68                                  Panicker panicker)
69      throws InterruptedException, ExecutionException, IOException {
70  
71          this.id = String.valueOf(consecutiveSequenceCreator.getAndIncrement());
72          this.tsoHostAndPort = tsoHostAndPort;
73          this.leaseManager = leaseManager;
74          this.writer = commitTable.getWriter();
75          this.replyProcessor = replyProcessor;
76          this.retryProcessor = retryProcessor;
77          this.panicker = panicker;
78  
79          
80          String flushTimerName = name("tso", "persistence-processor-handler", id, "flush", "latency");
81          flushTimer = metrics.timer(flushTimerName);
82          String flushedCommitEventsName = name("tso", "persistence-processor-handler", id, "flushed", "commits", "size");
83          flushedCommitEventsHistogram = metrics.histogram(flushedCommitEventsName);
84          String batchSizeMetricsName = name("tso", "persistence-processor-handler", id, "batch", "size");
85          batchSizeHistogram = metrics.histogram(batchSizeMetricsName);
86  
87      }
88  
89      public String getId() {
90          return id;
91      }
92  
93      @Override
94      public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
95  
96          int commitEventsToFlush = 0;
97          Batch batch = batchEvent.getBatch();
98          int numOfBatchedEvents = batch.getNumEvents();
99          batchSizeHistogram.update(numOfBatchedEvents);
100         for (int i=0; i < numOfBatchedEvents; i++) {
101             PersistEvent event = batch.get(i);
102             switch (event.getType()) {
103                 case TIMESTAMP:
104                     event.getMonCtx().timerStop("persistence.processor.timestamp.latency");
105                     break;
106                 case COMMIT:
107                     writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
108                     commitEventsToFlush++;
109                     break;
110                 case COMMIT_RETRY:
111                     event.getMonCtx().timerStop("persistence.processor.commit-retry.latency");
112                     break;
113                 case ABORT:
114                     event.getMonCtx().timerStop("persistence.processor.abort.latency");
115                     break;
116                 case FENCE:
117                     
118                     writer.addCommittedTransaction(event.getCommitTimestamp(), event.getCommitTimestamp());
119                     commitEventsToFlush++;
120                     break;
121                 default:
122                     throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
123             }
124         }
125 
126         
127         
128         flush(commitEventsToFlush);
129         filterAndDissambiguateClientRetries(batch);
130         for (int i=0; i < batch.getNumEvents(); i++) { 
131             PersistEvent event = batch.get(i);
132             switch (event.getType()) {
133                 case TIMESTAMP:
134                     event.getMonCtx().timerStart("reply.processor.timestamp.latency");
135                     break;
136                 case COMMIT:
137                     event.getMonCtx().timerStop("persistence.processor.commit.latency");
138                     event.getMonCtx().timerStart("reply.processor.commit.latency");
139                     break;
140                 case COMMIT_RETRY:
141                     throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
142                 case ABORT:
143                     event.getMonCtx().timerStart("reply.processor.abort.latency");
144                     break;
145                 case FENCE:
146                     event.getMonCtx().timerStop("persistence.processor.fence.latency");
147                     event.getMonCtx().timerStart("reply.processor.fence.latency");
148                     break;
149                 default:
150                     throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
151             }
152         }
153         replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);
154 
155     }
156 
157     void flush(int commitEventsToFlush) {
158 
159         commitSuicideIfNotMaster();
160         try {
161             long startFlushTimeInNs = System.nanoTime();
162             if(commitEventsToFlush > 0) {
163                 writer.flush();
164             }
165             flushTimer.update(System.nanoTime() - startFlushTimeInNs);
166             flushedCommitEventsHistogram.update(commitEventsToFlush);
167         } catch (IOException e) {
168             panicker.panic("Error persisting commit batch", e);
169         }
170         commitSuicideIfNotMaster();
171 
172     }
173 
174     private void commitSuicideIfNotMaster() {
175         if (!leaseManager.stillInLeasePeriod()) {
176             panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
177         }
178     }
179 
180     void filterAndDissambiguateClientRetries(Batch batch) {
181 
182         int currentEventIdx = 0;
183         while (currentEventIdx <= batch.getLastEventIdx()) {
184             PersistEvent event = batch.get(currentEventIdx);
185             if (event.getType() == COMMIT_RETRY) {
186                 retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
187                 
188                 swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
189                 batch.decreaseNumEvents();
190                 if (batch.isEmpty()) {
191                     break; 
192                 } else {
193                     continue; 
194                 }
195             } else {
196                 currentEventIdx++; 
197             }
198         }
199 
200     }
201 
202     private void swapBatchElements(Batch batch, int firstIdx, int lastIdx) {
203         PersistEvent tmpEvent = batch.get(firstIdx);
204         PersistEvent lastEventInBatch = batch.get(lastIdx);
205         batch.set(firstIdx, lastEventInBatch);
206         batch.set(lastIdx, tmpEvent);
207     }
208 
209     @Override
210     public String toString() {
211 
212         return MoreObjects.toStringHelper(this).add("id", id).toString();
213 
214     }
215 
216 }