View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.base.Objects;
22  import com.google.common.util.concurrent.ThreadFactoryBuilder;
23  import com.lmax.disruptor.BusySpinWaitStrategy;
24  import com.lmax.disruptor.EventFactory;
25  import com.lmax.disruptor.RingBuffer;
26  import com.lmax.disruptor.WorkerPool;
27  import org.apache.commons.pool2.ObjectPool;
28  import org.apache.omid.committable.CommitTable;
29  import org.apache.omid.metrics.MetricsRegistry;
30  import org.apache.omid.metrics.Timer;
31  import org.jboss.netty.channel.Channel;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import javax.inject.Inject;
36  import java.io.IOException;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.Future;
41  
42  import static org.apache.omid.metrics.MetricsUtils.name;
43  import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
44  import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
45  
46  class PersistenceProcessorImpl implements PersistenceProcessor {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
49  
50      private static final long INITIAL_LWM_VALUE = -1L;
51  
52      private final RingBuffer<PersistBatchEvent> persistRing;
53  
54      private final ObjectPool<Batch> batchPool;
55      @VisibleForTesting
56      Batch currentBatch;
57  
58      // TODO Next two need to be either int or AtomicLong
59      volatile private long batchSequence;
60  
61      private CommitTable.Writer lowWatermarkWriter;
62      private ExecutorService lowWatermarkWriterExecutor;
63  
64      private MetricsRegistry metrics;
65      private final Timer lwmWriteTimer;
66  
67      @Inject
68      PersistenceProcessorImpl(TSOServerConfig config,
69                               CommitTable commitTable,
70                               ObjectPool<Batch> batchPool,
71                               Panicker panicker,
72                               PersistenceProcessorHandler[] handlers,
73                               MetricsRegistry metrics)
74              throws Exception {
75  
76          this.metrics = metrics;
77          this.lowWatermarkWriter = commitTable.getWriter();
78          this.batchSequence = 0L;
79          this.batchPool = batchPool;
80          this.currentBatch = batchPool.borrowObject();
81  
82          // Low Watermark writer
83          ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
84          lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
85  
86          // Disruptor configuration
87          this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
88  
89          ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
90          ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(),
91                                                                     threadFactory.build());
92  
93          WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
94                                                                            persistRing.newBarrier(),
95                                                                            new FatalExceptionHandler(panicker),
96                                                                            handlers);
97          this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
98          persistProcessor.start(requestExec);
99  
100         // Metrics config
101         this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
102 
103     }
104 
105     @Override
106     public void triggerCurrentBatchFlush() throws Exception {
107 
108         if (currentBatch.isEmpty()) {
109             return;
110         }
111         long seq = persistRing.next();
112         PersistBatchEvent e = persistRing.get(seq);
113         makePersistBatch(e, batchSequence++, currentBatch);
114         persistRing.publish(seq);
115         currentBatch = batchPool.borrowObject();
116 
117     }
118 
119     @Override
120     public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
121             throws Exception {
122 
123         currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
124         if (currentBatch.isFull()) {
125             triggerCurrentBatchFlush();
126         }
127 
128     }
129 
130     @Override
131     public void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
132             throws Exception {
133 
134         currentBatch.addAbort(startTimestamp, isRetry, c, context);
135         if (currentBatch.isFull()) {
136             triggerCurrentBatchFlush();
137         }
138 
139     }
140 
141     @Override
142     public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context) throws Exception {
143 
144         currentBatch.addTimestamp(startTimestamp, c, context);
145         if (currentBatch.isFull()) {
146             triggerCurrentBatchFlush();
147         }
148 
149     }
150 
151     @Override
152     public Future<Void> persistLowWatermark(final long lowWatermark) {
153 
154         return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
155             @Override
156             public Void call() throws IOException {
157                 try {
158                     lwmWriteTimer.start();
159                     lowWatermarkWriter.updateLowWatermark(lowWatermark);
160                     lowWatermarkWriter.flush();
161                 } finally {
162                     lwmWriteTimer.stop();
163                 }
164                 return null;
165             }
166         });
167 
168     }
169 
170     final static class PersistBatchEvent {
171 
172         private long batchSequence;
173         private Batch batch;
174 
175         static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
176             e.batch = batch;
177             e.batchSequence = batchSequence;
178         }
179 
180         Batch getBatch() {
181             return batch;
182         }
183 
184         long getBatchSequence() {
185             return batchSequence;
186         }
187 
188         final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
189             public PersistBatchEvent newInstance() {
190                 return new PersistBatchEvent();
191             }
192         };
193 
194         @Override
195         public String toString() {
196             return Objects.toStringHelper(this)
197                     .add("batchSequence", batchSequence)
198                     .add("batch", batch)
199                     .toString();
200         }
201 
202     }
203 
204 }