1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.testng.Assert.assertEquals;
21 import static org.testng.Assert.assertTrue;
22
23 import org.apache.hadoop.hbase.client.Get;
24 import org.apache.hadoop.hbase.client.Put;
25 import org.apache.hadoop.hbase.client.Result;
26 import org.apache.hadoop.hbase.client.ResultScanner;
27 import org.apache.hadoop.hbase.client.Scan;
28 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
29 import org.apache.hadoop.hbase.filter.CompareFilter;
30 import org.apache.hadoop.hbase.filter.FilterList;
31 import org.apache.hadoop.hbase.filter.RowFilter;
32 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.testng.Assert;
37 import org.testng.ITestContext;
38 import org.testng.annotations.Test;
39
40 @Test(groups = "sharedHBase")
41 public class TestUpdateScan extends OmidTestBase {
42 private static final Logger LOG = LoggerFactory.getLogger(TestUpdateScan.class);
43
44 private static final String TEST_COL = "value";
45 private static final String TEST_COL_2 = "col_2";
46
47 @Test(timeOut = 10_000)
48 public void testGet(ITestContext context) throws Exception {
49 try {
50 TransactionManager tm = newTransactionManager(context);
51 TTable table = new TTable(connection, TEST_TABLE);
52 Transaction t = tm.begin();
53 int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
54 for (int i = 0; i < lInts.length; i++) {
55 byte[] data = Bytes.toBytes(lInts[i]);
56 Put put = new Put(data);
57 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
58 table.put(t, put);
59 }
60 int startKeyValue = lInts[3];
61 int stopKeyValue = lInts[3];
62 byte[] startKey = Bytes.toBytes(startKeyValue);
63 byte[] stopKey = Bytes.toBytes(stopKeyValue);
64 Get g = new Get(startKey);
65 Result r = table.get(t, g);
66 if (!r.isEmpty()) {
67 int tmp = Bytes.toInt(r.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
68 LOG.info("Result:" + tmp);
69 assertTrue(tmp == startKeyValue, "Bad value, should be " + startKeyValue + " but is " + tmp);
70 } else {
71 Assert.fail("Bad result");
72 }
73 tm.commit(t);
74
75 Scan s = new Scan(startKey);
76 CompareFilter.CompareOp op = CompareFilter.CompareOp.LESS_OR_EQUAL;
77 RowFilter toFilter = new RowFilter(op, new BinaryPrefixComparator(stopKey));
78 boolean startInclusive = true;
79 if (!startInclusive) {
80 FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
81 filters.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryPrefixComparator(startKey)));
82 filters.addFilter(new WhileMatchFilter(toFilter));
83 s.setFilter(filters);
84 } else {
85 s.setFilter(new WhileMatchFilter(toFilter));
86 }
87 t = tm.begin();
88 ResultScanner res = table.getScanner(t, s);
89 Result rr;
90 int count = 0;
91 while ((rr = res.next()) != null) {
92 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
93 LOG.info("Result: " + iTmp);
94 count++;
95 }
96 assertEquals(count, 1, "Count is wrong");
97 LOG.info("Rows found " + count);
98 tm.commit(t);
99 table.close();
100 } catch (Exception e) {
101 LOG.error("Exception in test", e);
102 }
103 }
104
105 @Test(timeOut = 10_000)
106 public void testScan(ITestContext context) throws Exception {
107
108 try (TTable table = new TTable(connection, TEST_TABLE)) {
109 TransactionManager tm = newTransactionManager(context);
110 Transaction t = tm.begin();
111 int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
112 for (int lInt : lInts) {
113 byte[] data = Bytes.toBytes(lInt);
114 Put put = new Put(data);
115 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
116 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2), data);
117 table.put(t, put);
118 }
119
120 Scan s = new Scan();
121
122
123 s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL));
124 s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2));
125 ResultScanner res = table.getScanner(t, s);
126 Result rr;
127 int count = 0;
128 while ((rr = res.next()) != null) {
129 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
130 LOG.info("Result: " + iTmp);
131 count++;
132 }
133 assertTrue(count == lInts.length, "Count should be " + lInts.length + " but is " + count);
134 LOG.info("Rows found " + count);
135
136 tm.commit(t);
137
138 t = tm.begin();
139 res = table.getScanner(t, s);
140 count = 0;
141 while ((rr = res.next()) != null) {
142 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
143 LOG.info("Result: " + iTmp);
144 count++;
145 }
146 assertTrue(count == lInts.length, "Count should be " + lInts.length + " but is " + count);
147 LOG.info("Rows found " + count);
148 tm.commit(t);
149 }
150
151 }
152
153 @Test(timeOut = 10_000)
154 public void testScanUncommitted(ITestContext context) throws Exception {
155 try {
156 TransactionManager tm = newTransactionManager(context);
157 TTable table = new TTable(connection, TEST_TABLE);
158 Transaction t = tm.begin();
159 int[] lIntsA = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
160 for (int aLIntsA : lIntsA) {
161 byte[] data = Bytes.toBytes(aLIntsA);
162 Put put = new Put(data);
163 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
164 table.put(t, put);
165 }
166 tm.commit(t);
167
168 Transaction tu = tm.begin();
169 int[] lIntsB = new int[]{105, 24, 4342, 32, 7, 3, 30, 40};
170 for (int aLIntsB : lIntsB) {
171 byte[] data = Bytes.toBytes(aLIntsB);
172 Put put = new Put(data);
173 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
174 table.put(tu, put);
175 }
176
177 t = tm.begin();
178 int[] lIntsC = new int[]{109, 224, 242, 2, 16, 59, 23, 26};
179 for (int aLIntsC : lIntsC) {
180 byte[] data = Bytes.toBytes(aLIntsC);
181 Put put = new Put(data);
182 put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
183 table.put(t, put);
184 }
185 tm.commit(t);
186
187 t = tm.begin();
188 Scan s = new Scan();
189 ResultScanner res = table.getScanner(t, s);
190 Result rr;
191 int count = 0;
192
193 while ((rr = res.next()) != null) {
194 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
195 LOG.info("Result: " + iTmp);
196 count++;
197 }
198 assertTrue(count == lIntsA.length + lIntsC.length,
199 "Count should be " + (lIntsA.length * lIntsC.length) + " but is " + count);
200 LOG.info("Rows found " + count);
201 tm.commit(t);
202 table.close();
203 } catch (Exception e) {
204 LOG.error("Exception in test", e);
205 }
206 }
207
208 }