View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.hadoop.hbase.client.Get;
21  import org.apache.hadoop.hbase.client.Put;
22  import org.apache.hadoop.hbase.client.Result;
23  import org.apache.hadoop.hbase.client.ResultScanner;
24  import org.apache.hadoop.hbase.client.Scan;
25  import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
26  import org.apache.hadoop.hbase.filter.CompareFilter;
27  import org.apache.hadoop.hbase.filter.FilterList;
28  import org.apache.hadoop.hbase.filter.RowFilter;
29  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  import org.testng.Assert;
34  import org.testng.ITestContext;
35  import org.testng.annotations.Test;
36  
37  import static org.testng.Assert.assertEquals;
38  import static org.testng.Assert.assertTrue;
39  
40  @Test(groups = "sharedHBase")
41  public class TestUpdateScan extends OmidTestBase {
42      private static final Logger LOG = LoggerFactory.getLogger(TestUpdateScan.class);
43  
44      private static final String TEST_COL = "value";
45      private static final String TEST_COL_2 = "col_2";
46  
47      @Test(timeOut = 10_000)
48      public void testGet(ITestContext context) throws Exception {
49          try {
50              TransactionManager tm = newTransactionManager(context);
51              TTable table = new TTable(hbaseConf, TEST_TABLE);
52              Transaction t = tm.begin();
53              int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
54              for (int i = 0; i < lInts.length; i++) {
55                  byte[] data = Bytes.toBytes(lInts[i]);
56                  Put put = new Put(data);
57                  put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
58                  table.put(t, put);
59              }
60              int startKeyValue = lInts[3];
61              int stopKeyValue = lInts[3];
62              byte[] startKey = Bytes.toBytes(startKeyValue);
63              byte[] stopKey = Bytes.toBytes(stopKeyValue);
64              Get g = new Get(startKey);
65              Result r = table.get(t, g);
66              if (!r.isEmpty()) {
67                  int tmp = Bytes.toInt(r.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
68                  LOG.info("Result:" + tmp);
69                  assertTrue(tmp == startKeyValue, "Bad value, should be " + startKeyValue + " but is " + tmp);
70              } else {
71                  Assert.fail("Bad result");
72              }
73              tm.commit(t);
74  
75              Scan s = new Scan(startKey);
76              CompareFilter.CompareOp op = CompareFilter.CompareOp.LESS_OR_EQUAL;
77              RowFilter toFilter = new RowFilter(op, new BinaryPrefixComparator(stopKey));
78              boolean startInclusive = true;
79              if (!startInclusive) {
80                  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
81                  filters.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryPrefixComparator(startKey)));
82                  filters.addFilter(new WhileMatchFilter(toFilter));
83                  s.setFilter(filters);
84              } else {
85                  s.setFilter(new WhileMatchFilter(toFilter));
86              }
87              t = tm.begin();
88              ResultScanner res = table.getScanner(t, s);
89              Result rr;
90              int count = 0;
91              while ((rr = res.next()) != null) {
92                  int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
93                  LOG.info("Result: " + iTmp);
94                  count++;
95              }
96              assertEquals(count, 1, "Count is wrong");
97              LOG.info("Rows found " + count);
98              tm.commit(t);
99              table.close();
100         } catch (Exception e) {
101             LOG.error("Exception in test", e);
102         }
103     }
104 
105     @Test(timeOut = 10_000)
106     public void testScan(ITestContext context) throws Exception {
107 
108         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
109             TransactionManager tm = newTransactionManager(context);
110             Transaction t = tm.begin();
111             int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
112             for (int lInt : lInts) {
113                 byte[] data = Bytes.toBytes(lInt);
114                 Put put = new Put(data);
115                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
116                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2), data);
117                 table.put(t, put);
118             }
119 
120             Scan s = new Scan();
121             // Adding two columns to the scanner should not throw a
122             // ConcurrentModificationException when getting the scanner
123             s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL));
124             s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2));
125             ResultScanner res = table.getScanner(t, s);
126             Result rr;
127             int count = 0;
128             while ((rr = res.next()) != null) {
129                 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
130                 LOG.info("Result: " + iTmp);
131                 count++;
132             }
133             assertTrue(count == lInts.length, "Count should be " + lInts.length + " but is " + count);
134             LOG.info("Rows found " + count);
135 
136             tm.commit(t);
137 
138             t = tm.begin();
139             res = table.getScanner(t, s);
140             count = 0;
141             while ((rr = res.next()) != null) {
142                 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
143                 LOG.info("Result: " + iTmp);
144                 count++;
145             }
146             assertTrue(count == lInts.length, "Count should be " + lInts.length + " but is " + count);
147             LOG.info("Rows found " + count);
148             tm.commit(t);
149         }
150 
151     }
152 
153     @Test(timeOut = 10_000)
154     public void testScanUncommitted(ITestContext context) throws Exception {
155         try {
156             TransactionManager tm = newTransactionManager(context);
157             TTable table = new TTable(hbaseConf, TEST_TABLE);
158             Transaction t = tm.begin();
159             int[] lIntsA = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
160             for (int aLIntsA : lIntsA) {
161                 byte[] data = Bytes.toBytes(aLIntsA);
162                 Put put = new Put(data);
163                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
164                 table.put(t, put);
165             }
166             tm.commit(t);
167 
168             Transaction tu = tm.begin();
169             int[] lIntsB = new int[]{105, 24, 4342, 32, 7, 3, 30, 40};
170             for (int aLIntsB : lIntsB) {
171                 byte[] data = Bytes.toBytes(aLIntsB);
172                 Put put = new Put(data);
173                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
174                 table.put(tu, put);
175             }
176 
177             t = tm.begin();
178             int[] lIntsC = new int[]{109, 224, 242, 2, 16, 59, 23, 26};
179             for (int aLIntsC : lIntsC) {
180                 byte[] data = Bytes.toBytes(aLIntsC);
181                 Put put = new Put(data);
182                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
183                 table.put(t, put);
184             }
185             tm.commit(t);
186 
187             t = tm.begin();
188             Scan s = new Scan();
189             ResultScanner res = table.getScanner(t, s);
190             Result rr;
191             int count = 0;
192 
193             while ((rr = res.next()) != null) {
194                 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
195                 LOG.info("Result: " + iTmp);
196                 count++;
197             }
198             assertTrue(count == lIntsA.length + lIntsC.length,
199                        "Count should be " + (lIntsA.length * lIntsC.length) + " but is " + count);
200             LOG.info("Rows found " + count);
201             tm.commit(t);
202             table.close();
203         } catch (Exception e) {
204             LOG.error("Exception in test", e);
205         }
206     }
207 
208 }