1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import com.google.inject.Inject;
22 import org.apache.omid.metrics.MetricsRegistry;
23 import org.jboss.netty.channel.Channel;
24
25 import java.io.IOException;
26
27 public class RequestProcessorSkipCT extends AbstractRequestProcessor {
28
29
30 private final ReplyProcessor replyProcessor;
31
32 private final LeaseManagement leaseManager;
33 private final Panicker panicker;
34 private final String tsoHostAndPort;
35
36 @Inject
37 RequestProcessorSkipCT(MetricsRegistry metrics,
38 TimestampOracle timestampOracle,
39 ReplyProcessor replyProcessor,
40 Panicker panicker,
41 LeaseManagement leaseManager,
42 TSOServerConfig config,
43 LowWatermarkWriter lowWatermarkWriter,
44 String tsoHostAndPort) throws IOException {
45 super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
46 this.replyProcessor = replyProcessor;
47 this.tsoHostAndPort = tsoHostAndPort;
48 requestRing = disruptor.start();
49 this.leaseManager = leaseManager;
50 this.panicker = panicker;
51 }
52
53 private void commitSuicideIfNotMaster() {
54 if (!leaseManager.stillInLeasePeriod()) {
55 panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
56 }
57 }
58
59 @Override
60 public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> newLowWatermark) {
61 commitSuicideIfNotMaster();
62 monCtx.timerStart("reply.processor.commit.latency");
63 replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
64 }
65
66 @Override
67 public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
68 monCtx.timerStart("reply.processor.abort.latency");
69 replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
70 }
71
72 @Override
73 public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
74 monCtx.timerStart("reply.processor.abort.latency");
75 replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
76 }
77
78 @Override
79 public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
80 monCtx.timerStart("reply.processor.timestamp.latency");
81 replyProcessor.sendTimestampResponse(startTimestamp, c, monCtx);
82 }
83
84 @Override
85 public void onTimeout() {
86
87 }
88 }