1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.omid.examples;
19  
20  import org.apache.commons.lang.StringUtils;
21  import org.apache.hadoop.hbase.util.Bytes;
22  import org.apache.omid.transaction.RollbackException;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.io.IOException;
27  import java.util.Random;
28  
29  
30  
31  
32  
33  
34  
35  
36  
37  
38  public class ParallelExecution {
39  
40      private static final Logger LOG = LoggerFactory.getLogger(ParallelExecution.class);
41      private static final long heartBeatInterval = 10_000;
42  
43      public static void main(String[] args) throws Exception {
44  
45          LOG.info("Parsing the command line arguments");
46          int maxThreads = Runtime.getRuntime().availableProcessors();
47          if (args != null && args.length > 2 && StringUtils.isNotEmpty(args[2])) {
48              maxThreads = Integer.parseInt(args[2]);
49          }
50          LOG.info("Execute '{}' concurrent threads", maxThreads);
51  
52          for (int i = 0; i < maxThreads; i++) {
53              final SnapshotIsolationExample example = new SnapshotIsolationExample(args);
54              example.setRowIdGenerator(new RandomRowIdGenerator());
55              Thread t = new Thread(new Runnable() {
56                  @Override
57                  public void run() {
58                      long lastHeartBeatTime = System.currentTimeMillis();
59                      long counter = 0;
60                      long errorCounter = 0;
61                      while (true) {
62                          LOG.info("New cycle starts");
63                          try {
64                              example.execute();
65                              counter++;
66                          } catch (IOException | RollbackException | IllegalStateException e) {
67                              LOG.error("", e);
68                              errorCounter++;
69                          }
70                          if (System.currentTimeMillis() > lastHeartBeatTime + heartBeatInterval) {
71                              LOG.error(String.format("%s cycles executed, %s errors", counter, errorCounter));
72                              lastHeartBeatTime = System.currentTimeMillis();
73                          }
74                      }
75                  }
76              });
77              t.setName(String.format("SnapshotIsolationExample thread %s/%s", i + 1, maxThreads));
78              t.start();
79          }
80  
81      }
82  
83      private static class RandomRowIdGenerator implements RowIdGenerator {
84  
85          private Random random = new Random();
86  
87          @Override
88          public byte[] getRowId() {
89              return Bytes.toBytes("id" + random.nextInt());
90          }
91      }
92  
93  
94  }