View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.base.Objects;
22  import com.google.common.util.concurrent.ThreadFactoryBuilder;
23  import com.google.inject.name.Named;
24  import com.lmax.disruptor.EventFactory;
25  import com.lmax.disruptor.RingBuffer;
26  import com.lmax.disruptor.WaitStrategy;
27  import com.lmax.disruptor.dsl.Disruptor;
28  
29  import org.apache.commons.pool2.ObjectPool;
30  import org.apache.omid.committable.CommitTable;
31  import org.apache.omid.metrics.MetricsRegistry;
32  import org.jboss.netty.channel.Channel;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import javax.inject.Inject;
37  
38  import java.io.IOException;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.Executors;
41  
42  import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
43  import static java.util.concurrent.TimeUnit.SECONDS;
44  import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
45  import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
46  
47  class PersistenceProcessorImpl implements PersistenceProcessor {
48  
49      private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
50  
51      // Disruptor-related attributes
52      private final ExecutorService disruptorExec;
53      private final Disruptor<PersistBatchEvent> disruptor;
54      private final RingBuffer<PersistBatchEvent> persistRing;
55  
56      private final ObjectPool<Batch> batchPool;
57      @VisibleForTesting
58      Batch currentBatch;
59  
60      // TODO Next two need to be either int or AtomicLong
61      volatile private long batchSequence;
62      private MetricsRegistry metrics;
63  
64      @Inject
65      PersistenceProcessorImpl(TSOServerConfig config,
66                               @Named("PersistenceStrategy") WaitStrategy strategy,
67                               CommitTable commitTable,
68                               ObjectPool<Batch> batchPool,
69                               Panicker panicker,
70                               PersistenceProcessorHandler[] handlers,
71                               MetricsRegistry metrics)
72              throws Exception {
73  
74          // ------------------------------------------------------------------------------------------------------------
75          // Disruptor initialization
76          // ------------------------------------------------------------------------------------------------------------
77  
78          ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
79          this.disruptorExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
80  
81          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, strategy);
82          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
83          disruptor.handleEventsWithWorkerPool(handlers);
84          this.persistRing = disruptor.start();
85  
86          // ------------------------------------------------------------------------------------------------------------
87          // Attribute initialization
88          // ------------------------------------------------------------------------------------------------------------
89  
90          this.metrics = metrics;
91          this.batchSequence = 0L;
92          this.batchPool = batchPool;
93          this.currentBatch = batchPool.borrowObject();
94  
95          LOG.info("PersistentProcessor initialized");
96      }
97  
98      @Override
99      public void triggerCurrentBatchFlush() throws Exception {
100 
101         if (currentBatch.isEmpty()) {
102             return;
103         }
104         long seq = persistRing.next();
105         PersistBatchEvent e = persistRing.get(seq);
106         makePersistBatch(e, batchSequence++, currentBatch);
107         persistRing.publish(seq);
108         currentBatch = batchPool.borrowObject();
109 
110     }
111 
112     @Override
113     public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
114             throws Exception {
115 
116         currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
117         if (currentBatch.isFull()) {
118             triggerCurrentBatchFlush();
119         }
120 
121     }
122 
123     @Override
124     public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
125         currentBatch.addCommitRetry(startTimestamp, c, monCtx);
126         if (currentBatch.isFull()) {
127             triggerCurrentBatchFlush();
128         }
129     }
130 
131     @Override
132     public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx)
133             throws Exception {
134 
135         currentBatch.addAbort(startTimestamp, c, monCtx);
136         if (currentBatch.isFull()) {
137             triggerCurrentBatchFlush();
138         }
139 
140     }
141 
142     @Override
143     public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
144 
145         currentBatch.addTimestamp(startTimestamp, c, monCtx);
146         if (currentBatch.isFull()) {
147             triggerCurrentBatchFlush();
148         }
149 
150     }
151 
152     @Override
153     public void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
154 
155         currentBatch.addFence(tableID, fenceTimestamp, c, monCtx);
156         if (currentBatch.isFull()) {
157             triggerCurrentBatchFlush();
158         }
159 
160     }
161 
162     @Override
163     public void close() throws IOException {
164 
165         LOG.info("Terminating Persistence Processor...");
166         disruptor.halt();
167         disruptor.shutdown();
168         LOG.info("\tPersistence Processor Disruptor shutdown");
169         disruptorExec.shutdownNow();
170         try {
171             disruptorExec.awaitTermination(3, SECONDS);
172             LOG.info("\tPersistence Processor Disruptor executor shutdown");
173         } catch (InterruptedException e) {
174             LOG.error("Interrupted whilst finishing Persistence Processor Disruptor executor");
175             Thread.currentThread().interrupt();
176         }
177         LOG.info("Persistence Processor terminated");
178 
179     }
180 
181     final static class PersistBatchEvent {
182 
183         private long batchSequence;
184         private Batch batch;
185 
186         static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
187             e.batch = batch;
188             e.batchSequence = batchSequence;
189         }
190 
191         Batch getBatch() {
192             return batch;
193         }
194 
195         long getBatchSequence() {
196             return batchSequence;
197         }
198 
199         final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
200             public PersistBatchEvent newInstance() {
201                 return new PersistBatchEvent();
202             }
203         };
204 
205         @Override
206         public String toString() {
207             return Objects.toStringHelper(this)
208                     .add("batchSequence", batchSequence)
209                     .add("batch", batch)
210                     .toString();
211         }
212 
213     }
214 
215 }