View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21  import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
22  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
23  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
24  import com.google.inject.name.Named;
25  import com.lmax.disruptor.EventFactory;
26  import com.lmax.disruptor.RingBuffer;
27  import com.lmax.disruptor.WaitStrategy;
28  import com.lmax.disruptor.dsl.Disruptor;
29  
30  import org.apache.commons.pool2.ObjectPool;
31  import org.apache.omid.committable.CommitTable;
32  import org.apache.omid.metrics.MetricsRegistry;
33  import org.jboss.netty.channel.Channel;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import javax.inject.Inject;
38  
39  import java.io.IOException;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.Executors;
42  
43  import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
44  import static java.util.concurrent.TimeUnit.SECONDS;
45  import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
46  import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
47  
48  class PersistenceProcessorImpl implements PersistenceProcessor {
49  
50      private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
51  
52      // Disruptor-related attributes
53      private final ExecutorService disruptorExec;
54      private final Disruptor<PersistBatchEvent> disruptor;
55      private final RingBuffer<PersistBatchEvent> persistRing;
56  
57      private final ObjectPool<Batch> batchPool;
58      @VisibleForTesting
59      Batch currentBatch;
60  
61      // TODO Next two need to be either int or AtomicLong
62      volatile private long batchSequence;
63      private MetricsRegistry metrics;
64  
65      @Inject
66      PersistenceProcessorImpl(TSOServerConfig config,
67                               @Named("PersistenceStrategy") WaitStrategy strategy,
68                               CommitTable commitTable,
69                               ObjectPool<Batch> batchPool,
70                               Panicker panicker,
71                               PersistenceProcessorHandler[] handlers,
72                               MetricsRegistry metrics)
73              throws Exception {
74  
75          // ------------------------------------------------------------------------------------------------------------
76          // Disruptor initialization
77          // ------------------------------------------------------------------------------------------------------------
78  
79          ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
80          this.disruptorExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
81  
82          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, strategy);
83          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
84          disruptor.handleEventsWithWorkerPool(handlers);
85          this.persistRing = disruptor.start();
86  
87          // ------------------------------------------------------------------------------------------------------------
88          // Attribute initialization
89          // ------------------------------------------------------------------------------------------------------------
90  
91          this.metrics = metrics;
92          this.batchSequence = 0L;
93          this.batchPool = batchPool;
94          this.currentBatch = batchPool.borrowObject();
95  
96          LOG.info("PersistentProcessor initialized");
97      }
98  
99      @Override
100     public void triggerCurrentBatchFlush() throws Exception {
101 
102         if (currentBatch.isEmpty()) {
103             return;
104         }
105         long seq = persistRing.next();
106         PersistBatchEvent e = persistRing.get(seq);
107         makePersistBatch(e, batchSequence++, currentBatch);
108         persistRing.publish(seq);
109         currentBatch = batchPool.borrowObject();
110 
111     }
112 
113     @Override
114     public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
115                                  Optional<Long> newLowWatermark)
116             throws Exception {
117 
118         currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
119         if (currentBatch.isFull()) {
120             triggerCurrentBatchFlush();
121         }
122 
123     }
124 
125     @Override
126     public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
127         currentBatch.addCommitRetry(startTimestamp, c, monCtx);
128         if (currentBatch.isFull()) {
129             triggerCurrentBatchFlush();
130         }
131     }
132 
133     @Override
134     public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx)
135             throws Exception {
136 
137         currentBatch.addAbort(startTimestamp, c, monCtx);
138         if (currentBatch.isFull()) {
139             triggerCurrentBatchFlush();
140         }
141 
142     }
143 
144     @Override
145     public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
146 
147         currentBatch.addTimestamp(startTimestamp, c, monCtx);
148         if (currentBatch.isFull()) {
149             triggerCurrentBatchFlush();
150         }
151 
152     }
153 
154     @Override
155     public void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
156 
157         currentBatch.addFence(tableID, fenceTimestamp, c, monCtx);
158         if (currentBatch.isFull()) {
159             triggerCurrentBatchFlush();
160         }
161 
162     }
163 
164     @Override
165     public void close() throws IOException {
166 
167         LOG.info("Terminating Persistence Processor...");
168         disruptor.halt();
169         disruptor.shutdown();
170         LOG.info("\tPersistence Processor Disruptor shutdown");
171         disruptorExec.shutdownNow();
172         try {
173             disruptorExec.awaitTermination(3, SECONDS);
174             LOG.info("\tPersistence Processor Disruptor executor shutdown");
175         } catch (InterruptedException e) {
176             LOG.error("Interrupted whilst finishing Persistence Processor Disruptor executor");
177             Thread.currentThread().interrupt();
178         }
179         LOG.info("Persistence Processor terminated");
180 
181     }
182 
183     final static class PersistBatchEvent {
184 
185         private long batchSequence;
186         private Batch batch;
187 
188         static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
189             e.batch = batch;
190             e.batchSequence = batchSequence;
191         }
192 
193         Batch getBatch() {
194             return batch;
195         }
196 
197         long getBatchSequence() {
198             return batchSequence;
199         }
200 
201         final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
202             public PersistBatchEvent newInstance() {
203                 return new PersistBatchEvent();
204             }
205         };
206 
207         @Override
208         public String toString() {
209             return MoreObjects.toStringHelper(this)
210                     .add("batchSequence", batchSequence)
211                     .add("batch", batch)
212                     .toString();
213         }
214 
215     }
216 
217 }