1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.examples;
19
20 import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.DIRECT;
21
22 import java.io.IOException;
23
24 import org.apache.commons.lang.StringUtils;
25 import org.apache.hadoop.hbase.client.Connection;
26 import org.apache.hadoop.hbase.client.ConnectionFactory;
27 import org.apache.hadoop.hbase.client.Put;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.apache.omid.transaction.HBaseOmidClientConfiguration;
30 import org.apache.omid.transaction.HBaseTransactionManager;
31 import org.apache.omid.transaction.RollbackException;
32 import org.apache.omid.transaction.TTable;
33 import org.apache.omid.transaction.Transaction;
34 import org.apache.omid.transaction.TransactionManager;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38
39
40
41
42
43
44
45
46
47
48 public class ConfigurationExample {
49 private static final Logger LOG = LoggerFactory.getLogger(ConfigurationExample.class);
50
51 public static void main(String[] args) throws Exception {
52
53 LOG.info("Parsing command line arguments");
54 String userTableName = "MY_TX_TABLE";
55 if (args != null && args.length > 0 && StringUtils.isNotEmpty(args[0])) {
56 userTableName = args[0];
57 }
58 byte[] family = Bytes.toBytes("MY_CF");
59 if (args != null && args.length > 1 && StringUtils.isNotEmpty(args[1])) {
60 family = Bytes.toBytes(args[1]);
61 }
62 LOG.info("Table '{}', column family '{}'", userTableName, Bytes.toString(family));
63
64 ConfigurationExample example = new ConfigurationExample();
65
66
67
68
69
70
71
72
73
74
75
76 example.doWork(userTableName, family, new HBaseOmidClientConfiguration());
77
78
79
80
81
82
83
84
85
86 HBaseOmidClientConfiguration omidClientConfiguration = new HBaseOmidClientConfiguration();
87 omidClientConfiguration.setConnectionType(DIRECT);
88 omidClientConfiguration.setConnectionString("localhost:54758");
89 omidClientConfiguration.setRetryDelayInMs(3000);
90
91 example.doWork(userTableName, family, omidClientConfiguration);
92 }
93
94 private void doWork(String userTableName, byte[] family, HBaseOmidClientConfiguration configuration)
95 throws IOException, RollbackException, InterruptedException {
96
97 byte[] exampleRow1 = Bytes.toBytes("EXAMPLE_ROW1");
98 byte[] exampleRow2 = Bytes.toBytes("EXAMPLE_ROW2");
99 byte[] qualifier = Bytes.toBytes("MY_Q");
100 byte[] dataValue1 = Bytes.toBytes("val1");
101 byte[] dataValue2 = Bytes.toBytes("val2");
102
103 LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
104 try (TransactionManager tm = HBaseTransactionManager.newInstance(configuration);
105 Connection conn = ConnectionFactory.createConnection();
106 TTable txTable = new TTable(conn, userTableName))
107 {
108 for (int i = 0; i < 100; i++) {
109 Transaction tx = tm.begin();
110 LOG.info("Transaction #{} {} STARTED", i, tx);
111
112 Put row1 = new Put(exampleRow1);
113 row1.addColumn(family, qualifier, dataValue1);
114 txTable.put(tx, row1);
115 LOG.info("Transaction {} trying to write a new value in [TABLE:ROW/CF/Q] => {}:{}/{}/{} = {} ",
116 tx, userTableName, Bytes.toString(exampleRow1), Bytes.toString(family),
117 Bytes.toString(qualifier), Bytes.toString(dataValue1));
118
119 Put row2 = new Put(exampleRow2);
120 row2.addColumn(family, qualifier, dataValue2);
121 txTable.put(tx, row2);
122 LOG.info("Transaction {} trying to write a new value in [TABLE:ROW/CF/Q] => {}:{}/{}/{} = {} ",
123 tx, userTableName, Bytes.toString(exampleRow2), Bytes.toString(family),
124 Bytes.toString(qualifier), Bytes.toString(dataValue2));
125
126 tm.commit(tx);
127 LOG.info("Transaction #{} {} COMMITTED", i, tx);
128 }
129 }
130
131 }
132
133 }