View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.hadoop.hbase.HColumnDescriptor;
21  import org.apache.hadoop.hbase.HTableDescriptor;
22  import org.apache.hadoop.hbase.TableName;
23  import org.apache.hadoop.hbase.client.Delete;
24  import org.apache.hadoop.hbase.client.Get;
25  import org.apache.hadoop.hbase.client.HBaseAdmin;
26  import org.apache.hadoop.hbase.client.Put;
27  import org.apache.hadoop.hbase.client.Result;
28  import org.apache.hadoop.hbase.client.ResultScanner;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  import org.testng.ITestContext;
34  import org.testng.annotations.Test;
35  
36  import static org.testng.Assert.assertEquals;
37  import static org.testng.Assert.assertTrue;
38  import static org.testng.Assert.fail;
39  
40  @Test(groups = "sharedHBase")
41  public class TestTransactionConflict extends OmidTestBase {
42  
43      private static final Logger LOG = LoggerFactory.getLogger(TestTransactionConflict.class);
44  
45      @Test
46      public void runTestWriteWriteConflict(ITestContext context) throws Exception {
47          TransactionManager tm = newTransactionManager(context);
48          TTable tt = new TTable(hbaseConf, TEST_TABLE);
49  
50          Transaction t1 = tm.begin();
51          LOG.info("Transaction created " + t1);
52  
53          Transaction t2 = tm.begin();
54          LOG.info("Transaction created" + t2);
55  
56          byte[] row = Bytes.toBytes("test-simple");
57          byte[] fam = Bytes.toBytes(TEST_FAMILY);
58          byte[] col = Bytes.toBytes("testdata");
59          byte[] data1 = Bytes.toBytes("testWrite-1");
60          byte[] data2 = Bytes.toBytes("testWrite-2");
61  
62          Put p = new Put(row);
63          p.add(fam, col, data1);
64          tt.put(t1, p);
65  
66          Put p2 = new Put(row);
67          p2.add(fam, col, data2);
68          tt.put(t2, p2);
69  
70          tm.commit(t2);
71  
72          try {
73              tm.commit(t1);
74              fail("Transaction should not commit successfully");
75          } catch (RollbackException e) {
76          }
77      }
78  
79      @Test
80      public void runTestMultiTableConflict(ITestContext context) throws Exception {
81          TransactionManager tm = newTransactionManager(context);
82          TTable tt = new TTable(hbaseConf, TEST_TABLE);
83          String table2 = TEST_TABLE + 2;
84          TableName table2Name = TableName.valueOf(table2);
85  
86          HBaseAdmin admin = new HBaseAdmin(hbaseConf);
87  
88          if (!admin.tableExists(table2)) {
89              HTableDescriptor desc = new HTableDescriptor(table2Name);
90              HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
91              datafam.setMaxVersions(Integer.MAX_VALUE);
92              desc.addFamily(datafam);
93  
94              admin.createTable(desc);
95          }
96  
97          if (admin.isTableDisabled(table2)) {
98              admin.enableTable(table2);
99          }
100         admin.close();
101 
102         TTable tt2 = new TTable(hbaseConf, table2);
103 
104         Transaction t1 = tm.begin();
105         LOG.info("Transaction created " + t1);
106 
107         Transaction t2 = tm.begin();
108         LOG.info("Transaction created" + t2);
109 
110         byte[] row = Bytes.toBytes("test-simple");
111         byte[] row2 = Bytes.toBytes("test-simple2");
112         byte[] fam = Bytes.toBytes(TEST_FAMILY);
113         byte[] col = Bytes.toBytes("testdata");
114         byte[] data1 = Bytes.toBytes("testWrite-1");
115         byte[] data2 = Bytes.toBytes("testWrite-2");
116 
117         Put p = new Put(row);
118         p.add(fam, col, data1);
119         tt.put(t1, p);
120         tt2.put(t1, p);
121 
122         Put p2 = new Put(row);
123         p2.add(fam, col, data2);
124         tt.put(t2, p2);
125         p2 = new Put(row2);
126         p2.add(fam, col, data2);
127         tt2.put(t2, p2);
128 
129         tm.commit(t2);
130 
131         boolean aborted = false;
132         try {
133             tm.commit(t1);
134             fail("Transaction commited successfully");
135         } catch (RollbackException e) {
136             aborted = true;
137         }
138         assertTrue(aborted, "Transaction didn't raise exception");
139 
140         ResultScanner rs = tt2.getHTable().getScanner(fam, col);
141 
142         int count = 0;
143         Result r;
144         while ((r = rs.next()) != null) {
145             count += r.size();
146         }
147         assertEquals(count, 1, "Should have cell");
148     }
149 
150     @Test
151     public void runTestCleanupAfterConflict(ITestContext context) throws Exception {
152         TransactionManager tm = newTransactionManager(context);
153         TTable tt = new TTable(hbaseConf, TEST_TABLE);
154 
155         Transaction t1 = tm.begin();
156         LOG.info("Transaction created " + t1);
157 
158         Transaction t2 = tm.begin();
159         LOG.info("Transaction created" + t2);
160 
161         byte[] row = Bytes.toBytes("test-simple");
162         byte[] fam = Bytes.toBytes(TEST_FAMILY);
163         byte[] col = Bytes.toBytes("testdata");
164         byte[] data1 = Bytes.toBytes("testWrite-1");
165         byte[] data2 = Bytes.toBytes("testWrite-2");
166 
167         Put p = new Put(row);
168         p.add(fam, col, data1);
169         tt.put(t1, p);
170 
171         Get g = new Get(row).setMaxVersions();
172         g.addColumn(fam, col);
173         Result r = tt.getHTable().get(g);
174         assertEquals(r.size(), 1, "Unexpected size for read.");
175         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
176                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
177 
178         Put p2 = new Put(row);
179         p2.add(fam, col, data2);
180         tt.put(t2, p2);
181 
182         r = tt.getHTable().get(g);
183         assertEquals(r.size(), 2, "Unexpected size for read.");
184         r = tt.get(t2, g);
185         assertEquals(r.size(),1, "Unexpected size for read.");
186         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
187                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
188 
189         tm.commit(t1);
190 
191         boolean aborted = false;
192         try {
193             tm.commit(t2);
194             fail("Transaction commited successfully");
195         } catch (RollbackException e) {
196             aborted = true;
197         }
198         assertTrue(aborted, "Transaction didn't raise exception");
199 
200         r = tt.getHTable().get(g);
201         assertEquals(r.size(), 1, "Unexpected size for read.");
202         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
203                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
204     }
205 
206     @Test
207     public void testCleanupWithDeleteRow(ITestContext context) throws Exception {
208 
209         TransactionManager tm = newTransactionManager(context);
210         TTable tt = new TTable(hbaseConf, TEST_TABLE);
211 
212         Transaction t1 = tm.begin();
213         LOG.info("Transaction created " + t1);
214 
215         int rowcount = 10;
216         int count = 0;
217 
218         byte[] fam = Bytes.toBytes(TEST_FAMILY);
219         byte[] col = Bytes.toBytes("testdata");
220         byte[] data1 = Bytes.toBytes("testWrite-1");
221         byte[] data2 = Bytes.toBytes("testWrite-2");
222 
223         byte[] modrow = Bytes.toBytes("test-del" + 3);
224         for (int i = 0; i < rowcount; i++) {
225             byte[] row = Bytes.toBytes("test-del" + i);
226 
227             Put p = new Put(row);
228             p.add(fam, col, data1);
229             tt.put(t1, p);
230         }
231         tm.commit(t1);
232 
233         Transaction t2 = tm.begin();
234         LOG.info("Transaction created " + t2);
235         Delete d = new Delete(modrow);
236         tt.delete(t2, d);
237 
238         ResultScanner rs = tt.getScanner(t2, new Scan());
239         Result r = rs.next();
240         count = 0;
241         while (r != null) {
242             count++;
243             LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
244             r = rs.next();
245         }
246         assertEquals(count, rowcount - 1, "Wrong count");
247 
248         Transaction t3 = tm.begin();
249         LOG.info("Transaction created " + t3);
250         Put p = new Put(modrow);
251         p.add(fam, col, data2);
252         tt.put(t3, p);
253 
254         tm.commit(t3);
255 
256         boolean aborted = false;
257         try {
258             tm.commit(t2);
259             fail("Didn't abort");
260         } catch (RollbackException e) {
261             aborted = true;
262         }
263         assertTrue(aborted, "Didn't raise exception");
264 
265         Transaction tscan = tm.begin();
266         rs = tt.getScanner(tscan, new Scan());
267         r = rs.next();
268         count = 0;
269         while (r != null) {
270             count++;
271             r = rs.next();
272         }
273         assertEquals(count, rowcount, "Wrong count");
274 
275     }
276 
277     @Test
278     public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
279         TransactionManager tm = newTransactionManager(context);
280         TTable tt = new TTable(hbaseConf, TEST_TABLE);
281 
282         Transaction t1 = tm.begin();
283         Transaction t2 = tm.begin();
284         LOG.info("Transactions created " + t1 + " " + t2);
285 
286         byte[] row = Bytes.toBytes("row");
287         byte[] fam = Bytes.toBytes(TEST_FAMILY);
288         byte[] col1 = Bytes.toBytes("testdata1");
289         byte[] col2 = Bytes.toBytes("testdata2");
290         byte[] data = Bytes.toBytes("testWrite-1");
291 
292         Put p2 = new Put(row);
293         p2.add(fam, col1, data);
294         tt.put(t2, p2);
295         tm.commit(t2);
296 
297         Put p1 = new Put(row);
298         p1.add(fam, col2, data);
299         tt.put(t1, p1);
300         tm.commit(t1);
301     }
302 
303 }