1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.testng.Assert.assertTrue;
21
22 import org.apache.hadoop.hbase.client.Put;
23 import org.apache.hadoop.hbase.client.Result;
24 import org.apache.hadoop.hbase.client.ResultScanner;
25 import org.apache.hadoop.hbase.client.Scan;
26 import org.apache.hadoop.hbase.util.Bytes;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.testng.ITestContext;
30 import org.testng.annotations.Test;
31
32 @Test(groups = "sharedHBase")
33 public class TestSingleColumnFamily extends OmidTestBase {
34
35 private static final Logger LOG = LoggerFactory.getLogger(TestSingleColumnFamily.class);
36
37 @Test(timeOut = 10_000)
38 public void testSingleColumnFamily(ITestContext context) throws Exception {
39 TransactionManager tm = newTransactionManager(context);
40 TTable table1 = new TTable(connection, TEST_TABLE);
41 int num = 10;
42 Transaction t = tm.begin();
43 for (int j = 0; j < num; j++) {
44 byte[] data = Bytes.toBytes(j);
45 Put put = new Put(data);
46 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value1"), data);
47 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2"), data);
48 table1.put(t, put);
49 }
50
51
52 Scan s = new Scan();
53 ResultScanner res = table1.getScanner(t, s);
54 Result rr;
55 int count = 0;
56 while ((rr = res.next()) != null) {
57 int tmp1 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value1")));
58 int tmp2 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2")));
59 LOG.info("RES:" + tmp1 + ";" + tmp2);
60 count++;
61 }
62 assertTrue(num == count, "Can't see puts. I should see " + num + " but I see " + count);
63
64 tm.commit(t);
65 t = tm.begin();
66
67 for (int j = 0; j < num / 2; j++) {
68 byte[] data = Bytes.toBytes(j);
69 byte[] ndata = Bytes.toBytes(j * 10);
70 Put put = new Put(data);
71 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2"), ndata);
72 table1.put(t, put);
73 }
74 tm.commit(t);
75 t = tm.begin();
76 s = new Scan();
77 res = table1.getScanner(t, s);
78 count = 0;
79 int modified = 0, notmodified = 0;
80 while ((rr = res.next()) != null) {
81 int tmp1 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value1")));
82 int tmp2 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2")));
83 LOG.info("RES:" + tmp1 + ";" + tmp2);
84 count++;
85
86 if (tmp2 == Bytes.toInt(rr.getRow()) * 10) {
87 modified++;
88 } else {
89 notmodified++;
90 }
91 if (count == 8) {
92 LOG.debug("stop");
93 }
94 }
95 assertTrue(num == count, "Can't see puts. I should see " + num + " but I see " + count);
96 assertTrue(modified == notmodified && notmodified == (num / 2),
97 "Half of rows should equal row id, half not (" + modified + ", " + notmodified + ")");
98
99 tm.commit(t);
100 LOG.info("End commiting");
101 table1.close();
102 }
103 }