1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Mockito.doAnswer;
22 import static org.mockito.Mockito.spy;
23 import static org.testng.Assert.assertEquals;
24 import static org.testng.Assert.assertNull;
25
26 import org.apache.hadoop.hbase.client.Get;
27 import org.apache.hadoop.hbase.client.Put;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.ResultScanner;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.apache.hadoop.hbase.filter.BinaryComparator;
32 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
33 import org.apache.hadoop.hbase.filter.CompareFilter;
34 import org.apache.hadoop.hbase.filter.Filter;
35 import org.apache.hadoop.hbase.filter.ValueFilter;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.omid.committable.CommitTable;
38 import org.apache.omid.metrics.NullMetricsProvider;
39 import org.mockito.invocation.InvocationOnMock;
40 import org.mockito.stubbing.Answer;
41 import org.testng.ITestContext;
42 import org.testng.annotations.Test;
43
44 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
45 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
46
47
48
49
50 @Test(groups = "sharedHBase")
51 public class TestFilters extends OmidTestBase {
52
53 byte[] family = Bytes.toBytes(TEST_FAMILY);
54 private byte[] row1 = Bytes.toBytes("row1");
55 private byte[] row2 = Bytes.toBytes("row2");
56 private byte[] row3 = Bytes.toBytes("row3");
57 private byte[] prefix = Bytes.toBytes("foo");
58 private byte[] col1 = Bytes.toBytes("foobar");
59 private byte[] col2 = Bytes.toBytes("boofar");
60
61 @Test(timeOut = 60_000)
62 public void testGetWithColumnPrefixFilter(ITestContext context) throws Exception {
63 testGet(context, new ColumnPrefixFilter(prefix));
64 }
65
66 @Test(timeOut = 60_000)
67 public void testGetWithValueFilter(ITestContext context) throws Exception {
68 testGet(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
69 }
70
71 private void testGet(ITestContext context, Filter f) throws Exception {
72
73 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
74
75 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
76 hbaseOmidClientConf.setConnectionString("localhost:1234");
77 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
78
79 TTable table = new TTable(connection, TEST_TABLE);
80 PostCommitActions syncPostCommitter = spy(
81 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
82 AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
83 .commitTableClient(commitTableClient)
84 .commitTableWriter(getCommitTable(context).getWriter())
85 .postCommitter(syncPostCommitter)
86 .build();
87
88 writeRows(table, tm, syncPostCommitter);
89
90 Transaction t = tm.begin();
91 Get g = new Get(row1);
92 g.setFilter(f);
93
94 Result r = table.get(t, g);
95 assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
96 assertEquals(r.getColumnCells(family, col2).size(), 0 , "shouldn't exist in result");
97
98 g = new Get(row2);
99 g.setFilter(f);
100 r = table.get(t, g);
101 assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
102 assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
103
104 g = new Get(row3);
105 g.setFilter(f);
106 r = table.get(t, g);
107 assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
108
109 }
110
111 @Test(timeOut = 60_000)
112 public void testScanWithColumnPrefixFilter(ITestContext context) throws Exception {
113 testScan(context, new ColumnPrefixFilter(prefix));
114 }
115
116 @Test(timeOut = 60_000)
117 public void testScanWithValueFilter(ITestContext context) throws Exception {
118 testScan(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
119 }
120
121 private void testScan(ITestContext context, Filter f) throws Exception {
122
123 CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
124
125 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
126 hbaseOmidClientConf.getOmidClientConfiguration().setConnectionString("localhost:1234");
127 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
128 TTable table = new TTable(connection, TEST_TABLE);
129 PostCommitActions syncPostCommitter = spy(
130 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
131 AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
132 .commitTableClient(commitTableClient)
133 .commitTableWriter(getCommitTable(context).getWriter())
134 .postCommitter(syncPostCommitter)
135 .build();
136
137 writeRows(table, tm, syncPostCommitter);
138
139 Transaction t = tm.begin();
140 Scan s = new Scan().setFilter(f);
141
142 ResultScanner rs = table.getScanner(t, s);
143
144 Result r = rs.next();
145 assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
146 assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
147
148 r = rs.next();
149 assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
150 assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
151
152 r = rs.next();
153 assertNull(r, "Last row shouldn't exist");
154
155 }
156
157 private void writeRows(TTable table, TransactionManager tm, PostCommitActions postCommitter)
158 throws Exception {
159
160 Transaction t = tm.begin();
161 Put p = new Put(row1);
162 p.addColumn(family, col1, col1);
163 p.addColumn(family, col2, col2);
164 table.put(t, p);
165 tm.commit(t);
166
167
168 doAnswer(new Answer<ListenableFuture<Void>>() {
169 public ListenableFuture<Void> answer(InvocationOnMock invocation) {
170
171 return SettableFuture.create();
172 }
173 }).when(postCommitter).updateShadowCells(any(HBaseTransaction.class));
174
175 t = tm.begin();
176 p = new Put(row2);
177 p.addColumn(family, col1, col1);
178 p.addColumn(family, col2, col2);
179 table.put(t, p);
180 try {
181 tm.commit(t);
182 } catch (TransactionException e) {
183
184 }
185
186
187 t = tm.begin();
188 p = new Put(row3);
189 p.addColumn(family, col2, col2);
190 table.put(t, p);
191 try {
192 tm.commit(t);
193 } catch (TransactionException e) {
194
195 }
196 }
197
198 }