View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.examples;
19  
20  import org.apache.commons.lang.StringUtils;
21  import org.apache.hadoop.hbase.util.Bytes;
22  import org.apache.omid.transaction.RollbackException;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.io.IOException;
27  import java.util.Random;
28  
29  /**
30   * ****************************************************************************************************************
31   *
32   * Same as SnapshotIsolationExample only executes multiple transactions concurrently
33   *
34   * ****************************************************************************************************************
35   *
36   * Please see {@link SnapshotIsolationExample}.
37   */
38  public class ParallelExecution {
39  
40      private static final Logger LOG = LoggerFactory.getLogger(ParallelExecution.class);
41      private static final long heartBeatInterval = 10_000;
42  
43      public static void main(String[] args) throws Exception {
44  
45          LOG.info("Parsing the command line arguments");
46          int maxThreads = Runtime.getRuntime().availableProcessors();
47          if (args != null && args.length > 2 && StringUtils.isNotEmpty(args[2])) {
48              maxThreads = Integer.parseInt(args[2]);
49          }
50          LOG.info("Execute '{}' concurrent threads", maxThreads);
51  
52          for (int i = 0; i < maxThreads; i++) {
53              final SnapshotIsolationExample example = new SnapshotIsolationExample(args);
54              example.setRowIdGenerator(new RandomRowIdGenerator());
55              Thread t = new Thread(new Runnable() {
56                  @Override
57                  public void run() {
58                      long lastHeartBeatTime = System.currentTimeMillis();
59                      long counter = 0;
60                      long errorCounter = 0;
61                      while (true) {
62                          LOG.info("New cycle starts");
63                          try {
64                              example.execute();
65                              counter++;
66                          } catch (IOException | RollbackException | IllegalStateException e) {
67                              LOG.error("", e);
68                              errorCounter++;
69                          }
70                          if (System.currentTimeMillis() > lastHeartBeatTime + heartBeatInterval) {
71                              LOG.error(String.format("%s cycles executed, %s errors", counter, errorCounter));
72                              lastHeartBeatTime = System.currentTimeMillis();
73                          }
74                      }
75                  }
76              });
77              t.setName(String.format("SnapshotIsolationExample thread %s/%s", i + 1, maxThreads));
78              t.start();
79          }
80  
81      }
82  
83      private static class RandomRowIdGenerator implements RowIdGenerator {
84  
85          private Random random = new Random();
86  
87          @Override
88          public byte[] getRowId() {
89              return Bytes.toBytes("id" + random.nextInt());
90          }
91      }
92  
93  
94  }