View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.benchmarks.tso;
19  
20  import com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.TimeUnit;
31  
32  /**
33   * Benchmark using directly TSOClient to connect to the TSO Server
34   */
35  public class TSOServerBenchmark implements Closeable {
36  
37      private static final Logger LOG = LoggerFactory.getLogger(TSOServerBenchmark.class);
38  
39      private volatile boolean isCleaningDone = false;
40  
41      private final TSOServerBenchmarkConfig expConfig;
42  
43      // Clients triggering txs (threads) & corresponding executor
44      private final ArrayList<RawTxRunner> txRunners = new ArrayList<>();
45      private final ScheduledExecutorService txRunnerExec;
46  
47      private TSOServerBenchmark(TSOServerBenchmarkConfig expConfig) throws IOException {
48  
49          this.expConfig = expConfig;
50  
51          // Executor for TxRunners (Clients triggering transactions)
52          Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
53              @Override
54              public void uncaughtException(Thread t, Throwable e) {
55                  LOG.error("Thread {} threw exception", t, e);
56              }
57          };
58          ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder()
59                  .setNameFormat("tx-runner-%d")
60                  .setUncaughtExceptionHandler(uncaughtExceptionHandler);
61          this.txRunnerExec = Executors.newScheduledThreadPool(expConfig.getTxRunners(), threadFactoryBuilder.build());
62  
63      }
64  
65      public static void main(String[] args) throws Exception {
66  
67          final TSOServerBenchmarkConfig config = new TSOServerBenchmarkConfig();
68  
69          final int nOfTxRunners = config.getTxRunners();
70  
71          final long benchmarkRunLengthInMins = config.getBenchmarkRunLengthInMins();
72  
73          try (TSOServerBenchmark tsoBenchmark = new TSOServerBenchmark(config)) {
74  
75              tsoBenchmark.attachShutDownHook();
76  
77              LOG.info("----- Starting TSO Benchmark [ {} TxRunner clients ] -----", nOfTxRunners);
78  
79              for (int i = 0; i < nOfTxRunners; ++i) {
80                  tsoBenchmark.createTxRunner();
81              }
82  
83              LOG.info("Benchmark run lenght {} Mins", benchmarkRunLengthInMins);
84              TimeUnit.MINUTES.sleep(benchmarkRunLengthInMins);
85  
86          } finally {
87              LOG.info("----- TSO Benchmark complete - Check metrics from individual clients in log -----");
88          }
89  
90      }
91  
92      private void attachShutDownHook() {
93          Runtime.getRuntime().addShutdownHook(new Thread("benchmark-cleaner") {
94              @Override
95              public void run() {
96                  if (!isCleaningDone) {
97                      close();
98                  }
99              }
100         });
101         LOG.info("Shutdown Hook Attached");
102     }
103 
104     private void createTxRunner() throws IOException, InterruptedException, ExecutionException {
105 
106         RawTxRunner txRunner = new RawTxRunner(expConfig);
107         txRunnerExec.submit(txRunner);
108 
109         txRunners.add(txRunner);
110 
111     }
112 
113     @Override
114     public void close() {
115 
116         // Stop clients
117         for (RawTxRunner txRunner : txRunners) {
118             txRunner.stop();
119         }
120 
121         // Shutdown executor
122         try {
123             LOG.info("Closing TxRunner Executor in 10 secs");
124             boolean wasSuccess = txRunnerExec.awaitTermination(10, TimeUnit.SECONDS);
125             if (!wasSuccess) {
126                 txRunnerExec.shutdownNow();
127             }
128         } catch (InterruptedException e) {
129             Thread.currentThread().interrupt();
130             LOG.info("Interrupted whilst shutting down TxRunner Executor!");
131         } finally {
132             LOG.info("TxRunner Executor stopped");
133         }
134 
135         isCleaningDone = true;
136     }
137 
138 }