View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.hadoop.hbase.client.Get;
21  import org.apache.hadoop.hbase.client.Put;
22  import org.apache.hadoop.hbase.client.Result;
23  import org.apache.hadoop.hbase.client.ResultScanner;
24  import org.apache.hadoop.hbase.client.Scan;
25  import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
26  import org.apache.hadoop.hbase.filter.CompareFilter;
27  import org.apache.hadoop.hbase.filter.FilterList;
28  import org.apache.hadoop.hbase.filter.RowFilter;
29  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  import org.testng.Assert;
34  import org.testng.ITestContext;
35  import org.testng.annotations.Test;
36  
37  import static org.testng.Assert.assertEquals;
38  import static org.testng.Assert.assertTrue;
39  
40  @Test(groups = "sharedHBase")
41  public class TestUpdateScan extends OmidTestBase {
42      private static final Logger LOG = LoggerFactory.getLogger(TestUpdateScan.class);
43  
44      private static final String TEST_COL = "value";
45      private static final String TEST_COL_2 = "col_2";
46  
47      @Test
48      public void testGet(ITestContext context) throws Exception {
49          try {
50              TransactionManager tm = newTransactionManager(context);
51              TTable table = new TTable(hbaseConf, TEST_TABLE);
52              Transaction t = tm.begin();
53              int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
54              for (int i = 0; i < lInts.length; i++) {
55                  byte[] data = Bytes.toBytes(lInts[i]);
56                  Put put = new Put(data);
57                  put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
58                  table.put(t, put);
59              }
60              int startKeyValue = lInts[3];
61              int stopKeyValue = lInts[3];
62              byte[] startKey = Bytes.toBytes(startKeyValue);
63              byte[] stopKey = Bytes.toBytes(stopKeyValue);
64              Get g = new Get(startKey);
65              Result r = table.get(t, g);
66              if (!r.isEmpty()) {
67                  int tmp = Bytes.toInt(r.getValue(Bytes.toBytes(TEST_FAMILY),
68                          Bytes.toBytes(TEST_COL)));
69                  LOG.info("Result:" + tmp);
70                  assertTrue(tmp == startKeyValue, "Bad value, should be " + startKeyValue + " but is " + tmp);
71              } else {
72                  Assert.fail("Bad result");
73              }
74              tm.commit(t);
75  
76              Scan s = new Scan(startKey);
77              CompareFilter.CompareOp op = CompareFilter.CompareOp.LESS_OR_EQUAL;
78              RowFilter toFilter = new RowFilter(op, new BinaryPrefixComparator(stopKey));
79              boolean startInclusive = true;
80              if (!startInclusive) {
81                  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
82                  filters.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER,
83                          new BinaryPrefixComparator(startKey)));
84                  filters.addFilter(new WhileMatchFilter(toFilter));
85                  s.setFilter(filters);
86              } else {
87                  s.setFilter(new WhileMatchFilter(toFilter));
88              }
89              t = tm.begin();
90              ResultScanner res = table.getScanner(t, s);
91              Result rr;
92              int count = 0;
93              while ((rr = res.next()) != null) {
94                  int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY),
95                          Bytes.toBytes(TEST_COL)));
96                  LOG.info("Result: " + iTmp);
97                  count++;
98              }
99              assertEquals(count, 1, "Count is wrong");
100             LOG.info("Rows found " + count);
101             tm.commit(t);
102             table.close();
103         } catch (Exception e) {
104             LOG.error("Exception in test", e);
105         }
106     }
107 
108     @Test
109     public void testScan(ITestContext context) throws Exception {
110 
111         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
112             TransactionManager tm = newTransactionManager(context);
113             Transaction t = tm.begin();
114             int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
115             for (int lInt : lInts) {
116                 byte[] data = Bytes.toBytes(lInt);
117                 Put put = new Put(data);
118                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
119                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2), data);
120                 table.put(t, put);
121             }
122 
123             Scan s = new Scan();
124             // Adding two columns to the scanner should not throw a
125             // ConcurrentModificationException when getting the scanner
126             s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL));
127             s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2));
128             ResultScanner res = table.getScanner(t, s);
129             Result rr;
130             int count = 0;
131             while ((rr = res.next()) != null) {
132                 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY),
133                         Bytes.toBytes(TEST_COL)));
134                 LOG.info("Result: " + iTmp);
135                 count++;
136             }
137             assertTrue(count == lInts.length, "Count should be " + lInts.length + " but is " + count);
138             LOG.info("Rows found " + count);
139 
140             tm.commit(t);
141 
142             t = tm.begin();
143             res = table.getScanner(t, s);
144             count = 0;
145             while ((rr = res.next()) != null) {
146                 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY),
147                         Bytes.toBytes(TEST_COL)));
148                 LOG.info("Result: " + iTmp);
149                 count++;
150             }
151             assertTrue(count == lInts.length, "Count should be " + lInts.length + " but is " + count);
152             LOG.info("Rows found " + count);
153             tm.commit(t);
154         }
155 
156     }
157 
158 
159     @Test
160     public void testScanUncommitted(ITestContext context) throws Exception {
161         try {
162             TransactionManager tm = newTransactionManager(context);
163             TTable table = new TTable(hbaseConf, TEST_TABLE);
164             Transaction t = tm.begin();
165             int[] lIntsA = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
166             for (int aLIntsA : lIntsA) {
167                 byte[] data = Bytes.toBytes(aLIntsA);
168                 Put put = new Put(data);
169                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
170                 table.put(t, put);
171             }
172             tm.commit(t);
173 
174             Transaction tu = tm.begin();
175             int[] lIntsB = new int[]{105, 24, 4342, 32, 7, 3, 30, 40};
176             for (int aLIntsB : lIntsB) {
177                 byte[] data = Bytes.toBytes(aLIntsB);
178                 Put put = new Put(data);
179                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
180                 table.put(tu, put);
181             }
182 
183             t = tm.begin();
184             int[] lIntsC = new int[]{109, 224, 242, 2, 16, 59, 23, 26};
185             for (int aLIntsC : lIntsC) {
186                 byte[] data = Bytes.toBytes(aLIntsC);
187                 Put put = new Put(data);
188                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
189                 table.put(t, put);
190             }
191             tm.commit(t);
192 
193             t = tm.begin();
194             Scan s = new Scan();
195             ResultScanner res = table.getScanner(t, s);
196             Result rr;
197             int count = 0;
198 
199             while ((rr = res.next()) != null) {
200                 int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY),
201                         Bytes.toBytes(TEST_COL)));
202                 LOG.info("Result: " + iTmp);
203                 count++;
204             }
205             assertTrue(count == lIntsA.length + lIntsC.length,
206                        "Count should be " + (lIntsA.length * lIntsC.length) + " but is " + count);
207             LOG.info("Rows found " + count);
208             tm.commit(t);
209             table.close();
210         } catch (Exception e) {
211             LOG.error("Exception in test", e);
212         }
213     }
214 }