1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.benchmarks.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31
32
33
34
35 public class TSOServerBenchmark implements Closeable {
36
37 private static final Logger LOG = LoggerFactory.getLogger(TSOServerBenchmark.class);
38
39 private volatile boolean isCleaningDone = false;
40
41 private final TSOServerBenchmarkConfig expConfig;
42
43
44 private final ArrayList<RawTxRunner> txRunners = new ArrayList<>();
45 private final ScheduledExecutorService txRunnerExec;
46
47 private TSOServerBenchmark(TSOServerBenchmarkConfig expConfig) throws IOException {
48
49 this.expConfig = expConfig;
50
51
52 Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
53 @Override
54 public void uncaughtException(Thread t, Throwable e) {
55 LOG.error("Thread {} threw exception", t, e);
56 }
57 };
58 ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder()
59 .setNameFormat("tx-runner-%d")
60 .setUncaughtExceptionHandler(uncaughtExceptionHandler);
61 this.txRunnerExec = Executors.newScheduledThreadPool(expConfig.getTxRunners(), threadFactoryBuilder.build());
62
63 }
64
65 public static void main(String[] args) throws Exception {
66
67 final TSOServerBenchmarkConfig config = new TSOServerBenchmarkConfig();
68
69 final int nOfTxRunners = config.getTxRunners();
70
71 final long benchmarkRunLengthInMins = config.getBenchmarkRunLengthInMins();
72
73 try (TSOServerBenchmark tsoBenchmark = new TSOServerBenchmark(config)) {
74
75 tsoBenchmark.attachShutDownHook();
76
77 LOG.info("----- Starting TSO Benchmark [ {} TxRunner clients ] -----", nOfTxRunners);
78
79 for (int i = 0; i < nOfTxRunners; ++i) {
80 tsoBenchmark.createTxRunner();
81 }
82
83 LOG.info("Benchmark run lenght {} Mins", benchmarkRunLengthInMins);
84 TimeUnit.MINUTES.sleep(benchmarkRunLengthInMins);
85
86 } finally {
87 LOG.info("----- TSO Benchmark complete - Check metrics from individual clients in log -----");
88 }
89
90 }
91
92 private void attachShutDownHook() {
93 Runtime.getRuntime().addShutdownHook(new Thread("benchmark-cleaner") {
94 @Override
95 public void run() {
96 if (!isCleaningDone) {
97 close();
98 }
99 }
100 });
101 LOG.info("Shutdown Hook Attached");
102 }
103
104 private void createTxRunner() throws IOException, InterruptedException, ExecutionException {
105
106 RawTxRunner txRunner = new RawTxRunner(expConfig);
107 txRunnerExec.submit(txRunner);
108
109 txRunners.add(txRunner);
110
111 }
112
113 @Override
114 public void close() {
115
116
117 for (RawTxRunner txRunner : txRunners) {
118 txRunner.stop();
119 }
120
121
122 try {
123 LOG.info("Closing TxRunner Executor in 10 secs");
124 boolean wasSuccess = txRunnerExec.awaitTermination(10, TimeUnit.SECONDS);
125 if (!wasSuccess) {
126 txRunnerExec.shutdownNow();
127 }
128 } catch (InterruptedException e) {
129 Thread.currentThread().interrupt();
130 LOG.info("Interrupted whilst shutting down TxRunner Executor!");
131 } finally {
132 LOG.info("TxRunner Executor stopped");
133 }
134
135 isCleaningDone = true;
136 }
137
138 }