1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21 import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
22 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
23 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import com.google.inject.name.Named;
25 import com.lmax.disruptor.EventFactory;
26 import com.lmax.disruptor.RingBuffer;
27 import com.lmax.disruptor.WaitStrategy;
28 import com.lmax.disruptor.dsl.Disruptor;
29
30 import org.apache.commons.pool2.ObjectPool;
31 import org.apache.omid.committable.CommitTable;
32 import org.apache.omid.metrics.MetricsRegistry;
33 import org.jboss.netty.channel.Channel;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import javax.inject.Inject;
38
39 import java.io.IOException;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42
43 import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
44 import static java.util.concurrent.TimeUnit.SECONDS;
45 import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
46 import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
47
48 class PersistenceProcessorImpl implements PersistenceProcessor {
49
50 private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
51
52
53 private final ExecutorService disruptorExec;
54 private final Disruptor<PersistBatchEvent> disruptor;
55 private final RingBuffer<PersistBatchEvent> persistRing;
56
57 private final ObjectPool<Batch> batchPool;
58 @VisibleForTesting
59 Batch currentBatch;
60
61
62 volatile private long batchSequence;
63 private MetricsRegistry metrics;
64
65 @Inject
66 PersistenceProcessorImpl(TSOServerConfig config,
67 @Named("PersistenceStrategy") WaitStrategy strategy,
68 CommitTable commitTable,
69 ObjectPool<Batch> batchPool,
70 Panicker panicker,
71 PersistenceProcessorHandler[] handlers,
72 MetricsRegistry metrics)
73 throws Exception {
74
75
76
77
78
79 ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
80 this.disruptorExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
81
82 this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, strategy);
83 disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
84 disruptor.handleEventsWithWorkerPool(handlers);
85 this.persistRing = disruptor.start();
86
87
88
89
90
91 this.metrics = metrics;
92 this.batchSequence = 0L;
93 this.batchPool = batchPool;
94 this.currentBatch = batchPool.borrowObject();
95
96 LOG.info("PersistentProcessor initialized");
97 }
98
99 @Override
100 public void triggerCurrentBatchFlush() throws Exception {
101
102 if (currentBatch.isEmpty()) {
103 return;
104 }
105 long seq = persistRing.next();
106 PersistBatchEvent e = persistRing.get(seq);
107 makePersistBatch(e, batchSequence++, currentBatch);
108 persistRing.publish(seq);
109 currentBatch = batchPool.borrowObject();
110
111 }
112
113 @Override
114 public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
115 Optional<Long> newLowWatermark)
116 throws Exception {
117
118 currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
119 if (currentBatch.isFull()) {
120 triggerCurrentBatchFlush();
121 }
122
123 }
124
125 @Override
126 public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
127 currentBatch.addCommitRetry(startTimestamp, c, monCtx);
128 if (currentBatch.isFull()) {
129 triggerCurrentBatchFlush();
130 }
131 }
132
133 @Override
134 public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx)
135 throws Exception {
136
137 currentBatch.addAbort(startTimestamp, c, monCtx);
138 if (currentBatch.isFull()) {
139 triggerCurrentBatchFlush();
140 }
141
142 }
143
144 @Override
145 public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
146
147 currentBatch.addTimestamp(startTimestamp, c, monCtx);
148 if (currentBatch.isFull()) {
149 triggerCurrentBatchFlush();
150 }
151
152 }
153
154 @Override
155 public void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
156
157 currentBatch.addFence(tableID, fenceTimestamp, c, monCtx);
158 if (currentBatch.isFull()) {
159 triggerCurrentBatchFlush();
160 }
161
162 }
163
164 @Override
165 public void close() throws IOException {
166
167 LOG.info("Terminating Persistence Processor...");
168 disruptor.halt();
169 disruptor.shutdown();
170 LOG.info("\tPersistence Processor Disruptor shutdown");
171 disruptorExec.shutdownNow();
172 try {
173 disruptorExec.awaitTermination(3, SECONDS);
174 LOG.info("\tPersistence Processor Disruptor executor shutdown");
175 } catch (InterruptedException e) {
176 LOG.error("Interrupted whilst finishing Persistence Processor Disruptor executor");
177 Thread.currentThread().interrupt();
178 }
179 LOG.info("Persistence Processor terminated");
180
181 }
182
183 final static class PersistBatchEvent {
184
185 private long batchSequence;
186 private Batch batch;
187
188 static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
189 e.batch = batch;
190 e.batchSequence = batchSequence;
191 }
192
193 Batch getBatch() {
194 return batch;
195 }
196
197 long getBatchSequence() {
198 return batchSequence;
199 }
200
201 final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
202 public PersistBatchEvent newInstance() {
203 return new PersistBatchEvent();
204 }
205 };
206
207 @Override
208 public String toString() {
209 return MoreObjects.toStringHelper(this)
210 .add("batchSequence", batchSequence)
211 .add("batch", batch)
212 .toString();
213 }
214
215 }
216
217 }