View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.benchmarks.tso;
19  
20  import com.google.common.util.concurrent.RateLimiter;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import com.google.inject.Binder;
23  import com.google.inject.Guice;
24  import com.google.inject.Injector;
25  import com.google.inject.Module;
26  import org.apache.omid.benchmarks.utils.IntegerGenerator;
27  import org.apache.omid.committable.CommitTable;
28  import org.apache.omid.metrics.Counter;
29  import org.apache.omid.metrics.MetricsRegistry;
30  import org.apache.omid.metrics.Timer;
31  import org.apache.omid.tso.util.DummyCellIdImpl;
32  import org.apache.omid.tso.client.AbortException;
33  import org.apache.omid.tso.client.CellId;
34  import org.apache.omid.tso.client.OmidClientConfiguration;
35  import org.apache.omid.tso.client.TSOClient;
36  import org.apache.omid.tso.client.TSOFuture;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import java.io.IOException;
41  import java.net.InetAddress;
42  import java.util.ArrayList;
43  import java.util.HashSet;
44  import java.util.List;
45  import java.util.Random;
46  import java.util.Set;
47  import java.util.concurrent.ExecutionException;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.ScheduledExecutorService;
50  import java.util.concurrent.TimeUnit;
51  
52  import static com.codahale.metrics.MetricRegistry.name;
53  
54  class RawTxRunner implements Runnable {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(RawTxRunner.class);
57  
58      private static volatile int txRunnerCounter = 0;
59      private int txRunnerId = txRunnerCounter++;
60  
61      // Config params
62      private final int writesetSize;
63      private final boolean fixedWriteSetSize;
64      private final long commitDelayInMs;
65      private final int percentageOfReadOnlyTxs;
66      private final IntegerGenerator cellIdGenerator;
67      private final Random randomGen;
68  
69      // Main elements
70      private final TSOClient tsoClient;
71      private final CommitTable.Client commitTableClient;
72  
73      // Asynchronous executor for tx post begin sequence: TimestampListener -> Committer -> CommitListener
74      private final ScheduledExecutorService callbackExec =
75              Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
76                      .setNameFormat("tx-runner-" + txRunnerId + "-callback")
77                      .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
78                          @Override
79                          public void uncaughtException(Thread t, Throwable e) {
80                              LOG.error("Thread {} threw exception", t, e);
81                          }
82                      }).build());
83  
84      // Statistics to save
85      private final Timer timestampTimer;
86      private final Timer commitTimer;
87      private final Timer abortTimer;
88      private final Counter errorCounter;
89  
90      // Allows to setup a maximum rate for the client in req/sec
91      private final RateLimiter rateLimiter;
92  
93      // Is this TxRunner still running?
94      private volatile boolean isRunning = false;
95  
96      RawTxRunner(final TSOServerBenchmarkConfig expConfig) throws IOException, InterruptedException {
97  
98          // Injector configuration
99          List<Module> guiceModules = new ArrayList<>();
100         guiceModules.add(new Module() {
101             @Override
102             public void configure(Binder binder) {
103                 binder.bind(MetricsRegistry.class).toInstance(expConfig.getMetrics());
104             }
105         });
106         guiceModules.add(expConfig.getCommitTableStoreModule());
107         Injector injector = Guice.createInjector(guiceModules);
108 
109         // Tx Runner config
110         this.writesetSize = expConfig.getWritesetSize();
111         this.fixedWriteSetSize = expConfig.isFixedWritesetSize();
112         this.commitDelayInMs = expConfig.getCommitDelayInMs();
113         this.percentageOfReadOnlyTxs = expConfig.getPercentageOfReadOnlyTxs();
114         this.cellIdGenerator = expConfig.getCellIdGenerator();
115         this.randomGen = new Random(System.currentTimeMillis() * txRunnerId); // to make it channel dependent
116 
117         int txRateInReqPerSec = expConfig.getTxRateInRequestPerSecond();
118         long warmUpPeriodInSecs = expConfig.getWarmUpPeriodInSecs();
119 
120         LOG.info("TxRunner-{} [ Tx Rate (Req per Sec) -> {} ]", txRunnerId, txRateInReqPerSec);
121         LOG.info("TxRunner-{} [ Warm Up Period -> {} Secs ]", txRunnerId, warmUpPeriodInSecs);
122         LOG.info("TxRunner-{} [ Cell Id Distribution Generator -> {} ]", txRunnerId, expConfig.getCellIdGenerator().getClass());
123         LOG.info("TxRunner-{} [ Max Tx Size -> {} Fixed: {} ]", txRunnerId, writesetSize, fixedWriteSetSize);
124         LOG.info("TxRunner-{} [ Commit delay -> {} Ms ]", txRunnerId, commitDelayInMs);
125         LOG.info("TxRunner-{} [ % of Read-Only Tx -> {} % ]", txRunnerId, percentageOfReadOnlyTxs);
126 
127         // Commit table client initialization
128         CommitTable commitTable = injector.getInstance(CommitTable.class);
129         this.commitTableClient = commitTable.getClient();
130 
131         // Stat initialization
132         MetricsRegistry metrics = injector.getInstance(MetricsRegistry.class);
133         String hostName = InetAddress.getLocalHost().getHostName();
134         this.timestampTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "timestamp"));
135         this.commitTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "commit"));
136         this.abortTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "abort"));
137         this.errorCounter = metrics.counter(name("tx_runner", Integer.toString(txRunnerId), hostName, "errors"));
138         LOG.info("TxRunner-{} [ Metrics provider module -> {} ]", txRunnerId, expConfig.getMetrics().getClass());
139 
140         // TSO Client initialization
141         OmidClientConfiguration tsoClientConf = expConfig.getOmidClientConfiguration();
142         this.tsoClient = TSOClient.newInstance(tsoClientConf);
143         LOG.info("TxRunner-{} [ Connection Type {}/Connection String {} ]", txRunnerId,
144                  tsoClientConf.getConnectionType(), tsoClientConf.getConnectionString());
145 
146         // Limiter for configured request per second
147         this.rateLimiter = RateLimiter.create((double) txRateInReqPerSec, warmUpPeriodInSecs, TimeUnit.SECONDS);
148     }
149 
150     @Override
151     public void run() {
152 
153         isRunning = true;
154 
155         while (isRunning) {
156             rateLimiter.acquire();
157             long tsRequestTime = System.nanoTime();
158             final TSOFuture<Long> tsFuture = tsoClient.getNewStartTimestamp();
159             tsFuture.addListener(new TimestampListener(tsFuture, tsRequestTime), callbackExec);
160         }
161 
162         shutdown();
163 
164     }
165 
166     void stop() {
167         isRunning = false;
168     }
169 
170     private void shutdown() {
171 
172         try {
173             LOG.info("Finishing TxRunner in 3 secs", txRunnerId);
174             boolean wasSuccess = callbackExec.awaitTermination(3, TimeUnit.SECONDS);
175             if (!wasSuccess) {
176                 callbackExec.shutdownNow();
177             }
178             commitTableClient.close();
179             tsoClient.close().get();
180         } catch (InterruptedException e) {
181             Thread.currentThread().interrupt();
182             // ignore
183         } catch (ExecutionException | IOException e) {
184             // ignore
185         } finally {
186             LOG.info("TxRunner {} finished", txRunnerId);
187         }
188 
189     }
190 
191     private class TimestampListener implements Runnable {
192 
193         final TSOFuture<Long> tsFuture;
194         final long tsRequestTime;
195 
196         TimestampListener(TSOFuture<Long> tsFuture, long tsRequestTime) {
197             this.tsFuture = tsFuture;
198             this.tsRequestTime = tsRequestTime;
199         }
200 
201         @Override
202         public void run() {
203 
204             try {
205                 long txId = tsFuture.get();
206                 timestampTimer.update(System.nanoTime() - tsRequestTime);
207                 if (commitDelayInMs <= 0) {
208                     callbackExec.execute(new Committer(txId));
209                 } else {
210                     callbackExec.schedule(new Committer(txId), commitDelayInMs, TimeUnit.MILLISECONDS);
211                 }
212             } catch (InterruptedException e) {
213                 Thread.currentThread().interrupt();
214                 errorCounter.inc();
215             } catch (ExecutionException e) {
216                 errorCounter.inc();
217             }
218 
219         }
220 
221     }
222 
223     private class Committer implements Runnable {
224 
225         final long txId;
226 
227         Committer(long txId) {
228             this.txId = txId;
229         }
230 
231         @Override
232         public void run() {
233 
234             int txWritesetSize = calculateTxWritesetSize();
235 
236             if (txWritesetSize == 0) {
237                 return; // Read only tx, no need to commit
238             }
239             // Otherwise, we create the writeset...
240             final Set<CellId> cells = new HashSet<>();
241             for (byte i = 0; i < txWritesetSize; i++) {
242                 long cellId = cellIdGenerator.nextInt();
243                 cells.add(new DummyCellIdImpl(cellId));
244             }
245             // ... and we commit the transaction
246             long startCommitTimeInNs = System.nanoTime();
247             final TSOFuture<Long> commitFuture = tsoClient.commit(txId, cells);
248             commitFuture.addListener(new CommitListener(txId, commitFuture, startCommitTimeInNs), callbackExec);
249 
250         }
251 
252         private int calculateTxWritesetSize() {
253             int txSize = 0;
254             boolean readOnly = (randomGen.nextFloat() * 100) < percentageOfReadOnlyTxs;
255             if (!readOnly) {
256                 if (fixedWriteSetSize) {
257                     txSize = writesetSize;
258                 } else {
259                     txSize = randomGen.nextInt(writesetSize) + 1;
260                 }
261             }
262             return txSize;
263         }
264 
265     }
266 
267     private class CommitListener implements Runnable {
268 
269         final long txId;
270         final long commitRequestTime;
271         final TSOFuture<Long> commitFuture;
272 
273         CommitListener(long txId, TSOFuture<Long> commitFuture, long commitRequestTime) {
274             this.txId = txId;
275             this.commitFuture = commitFuture;
276             this.commitRequestTime = commitRequestTime;
277         }
278 
279         @Override
280         public void run() {
281 
282             try {
283                 commitFuture.get();
284                 commitTableClient.completeTransaction(txId).get();
285                 commitTimer.update(System.nanoTime() - commitRequestTime);
286             } catch (InterruptedException e) {
287                 Thread.currentThread().interrupt();
288                 errorCounter.inc();
289             } catch (ExecutionException e) {
290                 if (e.getCause() instanceof AbortException) {
291                     abortTimer.update(System.nanoTime() - commitRequestTime);
292                 } else {
293                     errorCounter.inc();
294                 }
295             }
296 
297         }
298 
299     }
300 
301 }