1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.benchmarks.hbase;
19
20 import com.beust.jcommander.JCommander;
21 import com.beust.jcommander.Parameter;
22 import com.beust.jcommander.ParametersDelegate;
23 import com.codahale.metrics.ConsoleReporter;
24 import com.codahale.metrics.Meter;
25 import com.codahale.metrics.MetricFilter;
26 import com.codahale.metrics.MetricRegistry;
27 import com.codahale.metrics.Timer;
28 import com.codahale.metrics.graphite.Graphite;
29 import com.codahale.metrics.graphite.GraphiteReporter;
30 import org.apache.omid.committable.CommitTable;
31 import org.apache.omid.committable.hbase.HBaseCommitTable;
32 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
33 import org.apache.omid.committable.hbase.KeyGenerator;
34 import org.apache.omid.committable.hbase.KeyGeneratorImplementations.BadRandomKeyGenerator;
35 import org.apache.omid.committable.hbase.KeyGeneratorImplementations.BucketKeyGenerator;
36 import org.apache.omid.committable.hbase.KeyGeneratorImplementations.FullRandomKeyGenerator;
37 import org.apache.omid.committable.hbase.KeyGeneratorImplementations.SeqKeyGenerator;
38 import org.apache.omid.tools.hbase.HBaseLogin;
39 import org.apache.omid.tools.hbase.SecureHBaseConfig;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42
43 import java.net.InetSocketAddress;
44 import java.util.concurrent.TimeUnit;
45
46 public class HBaseCommitTableTester {
47
48 private static class Config {
49
50 @Parameter(names = "-fullRandomAlgo", description = "Full random algo")
51 boolean fullRandomAlgo = false;
52
53 @Parameter(names = "-badRandomAlgo", description = "The original algo")
54 boolean badRandomAlgo = false;
55
56 @Parameter(names = "-bucketAlgo", description = "Bucketing algorithm")
57 boolean bucketingAlgo = false;
58
59 @Parameter(names = "-seqAlgo", description = "Sequential algorithm")
60 boolean seqAlgo = false;
61
62 @Parameter(names = "-batchSize", description = "batch size")
63 int batchSize = 10000;
64
65 @Parameter(names = "-graphite", description = "graphite server to report to")
66 String graphite = null;
67
68 @ParametersDelegate
69 SecureHBaseConfig loginFlags = new SecureHBaseConfig();
70 }
71
72 @SuppressWarnings("InfiniteLoopStatement")
73 public static void main(String[] args) throws Exception {
74 Config config = new Config();
75 new JCommander(config, args);
76
77 Configuration hbaseConfig = HBaseConfiguration.create();
78
79 final KeyGenerator keygen;
80 if (config.fullRandomAlgo) {
81 keygen = new FullRandomKeyGenerator();
82 } else if (config.badRandomAlgo) {
83 keygen = new BadRandomKeyGenerator();
84 } else if (config.bucketingAlgo) {
85 keygen = new BucketKeyGenerator();
86 } else if (config.seqAlgo) {
87 keygen = new SeqKeyGenerator();
88 } else {
89 throw new IllegalArgumentException("Not supported keygen type");
90 }
91
92 HBaseLogin.loginIfNeeded(config.loginFlags);
93
94 HBaseCommitTableConfig commitTableConfig = new HBaseCommitTableConfig();
95 CommitTable commitTable = new HBaseCommitTable(hbaseConfig, commitTableConfig, keygen);
96
97 CommitTable.Writer writer = commitTable.getWriter();
98
99 MetricRegistry metrics = new MetricRegistry();
100 if (config.graphite != null) {
101 String parts[] = config.graphite.split(":");
102 String host = parts[0];
103 Integer port = Integer.valueOf(parts[1]);
104
105 final Graphite graphite = new Graphite(new InetSocketAddress(host, port));
106 final GraphiteReporter reporter = GraphiteReporter.forRegistry(metrics)
107 .prefixedWith("omid-hbase." + keygen.getClass().getSimpleName())
108 .convertRatesTo(TimeUnit.SECONDS)
109 .convertDurationsTo(TimeUnit.MILLISECONDS)
110 .filter(MetricFilter.ALL)
111 .build(graphite);
112 reporter.start(10, TimeUnit.SECONDS);
113 }
114 final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
115 .convertRatesTo(TimeUnit.SECONDS)
116 .convertDurationsTo(TimeUnit.MILLISECONDS)
117 .build();
118 reporter.start(10, TimeUnit.SECONDS);
119
120 Timer flushTimer = metrics.timer("flush");
121 Meter commitsMeter = metrics.meter("commits");
122
123 int i = 0;
124 long ts = 0;
125 while (true) {
126 writer.addCommittedTransaction(ts++, ts++);
127 if (i++ == config.batchSize) {
128 commitsMeter.mark(i);
129 long start = System.nanoTime();
130 writer.flush();
131 flushTimer.update((System.nanoTime() - start), TimeUnit.NANOSECONDS);
132 i = 0;
133 }
134 }
135 }
136
137 }