View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.hadoop.hbase.client.Delete;
21  import org.apache.hadoop.hbase.client.Put;
22  import org.apache.hadoop.hbase.client.Result;
23  import org.apache.hadoop.hbase.client.ResultScanner;
24  import org.apache.hadoop.hbase.client.Scan;
25  import org.apache.hadoop.hbase.filter.CompareFilter;
26  import org.apache.hadoop.hbase.filter.Filter;
27  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
28  import org.apache.hadoop.hbase.util.Bytes;
29  import org.slf4j.Logger;
30  import org.testng.ITestContext;
31  import org.testng.annotations.BeforeMethod;
32  import org.testng.annotations.Test;
33  
34  import java.io.IOException;
35  import java.util.Arrays;
36  
37  import static org.slf4j.LoggerFactory.getLogger;
38  import static org.testng.Assert.assertEquals;
39  import static org.testng.Assert.assertNull;
40  import static org.testng.Assert.fail;
41  
42  /**
43   * These tests try to analyze the transactional anomalies described by P. Bailis et al. in
44   * http://arxiv.org/pdf/1302.0309.pdf
45   *
46   * These tests try to model what project Hermitage is trying to do to compare the behavior of different DBMSs on these
47   * anomalies depending on the different isolation levels they offer. For more info on the Hermitage project, please
48   * refer to: https://github.com/ept/hermitage
49   *
50   * Transactional histories have been translated to HBase from the ones done for Postgresql in the Hermitage project:
51   * https://github.com/ept/hermitage/blob/master/postgres.md
52   *
53   * The "repeatable read" Postgresql isolation level is equivalent to "snapshot isolation", so we include the experiments
54   * for that isolation level
55   *
56   * With the HBase 0.98 interfaces it is not possible to execute updates/deletes based on predicates, so the
57   * examples here are not exactly the same as in Postgres
58   */
59  @Test(groups = "sharedHBase")
60  public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
61  
62      private static final Logger LOG = getLogger(TestBaillisAnomaliesWithTXs.class);
63      private static final String TEST_COLUMN = "baillis-col";
64  
65  
66      // Data used in the tests
67      private byte[] famName = Bytes.toBytes(TEST_FAMILY);
68      private byte[] colName = Bytes.toBytes(TEST_COLUMN);
69  
70      private byte[] rowId1 = Bytes.toBytes("row1");
71      private byte[] rowId2 = Bytes.toBytes("row2");
72      private byte[] rowId3 = Bytes.toBytes("row3");
73  
74      private byte[] dataValue1 = Bytes.toBytes(10);
75      private byte[] dataValue2 = Bytes.toBytes(20);
76      private byte[] dataValue3 = Bytes.toBytes(30);
77  
78  
79      @Test
80      public void testSIPreventsPredicateManyPrecedersForReadPredicates(ITestContext context) throws Exception {
81          // TX History for PMP for Read Predicate:
82          // begin; set transaction isolation level repeatable read; -- T1
83          // begin; set transaction isolation level repeatable read; -- T2
84          // select * from test where value = 30; -- T1. Returns nothing
85          // insert into test (id, value) values(3, 30); -- T2
86          // commit; -- T2
87          // select * from test where value % 3 = 0; -- T1. Still returns nothing
88          // commit; -- T1
89  
90          // 0) Start transactions
91          TransactionManager tm = newTransactionManager(context);
92          TTable txTable = new TTable(hbaseConf, TEST_TABLE);
93  
94          Transaction tx1 = tm.begin();
95          Transaction tx2 = tm.begin();
96  
97          // 1) select * from test where value = 30; -- T1. Returns nothing
98          Scan scan = new Scan();
99          Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(30));
100         scan.setFilter(f);
101         ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
102         assertNull(tx1Scanner.next());
103 
104         // 2) insert into test (id, value) values(3, 30); -- T2
105         Put newRow = new Put(rowId3);
106         newRow.add(famName, colName, dataValue3);
107         txTable.put(tx2, newRow);
108 
109         // 3) Commit TX 2
110         tm.commit(tx2);
111 
112         // 4) select * from test where value % 3 = 0; -- T1. Still returns nothing
113         tx1Scanner = txTable.getScanner(tx1, scan);
114         assertNull(tx1Scanner.next());
115 
116         // 5) Commit TX 1
117         tm.commit(tx1);
118     }
119 
120     @Test
121     public void testSIPreventsPredicateManyPrecedersForWritePredicates(ITestContext context) throws Exception {
122         // TX History for PMP for Write Predicate:
123         // begin; set transaction isolation level repeatable read; -- T1
124         // begin; set transaction isolation level repeatable read; -- T2
125         // update test set value = value + 10; -- T1
126         // delete from test where value = 20; -- T2, BLOCKS
127         // commit; -- T1. T2 now prints out "ERROR: could not serialize access due to concurrent update"
128         // abort; -- T2. There's nothing else we can do, this transaction has failed
129 
130         // 0) Start transactions
131         TransactionManager tm = newTransactionManager(context);
132         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
133         Transaction tx1 = tm.begin();
134         Transaction tx2 = tm.begin();
135 
136         // 1) update test set value = value + 10; -- T1
137         Scan updateScan = new Scan();
138         ResultScanner tx1Scanner = txTable.getScanner(tx2, updateScan);
139         Result updateRes = tx1Scanner.next();
140         int count = 0;
141         while (updateRes != null) {
142             LOG.info("RESSS {}", updateRes);
143             Put row = new Put(updateRes.getRow());
144             int val = Bytes.toInt(updateRes.getValue(famName, colName));
145             LOG.info("Updating row id {} with value {}", Bytes.toString(updateRes.getRow()), val);
146             row.add(famName, colName, Bytes.toBytes(val + 10));
147             txTable.put(tx1, row);
148             updateRes = tx1Scanner.next();
149             count++;
150         }
151         assertEquals(count, 2);
152 
153         // 2) delete from test where value = 20; -- T2, BLOCKS
154         Scan scan = new Scan();
155         Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
156         scan.setFilter(f);
157         ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
158         // assertEquals(tx2Scanner.next(100).length, 1);
159         Result res = tx2Scanner.next();
160         int count20 = 0;
161         while (res != null) {
162             LOG.info("RESSS {}", res);
163             LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()),
164                      Bytes.toInt(res.getValue(famName, colName)));
165             Delete delete20 = new Delete(res.getRow());
166             txTable.delete(tx2, delete20);
167             res = tx2Scanner.next();
168             count20++;
169         }
170         assertEquals(count20, 1);
171         // 3) commit TX 1
172         tm.commit(tx1);
173 
174         tx2Scanner = txTable.getScanner(tx2, scan);
175         assertNull(tx2Scanner.next());
176 
177         // 4) commit TX 2 -> Should be rolled-back
178         try {
179             tm.commit(tx2);
180             fail();
181         } catch (RollbackException e) {
182             // Expected
183         }
184 
185     }
186 
187     @Test
188     public void testSIPreventsLostUpdates(ITestContext context) throws Exception {
189         // TX History for P4:
190         // begin; set transaction isolation level repeatable read; -- T1
191         // begin; set transaction isolation level repeatable read; -- T2
192         // select * from test where id = 1; -- T1
193         // select * from test where id = 1; -- T2
194         // update test set value = 11 where id = 1; -- T1
195         // update test set value = 11 where id = 1; -- T2, BLOCKS
196         // commit; -- T1. T2 now prints out "ERROR: could not serialize access due to concurrent update"
197         // abort;  -- T2. There's nothing else we can do, this transaction has failed
198 
199         // 0) Start transactions
200         TransactionManager tm = newTransactionManager(context);
201         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
202         Transaction tx1 = tm.begin();
203         Transaction tx2 = tm.begin();
204 
205         Scan scan = new Scan(rowId1, rowId1);
206         scan.addColumn(famName, colName);
207 
208         // 1) select * from test where id = 1; -- T1
209         ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
210         Result res = tx1Scanner.next();
211         int count = 0;
212         while (res != null) {
213             LOG.info("RESSS {}", res);
214             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
215                      Bytes.toString(res.getValue(famName, colName)));
216             assertEquals(res.getRow(), rowId1);
217             assertEquals(res.getValue(famName, colName), dataValue1);
218             res = tx1Scanner.next();
219             count++;
220         }
221         assertEquals(count, 1);
222 
223         // 2) select * from test where id = 1; -- T2
224         ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
225         res = tx2Scanner.next();
226         count = 0;
227         while (res != null) {
228             LOG.info("RESSS {}", res);
229             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
230                      Bytes.toString(res.getValue(famName, colName)));
231             assertEquals(res.getRow(), rowId1);
232             assertEquals(res.getValue(famName, colName), dataValue1);
233             res = tx2Scanner.next();
234             count++;
235         }
236         assertEquals(count, 1);
237 
238         // 3) update test set value = 11 where id = 1; -- T1
239         Put updateRow1Tx1 = new Put(rowId1);
240         updateRow1Tx1.add(famName, colName, Bytes.toBytes("11"));
241         txTable.put(tx1, updateRow1Tx1);
242 
243         // 4) update test set value = 11 where id = 1; -- T2
244         Put updateRow1Tx2 = new Put(rowId1);
245         updateRow1Tx2.add(famName, colName, Bytes.toBytes("11"));
246         txTable.put(tx2, updateRow1Tx2);
247 
248         // 5) commit -- T1
249         tm.commit(tx1);
250 
251         // 6) commit -- T2 --> should be rolled-back
252         try {
253             tm.commit(tx2);
254             fail();
255         } catch (RollbackException e) {
256             // Expected
257         }
258 
259     }
260 
261     @Test
262     public void testSIPreventsReadSkew(ITestContext context) throws Exception {
263         // TX History for G-single:
264         // begin; set transaction isolation level repeatable read; -- T1
265         // begin; set transaction isolation level repeatable read; -- T2
266         // select * from test where id = 1; -- T1. Shows 1 => 10
267         // select * from test where id = 1; -- T2
268         // select * from test where id = 2; -- T2
269         // update test set value = 12 where id = 1; -- T2
270         // update test set value = 18 where id = 2; -- T2
271         // commit; -- T2
272         // select * from test where id = 2; -- T1. Shows 2 => 20
273         // commit; -- T1
274 
275         // 0) Start transactions
276         TransactionManager tm = newTransactionManager(context);
277         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
278         Transaction tx1 = tm.begin();
279         Transaction tx2 = tm.begin();
280 
281         Scan rowId1Scan = new Scan(rowId1, rowId1);
282         rowId1Scan.addColumn(famName, colName);
283 
284         // 1) select * from test where id = 1; -- T1. Shows 1 => 10
285         ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId1Scan);
286         Result res = tx1Scanner.next();
287         int count = 0;
288         while (res != null) {
289             LOG.info("RESSS {}", res);
290             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
291                      Bytes.toString(res.getValue(famName, colName)));
292             assertEquals(res.getRow(), rowId1);
293             assertEquals(res.getValue(famName, colName), dataValue1);
294             res = tx1Scanner.next();
295             count++;
296         }
297         assertEquals(count, 1);
298 
299         // 2) select * from test where id = 1; -- T2
300         ResultScanner tx2Scanner = txTable.getScanner(tx2, rowId1Scan);
301         res = tx2Scanner.next();
302         count = 0;
303         while (res != null) {
304             LOG.info("RESSS {}", res);
305             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
306                      Bytes.toString(res.getValue(famName, colName)));
307             assertEquals(res.getRow(), rowId1);
308             assertEquals(res.getValue(famName, colName), dataValue1);
309             res = tx2Scanner.next();
310             count++;
311         }
312 
313         Scan rowId2Scan = new Scan(rowId2, rowId2);
314         rowId2Scan.addColumn(famName, colName);
315 
316         // 3) select * from test where id = 2; -- T2
317         tx2Scanner = txTable.getScanner(tx2, rowId2Scan);
318         res = tx2Scanner.next();
319         count = 0;
320         while (res != null) {
321             LOG.info("RESSS {}", res);
322             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
323                      Bytes.toString(res.getValue(famName, colName)));
324             assertEquals(res.getRow(), rowId2);
325             assertEquals(res.getValue(famName, colName), dataValue2);
326             res = tx2Scanner.next();
327             count++;
328         }
329 
330         // 4) update test set value = 12 where id = 1; -- T2
331         Put updateRow1Tx2 = new Put(rowId1);
332         updateRow1Tx2.add(famName, colName, Bytes.toBytes("12"));
333         txTable.put(tx1, updateRow1Tx2);
334 
335         // 5) update test set value = 18 where id = 1; -- T2
336         Put updateRow2Tx2 = new Put(rowId2);
337         updateRow2Tx2.add(famName, colName, Bytes.toBytes("18"));
338         txTable.put(tx2, updateRow2Tx2);
339 
340         // 6) commit -- T2
341         tm.commit(tx2);
342 
343         // 7) select * from test where id = 2; -- T1. Shows 2 => 20
344         tx1Scanner = txTable.getScanner(tx1, rowId2Scan);
345         res = tx1Scanner.next();
346         count = 0;
347         while (res != null) {
348             LOG.info("RESSS {}", res);
349             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
350                      Bytes.toString(res.getValue(famName, colName)));
351             assertEquals(res.getRow(), rowId2);
352             assertEquals(res.getValue(famName, colName), dataValue2);
353             res = tx1Scanner.next();
354             count++;
355         }
356 
357         // 8) commit -- T1
358         tm.commit(tx1);
359 
360     }
361 
362     @Test
363     public void testSIPreventsReadSkewUsingWritePredicate(ITestContext context) throws Exception {
364         // TX History for G-single:
365         // begin; set transaction isolation level repeatable read; -- T1
366         // begin; set transaction isolation level repeatable read; -- T2
367         // select * from test where id = 1; -- T1. Shows 1 => 10
368         // select * from test; -- T2
369         // update test set value = 12 where id = 1; -- T2
370         // update test set value = 18 where id = 2; -- T2
371         // commit; -- T2
372         // delete from test where value = 20; -- T1. Prints "ERROR: could not serialize access due to concurrent update"
373         // abort; -- T1. There's nothing else we can do, this transaction has failed
374 
375         // 0) Start transactions
376         TransactionManager tm = newTransactionManager(context);
377         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
378         Transaction tx1 = tm.begin();
379         Transaction tx2 = tm.begin();
380 
381         // 1) select * from test; -- T1
382         assertNumberOfRows(txTable, tx1, 2, new Scan());
383 
384         // 2) select * from test; -- T2
385         assertNumberOfRows(txTable, tx2, 2, new Scan());
386 
387         // 3) update test set value = 12 where id = 1; -- T2
388         // 4) update test set value = 18 where id = 2; -- T2
389         Put updateRow1Tx2 = new Put(rowId1);
390         updateRow1Tx2.add(famName, colName, Bytes.toBytes(12));
391         Put updateRow2Tx2 = new Put(rowId2);
392         updateRow2Tx2.add(famName, colName, Bytes.toBytes(18));
393         txTable.put(tx2, Arrays.asList(updateRow1Tx2, updateRow2Tx2));
394 
395         // 5) commit; -- T2
396         tm.commit(tx2);
397 
398         // 6) delete from test where value = 20; -- T1. Prints
399         // "ERROR: could not serialize access due to concurrent update"
400         Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
401         Scan checkFor20 = new Scan();
402         checkFor20.setFilter(f);
403         ResultScanner checkFor20Scanner = txTable.getScanner(tx1, checkFor20);
404         Result res = checkFor20Scanner.next();
405         while (res != null) {
406             LOG.info("RESSS {}", res);
407             LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
408             Delete delete20 = new Delete(res.getRow());
409             txTable.delete(tx1, delete20);
410             res = checkFor20Scanner.next();
411         }
412 
413         // 7) abort; -- T1
414         try {
415             tm.commit(tx1);
416             fail("Should be aborted");
417         } catch (RollbackException e) {
418             // Expected
419         }
420 
421     }
422 
423     // this test shows that Omid does not provide serilizable level of isolation other wise last commit would have failed
424     @Test
425     public void testSIDoesNotPreventWriteSkew(ITestContext context) throws Exception {
426         // TX History for G2-item:
427         // begin; set transaction isolation level repeatable read; -- T1
428         // begin; set transaction isolation level repeatable read; -- T2
429         // select * from test where id in (1,2); -- T1
430         // select * from test where id in (1,2); -- T2
431         // update test set value = 11 where id = 1; -- T1
432         // update test set value = 21 where id = 2; -- T2
433         // commit; -- T1
434         // commit; -- T2
435 
436         // 0) Start transactions
437         TransactionManager tm = newTransactionManager(context);
438         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
439         Transaction tx1 = tm.begin();
440         Transaction tx2 = tm.begin();
441 
442         Scan rowId12Scan = new Scan(rowId1, rowId3);
443         rowId12Scan.addColumn(famName, colName);
444 
445         // 1) select * from test where id in (1,2); -- T1
446         ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId12Scan);
447         Result res = tx1Scanner.next();
448         int count = 0;
449         while (res != null) {
450             LOG.info("RESSS {}", res);
451             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
452             switch (count) {
453                 case 0:
454                     assertEquals(res.getRow(), rowId1);
455                     assertEquals(res.getValue(famName, colName), dataValue1);
456                     break;
457                 case 1:
458                     assertEquals(res.getRow(), rowId2);
459                     assertEquals(res.getValue(famName, colName), dataValue2);
460                     break;
461                 default:
462                     fail();
463             }
464             res = tx1Scanner.next();
465             count++;
466         }
467         assertEquals(count, 2);
468 
469         // 2) select * from test where id in (1,2); -- T2
470         ResultScanner tx2Scanner = txTable.getScanner(tx1, rowId12Scan);
471         res = tx2Scanner.next();
472         count = 0;
473         while (res != null) {
474             LOG.info("RESSS {}", res);
475             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
476             switch (count) {
477                 case 0:
478                     assertEquals(res.getRow(), rowId1);
479                     assertEquals(res.getValue(famName, colName), dataValue1);
480                     break;
481                 case 1:
482                     assertEquals(res.getRow(), rowId2);
483                     assertEquals(res.getValue(famName, colName), dataValue2);
484                     break;
485                 default:
486                     fail();
487             }
488             res = tx2Scanner.next();
489             count++;
490         }
491         assertEquals(count, 2);
492 
493         // 3) update test set value = 11 where id = 1; -- T1
494         Put updateRow1Tx1 = new Put(rowId1);
495         updateRow1Tx1.add(famName, colName, Bytes.toBytes("11"));
496         txTable.put(tx1, updateRow1Tx1);
497 
498         // 4) update test set value = 21 where id = 2; -- T2
499         Put updateRow2Tx2 = new Put(rowId2);
500         updateRow2Tx2.add(famName, colName, Bytes.toBytes("21"));
501         txTable.put(tx2, updateRow2Tx2);
502 
503         // 5) commit; -- T1
504         tm.commit(tx1);
505 
506         // 6) commit; -- T2
507         tm.commit(tx2);
508     }
509 
510     // this test shows that Omid does not provide serilizable level of isolation other wise last commit would have failed
511     @Test
512     public void testSIDoesNotPreventAntiDependencyCycles(ITestContext context) throws Exception {
513         // TX History for G2:
514         // begin; set transaction isolation level repeatable read; -- T1
515         // begin; set transaction isolation level repeatable read; -- T2
516         // select * from test where value % 3 = 0; -- T1
517         // select * from test where value % 3 = 0; -- T2
518         // insert into test (id, value) values(3, 30); -- T1
519         // insert into test (id, value) values(4, 42); -- T2
520         // commit; -- T1
521         // commit; -- T2
522         // select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 4 => 42
523 
524         // 0) Start transactions
525         TransactionManager tm = newTransactionManager(context);
526         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
527         Transaction tx1 = tm.begin();
528         Transaction tx2 = tm.begin();
529 
530         Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes("30"));
531         Scan value30 = new Scan();
532         value30.setFilter(f);
533         value30.addColumn(famName, colName);
534 
535         // 1) select * from test where value % 3 = 0; -- T1
536         assertNumberOfRows(txTable, tx1, 0, value30);
537 
538 
539         // 2) select * from test where value % 3 = 0; -- T2
540         assertNumberOfRows(txTable, tx2, 0, value30);
541 
542 
543         // 3) insert into test (id, value) values(3, 30); -- T1
544         Put insertRow3Tx1 = new Put(rowId1);
545         insertRow3Tx1.add(famName, colName, Bytes.toBytes("30"));
546         txTable.put(tx1, insertRow3Tx1);
547 
548         // 4) insert into test (id, value) values(4, 42); -- T2
549         Put updateRow4Tx2 = new Put(rowId2);
550         updateRow4Tx2.add(famName, colName, Bytes.toBytes("42"));
551         txTable.put(tx2, updateRow4Tx2);
552 
553         // 5) commit; -- T1
554         tm.commit(tx1);
555 
556         // 6) commit; -- T2
557         tm.commit(tx2);
558 
559         // 7) select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 4 => 42
560     }
561 
562     /**
563      * This translates the table initialization done in:
564      * https://github.com/ept/hermitage/blob/master/postgres.md
565      *
566      * create table test (id int primary key, value int);
567      * insert into test (id, value) values (1, 10), (2, 20);
568      */
569     @BeforeMethod(alwaysRun = true)
570     private void loadBaseDataOnTestTable(ITestContext context) throws Exception {
571 
572         TransactionManager tm = newTransactionManager(context);
573         TTable txTable = new TTable(hbaseConf, TEST_TABLE);
574 
575         Transaction initializationTx = tm.begin();
576         Put row1 = new Put(rowId1);
577         row1.add(famName, colName, dataValue1);
578         txTable.put(initializationTx, row1);
579         Put row2 = new Put(rowId2);
580         row2.add(famName, colName, dataValue2);
581         txTable.put(initializationTx, row2);
582 
583         tm.commit(initializationTx);
584     }
585 
586 
587     private void assertNumberOfRows(TTable txTable, Transaction tx2, int maxCount, Scan scan) throws IOException {
588         int count = 0;
589         ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
590         Result res = tx2Scanner.next();
591         while (res != null) {
592             LOG.info("RESSS {}", res);
593             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
594             res = tx2Scanner.next();
595             count++;
596         }
597         assertEquals(count, maxCount);
598     }
599 
600 
601 }