1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import com.google.inject.Inject;
22 import org.apache.omid.metrics.MetricsRegistry;
23 import org.jboss.netty.channel.Channel;
24
25 import java.io.IOException;
26
27 public class RequestProcessorPersistCT extends AbstractRequestProcessor {
28
29 private final PersistenceProcessor persistenceProcessor;
30
31 @Inject
32 RequestProcessorPersistCT(MetricsRegistry metrics,
33 TimestampOracle timestampOracle,
34 PersistenceProcessor persistenceProcessor,
35 Panicker panicker,
36 TSOServerConfig config,
37 LowWatermarkWriter lowWatermarkWriter,
38 ReplyProcessor replyProcessor) throws IOException {
39
40 super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
41 this.persistenceProcessor = persistenceProcessor;
42 requestRing = disruptor.start();
43 }
44
45 @Override
46 public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
47 Optional<Long> lowWatermark) throws Exception {
48 persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx , lowWatermark);
49 }
50
51 @Override
52 public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
53 persistenceProcessor.addCommitRetryToBatch(startTimestamp,c,monCtx);
54 }
55
56 @Override
57 public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
58 persistenceProcessor.addAbortToBatch(startTimestamp,c,monCtx);
59 }
60
61 @Override
62 public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
63 persistenceProcessor.addTimestampToBatch(startTimestamp,c,monCtx);
64 }
65
66 @Override
67 public void onTimeout() throws Exception {
68 persistenceProcessor.triggerCurrentBatchFlush();
69 }
70 }