View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.benchmarks.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.RateLimiter;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import com.google.inject.Binder;
23  import com.google.inject.Guice;
24  import com.google.inject.Injector;
25  import com.google.inject.Module;
26  import org.apache.omid.benchmarks.utils.IntegerGenerator;
27  import org.apache.omid.committable.CommitTable;
28  import org.apache.omid.metrics.Counter;
29  import org.apache.omid.metrics.MetricsRegistry;
30  import org.apache.omid.metrics.Timer;
31  import org.apache.omid.tso.util.DummyCellIdImpl;
32  import org.apache.omid.tso.client.AbortException;
33  import org.apache.omid.tso.client.CellId;
34  import org.apache.omid.tso.client.OmidClientConfiguration;
35  import org.apache.omid.tso.client.TSOClient;
36  import org.apache.omid.tso.client.TSOFuture;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import java.io.IOException;
41  import java.net.InetAddress;
42  import java.util.ArrayList;
43  import java.util.HashSet;
44  import java.util.List;
45  import java.util.Random;
46  import java.util.Set;
47  import java.util.concurrent.ExecutionException;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.ScheduledExecutorService;
50  import java.util.concurrent.TimeUnit;
51  
52  import static com.codahale.metrics.MetricRegistry.name;
53  
54  class RawTxRunner implements Runnable {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(RawTxRunner.class);
57  
58      private static volatile int txRunnerCounter = 0;
59      private int txRunnerId = txRunnerCounter++;
60  
61      // Config params
62      private final int writesetSize;
63      private final boolean fixedWriteSetSize;
64      private final long commitDelayInMs;
65      private final int percentageOfReadOnlyTxs;
66      private final IntegerGenerator cellIdGenerator;
67      private final Random randomGen;
68  
69      // Main elements
70      private final TSOClient tsoClient;
71      private final CommitTable.Client commitTableClient;
72  
73      // Asynchronous executor for tx post begin sequence: TimestampListener -> Committer -> CommitListener
74      private final ScheduledExecutorService callbackExec =
75              Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
76                      .setNameFormat("tx-runner-" + txRunnerId + "-callback")
77                      .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
78                          @Override
79                          public void uncaughtException(Thread t, Throwable e) {
80                              LOG.error("Thread {} threw exception", t, e);
81                          }
82                      }).build());
83  
84      // Statistics to save
85      private final Timer timestampTimer;
86      private final Timer commitTimer;
87      private final Timer abortTimer;
88      private final Counter errorCounter;
89  
90      // Allows to setup a maximum rate for the client in req/sec
91      private final RateLimiter rateLimiter;
92  
93      // Is this TxRunner still running?
94      private volatile boolean isRunning = false;
95  
96      RawTxRunner(final TSOServerBenchmarkConfig expConfig) throws IOException, InterruptedException {
97  
98          // Injector configuration
99          List<Module> guiceModules = new ArrayList<>();
100         guiceModules.add(new Module() {
101             @Override
102             public void configure(Binder binder) {
103                 binder.bind(MetricsRegistry.class).toInstance(expConfig.getMetrics());
104             }
105         });
106         guiceModules.add(expConfig.getCommitTableStoreModule());
107         Injector injector = Guice.createInjector(guiceModules);
108 
109         // Tx Runner config
110         this.writesetSize = expConfig.getWritesetSize();
111         this.fixedWriteSetSize = expConfig.isFixedWritesetSize();
112         this.commitDelayInMs = expConfig.getCommitDelayInMs();
113         this.percentageOfReadOnlyTxs = expConfig.getPercentageOfReadOnlyTxs();
114         this.cellIdGenerator = expConfig.getCellIdGenerator();
115         this.randomGen = new Random(System.currentTimeMillis() * txRunnerId); // to make it channel dependent
116 
117         int txRateInReqPerSec = expConfig.getTxRateInRequestPerSecond();
118         long warmUpPeriodInSecs = expConfig.getWarmUpPeriodInSecs();
119 
120         LOG.info("TxRunner-{} [ Tx Rate (Req per Sec) -> {} ]", txRunnerId, txRateInReqPerSec);
121         LOG.info("TxRunner-{} [ Warm Up Period -> {} Secs ]", txRunnerId, warmUpPeriodInSecs);
122         LOG.info("TxRunner-{} [ Cell Id Distribution Generator -> {} ]", txRunnerId, expConfig.getCellIdGenerator().getClass());
123         LOG.info("TxRunner-{} [ Max Tx Size -> {} Fixed: {} ]", txRunnerId, writesetSize, fixedWriteSetSize);
124         LOG.info("TxRunner-{} [ Commit delay -> {} Ms ]", txRunnerId, commitDelayInMs);
125         LOG.info("TxRunner-{} [ % of Read-Only Tx -> {} % ]", txRunnerId, percentageOfReadOnlyTxs);
126 
127         // Commit table client initialization
128         CommitTable commitTable = injector.getInstance(CommitTable.class);
129         this.commitTableClient = commitTable.getClient();
130 
131         // Stat initialization
132         MetricsRegistry metrics = injector.getInstance(MetricsRegistry.class);
133         String hostName = InetAddress.getLocalHost().getHostName();
134         this.timestampTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "timestamp"));
135         this.commitTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "commit"));
136         this.abortTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "abort"));
137         this.errorCounter = metrics.counter(name("tx_runner", Integer.toString(txRunnerId), hostName, "errors"));
138         LOG.info("TxRunner-{} [ Metrics provider module -> {} ]", txRunnerId, expConfig.getMetrics().getClass());
139 
140         // TSO Client initialization
141         OmidClientConfiguration tsoClientConf = expConfig.getOmidClientConfiguration();
142         this.tsoClient = TSOClient.newInstance(tsoClientConf);
143         LOG.info("TxRunner-{} [ Connection Type {}/Connection String {} ]", txRunnerId,
144                  tsoClientConf.getConnectionType(), tsoClientConf.getConnectionString());
145 
146         // Limiter for configured request per second
147         this.rateLimiter = RateLimiter.create((double) txRateInReqPerSec, warmUpPeriodInSecs, TimeUnit.SECONDS);
148     }
149 
150     @Override
151     public void run() {
152 
153         isRunning = true;
154 
155         while (isRunning) {
156             rateLimiter.acquire();
157             long tsRequestTime = System.nanoTime();
158             final TSOFuture<Long> tsFuture = tsoClient.getNewStartTimestamp();
159             tsFuture.addListener(new TimestampListener(tsFuture, tsRequestTime), callbackExec);
160         }
161 
162         shutdown();
163 
164     }
165 
166     void stop() {
167         isRunning = false;
168     }
169 
170     private void shutdown() {
171 
172         try {
173             LOG.info("Finishing TxRunner in 3 secs", txRunnerId);
174             boolean wasSuccess = callbackExec.awaitTermination(3, TimeUnit.SECONDS);
175             if (!wasSuccess) {
176                 callbackExec.shutdownNow();
177             }
178             tsoClient.close().get();
179         } catch (InterruptedException e) {
180             Thread.currentThread().interrupt();
181             // ignore
182         } catch (ExecutionException e) {
183             // ignore
184         } finally {
185             LOG.info("TxRunner {} finished", txRunnerId);
186         }
187 
188     }
189 
190     private class TimestampListener implements Runnable {
191 
192         final TSOFuture<Long> tsFuture;
193         final long tsRequestTime;
194 
195         TimestampListener(TSOFuture<Long> tsFuture, long tsRequestTime) {
196             this.tsFuture = tsFuture;
197             this.tsRequestTime = tsRequestTime;
198         }
199 
200         @Override
201         public void run() {
202 
203             try {
204                 long txId = tsFuture.get();
205                 timestampTimer.update(System.nanoTime() - tsRequestTime);
206                 if (commitDelayInMs <= 0) {
207                     callbackExec.execute(new Committer(txId));
208                 } else {
209                     callbackExec.schedule(new Committer(txId), commitDelayInMs, TimeUnit.MILLISECONDS);
210                 }
211             } catch (InterruptedException e) {
212                 Thread.currentThread().interrupt();
213                 errorCounter.inc();
214             } catch (ExecutionException e) {
215                 errorCounter.inc();
216             }
217 
218         }
219 
220     }
221 
222     private class Committer implements Runnable {
223 
224         final long txId;
225 
226         Committer(long txId) {
227             this.txId = txId;
228         }
229 
230         @Override
231         public void run() {
232 
233             int txWritesetSize = calculateTxWritesetSize();
234 
235             if (txWritesetSize == 0) {
236                 return; // Read only tx, no need to commit
237             }
238             // Otherwise, we create the writeset...
239             final Set<CellId> cells = new HashSet<>();
240             for (byte i = 0; i < txWritesetSize; i++) {
241                 long cellId = cellIdGenerator.nextInt();
242                 cells.add(new DummyCellIdImpl(cellId));
243             }
244             // ... and we commit the transaction
245             long startCommitTimeInNs = System.nanoTime();
246             final TSOFuture<Long> commitFuture = tsoClient.commit(txId, cells);
247             commitFuture.addListener(new CommitListener(txId, commitFuture, startCommitTimeInNs), callbackExec);
248 
249         }
250 
251         private int calculateTxWritesetSize() {
252             int txSize = 0;
253             boolean readOnly = (randomGen.nextFloat() * 100) < percentageOfReadOnlyTxs;
254             if (!readOnly) {
255                 if (fixedWriteSetSize) {
256                     txSize = writesetSize;
257                 } else {
258                     txSize = randomGen.nextInt(writesetSize) + 1;
259                 }
260             }
261             return txSize;
262         }
263 
264     }
265 
266     private class CommitListener implements Runnable {
267 
268         final long txId;
269         final long commitRequestTime;
270         final TSOFuture<Long> commitFuture;
271 
272         CommitListener(long txId, TSOFuture<Long> commitFuture, long commitRequestTime) {
273             this.txId = txId;
274             this.commitFuture = commitFuture;
275             this.commitRequestTime = commitRequestTime;
276         }
277 
278         @Override
279         public void run() {
280 
281             try {
282                 commitFuture.get();
283                 commitTableClient.deleteCommitEntry(txId).get();
284                 commitTimer.update(System.nanoTime() - commitRequestTime);
285             } catch (InterruptedException e) {
286                 Thread.currentThread().interrupt();
287                 errorCounter.inc();
288             } catch (ExecutionException e) {
289                 if (e.getCause() instanceof AbortException) {
290                     abortTimer.update(System.nanoTime() - commitRequestTime);
291                 } else {
292                     errorCounter.inc();
293                 }
294             }
295 
296         }
297 
298     }
299 
300 }