View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.util.concurrent.ListenableFuture;
21  import com.google.common.util.concurrent.SettableFuture;
22  import org.apache.hadoop.hbase.client.Get;
23  import org.apache.hadoop.hbase.client.Put;
24  import org.apache.hadoop.hbase.client.Result;
25  import org.apache.hadoop.hbase.client.ResultScanner;
26  import org.apache.hadoop.hbase.client.Scan;
27  import org.apache.hadoop.hbase.filter.BinaryComparator;
28  import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
29  import org.apache.hadoop.hbase.filter.CompareFilter;
30  import org.apache.hadoop.hbase.filter.Filter;
31  import org.apache.hadoop.hbase.filter.ValueFilter;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.omid.committable.CommitTable;
34  import org.apache.omid.metrics.NullMetricsProvider;
35  import org.mockito.invocation.InvocationOnMock;
36  import org.mockito.stubbing.Answer;
37  import org.testng.ITestContext;
38  import org.testng.annotations.Test;
39  
40  import static org.mockito.Matchers.any;
41  import static org.mockito.Mockito.doAnswer;
42  import static org.mockito.Mockito.spy;
43  import static org.testng.Assert.assertEquals;
44  import static org.testng.Assert.assertNull;
45  
46  /**
47   * Tests to verify that Get and Scan filters still work with transactions tables
48   */
49  @Test(groups = "sharedHBase")
50  public class TestFilters extends OmidTestBase {
51  
52      byte[] family = Bytes.toBytes(TEST_FAMILY);
53      private byte[] row1 = Bytes.toBytes("row1");
54      private byte[] row2 = Bytes.toBytes("row2");
55      private byte[] row3 = Bytes.toBytes("row3");
56      private byte[] prefix = Bytes.toBytes("foo");
57      private byte[] col1 = Bytes.toBytes("foobar");
58      private byte[] col2 = Bytes.toBytes("boofar");
59  
60      @Test(timeOut = 60_000)
61      public void testGetWithColumnPrefixFilter(ITestContext context) throws Exception {
62          testGet(context, new ColumnPrefixFilter(prefix));
63      }
64  
65      @Test(timeOut = 60_000)
66      public void testGetWithValueFilter(ITestContext context) throws Exception {
67          testGet(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
68      }
69  
70      private void testGet(ITestContext context, Filter f) throws Exception {
71  
72          CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
73  
74          HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
75          hbaseOmidClientConf.setConnectionString("localhost:1234");
76          hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
77  
78          TTable table = new TTable(hbaseConf, TEST_TABLE);
79          PostCommitActions syncPostCommitter = spy(
80                  new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
81          AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
82                  .commitTableClient(commitTableClient)
83                  .postCommitter(syncPostCommitter)
84                  .build();
85  
86          writeRows(table, tm, syncPostCommitter);
87  
88          Transaction t = tm.begin();
89          Get g = new Get(row1);
90          g.setFilter(f);
91  
92          Result r = table.get(t, g);
93          assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
94          assertEquals(r.getColumnCells(family, col2).size(), 0 , "shouldn't exist in result");
95  
96          g = new Get(row2);
97          g.setFilter(f);
98          r = table.get(t, g);
99          assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
100         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
101 
102         g = new Get(row3);
103         g.setFilter(f);
104         r = table.get(t, g);
105         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
106 
107     }
108 
109     @Test(timeOut = 60_000)
110     public void testScanWithColumnPrefixFilter(ITestContext context) throws Exception {
111         testScan(context, new ColumnPrefixFilter(prefix));
112     }
113 
114     @Test(timeOut = 60_000)
115     public void testScanWithValueFilter(ITestContext context) throws Exception {
116         testScan(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
117     }
118 
119     private void testScan(ITestContext context, Filter f) throws Exception {
120 
121         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
122 
123         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
124         hbaseOmidClientConf.getOmidClientConfiguration().setConnectionString("localhost:1234");
125         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
126         TTable table = new TTable(hbaseConf, TEST_TABLE);
127         PostCommitActions syncPostCommitter = spy(
128                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
129         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
130                 .commitTableClient(commitTableClient)
131                 .postCommitter(syncPostCommitter)
132                 .build();
133 
134         writeRows(table, tm, syncPostCommitter);
135 
136         Transaction t = tm.begin();
137         Scan s = new Scan().setFilter(f);
138 
139         ResultScanner rs = table.getScanner(t, s);
140 
141         Result r = rs.next();
142         assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
143         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
144 
145         r = rs.next();
146         assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
147         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
148 
149         r = rs.next();
150         assertNull(r, "Last row shouldn't exist");
151 
152     }
153 
154     private void writeRows(TTable table, TransactionManager tm, PostCommitActions postCommitter)
155             throws Exception {
156         // create normal row with both cells
157         Transaction t = tm.begin();
158         Put p = new Put(row1);
159         p.add(family, col1, col1);
160         p.add(family, col2, col2);
161         table.put(t, p);
162         tm.commit(t);
163 
164         // create normal row, but fail to update shadow cells
165         doAnswer(new Answer<ListenableFuture<Void>>() {
166             public ListenableFuture<Void> answer(InvocationOnMock invocation) {
167                 // Do not invoke the real method
168                 return SettableFuture.create();
169             }
170         }).when(postCommitter).updateShadowCells(any(HBaseTransaction.class));
171 
172         t = tm.begin();
173         p = new Put(row2);
174         p.add(family, col1, col1);
175         p.add(family, col2, col2);
176         table.put(t, p);
177         try {
178             tm.commit(t);
179         } catch (TransactionException e) {
180             // Expected, see comment above
181         }
182 
183         // create normal row with only one cell
184         t = tm.begin();
185         p = new Put(row3);
186         p.add(family, col2, col2);
187         table.put(t, p);
188         try {
189             tm.commit(t);
190         } catch (TransactionException e) {
191             // Expected, see comment above
192         }
193     }
194 
195 }