1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.examples;
19
20 import org.apache.commons.lang.StringUtils;
21 import org.apache.hadoop.hbase.util.Bytes;
22 import org.apache.omid.transaction.RollbackException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.io.IOException;
27 import java.util.Random;
28
29
30
31
32
33
34
35
36
37
38 public class ParallelExecution {
39
40 private static final Logger LOG = LoggerFactory.getLogger(ParallelExecution.class);
41 private static final long heartBeatInterval = 10_000;
42
43 public static void main(String[] args) throws Exception {
44
45 LOG.info("Parsing the command line arguments");
46 int maxThreads = Runtime.getRuntime().availableProcessors();
47 if (args != null && args.length > 2 && StringUtils.isNotEmpty(args[2])) {
48 maxThreads = Integer.parseInt(args[2]);
49 }
50 LOG.info("Execute '{}' concurrent threads", maxThreads);
51
52 for (int i = 0; i < maxThreads; i++) {
53 final SnapshotIsolationExample example = new SnapshotIsolationExample(args);
54 example.setRowIdGenerator(new RandomRowIdGenerator());
55 Thread t = new Thread(new Runnable() {
56 @Override
57 public void run() {
58 long lastHeartBeatTime = System.currentTimeMillis();
59 long counter = 0;
60 long errorCounter = 0;
61 while (true) {
62 LOG.info("New cycle starts");
63 try {
64 example.execute();
65 counter++;
66 } catch (IOException | RollbackException | IllegalStateException e) {
67 LOG.error("", e);
68 errorCounter++;
69 }
70 if (System.currentTimeMillis() > lastHeartBeatTime + heartBeatInterval) {
71 LOG.error(String.format("%s cycles executed, %s errors", counter, errorCounter));
72 lastHeartBeatTime = System.currentTimeMillis();
73 }
74 }
75 }
76 });
77 t.setName(String.format("SnapshotIsolationExample thread %s/%s", i + 1, maxThreads));
78 t.start();
79 }
80
81 }
82
83 private static class RandomRowIdGenerator implements RowIdGenerator {
84
85 private Random random = new Random();
86
87 @Override
88 public byte[] getRowId() {
89 return Bytes.toBytes("id" + random.nextInt());
90 }
91 }
92
93
94 }