View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.NavigableMap;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.Connection;
39  import org.apache.hadoop.hbase.client.Delete;
40  import org.apache.hadoop.hbase.client.Get;
41  import org.apache.hadoop.hbase.client.Mutation;
42  import org.apache.hadoop.hbase.client.OperationWithAttributes;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.client.ResultScanner;
46  import org.apache.hadoop.hbase.client.Row;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.client.Table;
49  import org.apache.hadoop.hbase.io.TimeRange;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.omid.committable.CommitTable;
52  import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  
56  
57  /**
58   * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link
59   * Transaction} object. It mimics the behavior in {@link org.apache.hadoop.hbase.client.Table}
60   */
61  public class TTable implements Closeable {
62  
63      private static Logger LOG = LoggerFactory.getLogger(TTable.class);
64  
65      private Table table;
66  
67      private SnapshotFilter snapshotFilter;
68  
69      private boolean serverSideFilter;
70      
71      private final List<Mutation> mutations;
72      
73      private boolean autoFlush = true;
74      
75      private final boolean conflictFree;
76      
77      // ----------------------------------------------------------------------------------------------------------------
78      // Construction
79      // ----------------------------------------------------------------------------------------------------------------
80  
81      public TTable(Connection connection, byte[] tableName) throws IOException {
82          this(connection.getTable(TableName.valueOf(tableName)), false);
83      }
84  
85      public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
86          this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
87      }
88  
89      public TTable(Connection connection, String tableName) throws IOException {
90          this(connection.getTable(TableName.valueOf(tableName)), false);
91      }
92  
93      public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
94          this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
95      }
96  
97      public TTable(Table hTable) throws IOException {
98          this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), false);
99      }
100 
101     public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
102         this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
103     }
104 
105     public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
106         this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
107     }
108 
109     public TTable(Connection connection, String tableName, boolean conflictFree) throws IOException {
110         this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
111     }
112 
113     public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
114         this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
115     }
116 
117     public TTable(Table hTable, boolean conflictFree) throws IOException {
118         this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), conflictFree);
119     }
120 
121     public TTable(Table hTable, SnapshotFilter snapshotFilter) throws IOException {
122         this(hTable, snapshotFilter, false);
123     }
124 
125     public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
126         this(hTable, commitTableClient, false);
127     }
128 
129     public TTable(Table hTable, boolean serverSideFilter, boolean conflictFree) throws IOException {
130         this.table = hTable;
131         this.conflictFree = conflictFree;
132         this.mutations = new ArrayList<Mutation>();
133         this.serverSideFilter = serverSideFilter;
134         this.snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
135                 new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable));
136     }
137 
138     public TTable(Table hTable, SnapshotFilter snapshotFilter, boolean conflictFree) throws IOException {
139         this.table = hTable;
140         this.conflictFree = conflictFree;
141         this.mutations = new ArrayList<Mutation>();
142         this.snapshotFilter = snapshotFilter;
143     }
144 
145     public TTable(Table hTable, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
146         this.table = hTable;
147         this.conflictFree = conflictFree;
148         this.mutations = new ArrayList<Mutation>();
149         this.serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
150         this.snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
151                 new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable), commitTableClient);
152     }
153 
154     // ----------------------------------------------------------------------------------------------------------------
155     // Closeable implementation
156     // ----------------------------------------------------------------------------------------------------------------
157 
158     /**
159      * Releases any resources held or pending changes in internal buffers.
160      *
161      * @throws IOException if a remote or network exception occurs.
162      */
163     @Override
164     public void close() throws IOException {
165         table.close();
166     }
167 
168     // ----------------------------------------------------------------------------------------------------------------
169     // Transactional operations
170     // ----------------------------------------------------------------------------------------------------------------
171 
172     /**
173      * Transactional version of {@link Table#get(Get get)}
174      *
175      * @param get an instance of Get
176      * @param tx  an instance of transaction to be used
177      * @return Result an instance of Result
178      * @throws IOException if a remote or network exception occurs.
179      */
180     public Result get(Transaction tx, final Get get) throws IOException {
181 
182         throwExceptionIfOpSetsTimerange(get);
183 
184         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
185 
186         final long readTimestamp = transaction.getReadTimestamp();
187         final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
188         propagateAttributes(get, tsget);
189         TimeRange timeRange = get.getTimeRange();
190         long startTime = timeRange.getMin();
191         long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
192         tsget.setTimeRange(startTime, endTime).setMaxVersions(1);
193         Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
194         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
195             byte[] family = entry.getKey();
196             NavigableSet<byte[]> qualifiers = entry.getValue();
197             if (qualifiers == null || qualifiers.isEmpty()) {
198                 tsget.addFamily(family);
199             } else {
200                 for (byte[] qualifier : qualifiers) {
201                     tsget.addColumn(family, qualifier);
202                     tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
203                 }
204                 tsget.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER);
205                 tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
206             }
207         }
208         LOG.trace("Initial Get = {}", tsget);
209 
210         return snapshotFilter.get(tsget, transaction);
211     }
212 
213     static private void propagateAttributes(OperationWithAttributes from, OperationWithAttributes to) {
214         Map<String,byte[]> attributeMap = from.getAttributesMap();
215 
216         for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
217             to.setAttribute(entry.getKey(), entry.getValue());
218         }
219     }
220 
221     private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException {
222         Result result = this.get(tx, deleteG);
223         if (!result.isEmpty()) {
224             for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
225                     .entrySet()) {
226                 byte[] family = entryF.getKey();
227                 for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
228                     byte[] qualifier = entryQ.getKey();
229                     addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, qualifier,
230                             tx.getWriteTimestamp()));
231                 }
232                 deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
233                         HConstants.EMPTY_BYTE_ARRAY);
234                 addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
235                                                 tx.getWriteTimestamp()));
236             }
237         }
238     }
239 
240     private void  familyQualifierBasedDeletionWithOutRead(HBaseTransaction tx, Put deleteP, Get deleteG) {
241         Set<byte[]> fset = deleteG.getFamilyMap().keySet();
242 
243         for (byte[] family : fset) {
244             deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
245                     HConstants.EMPTY_BYTE_ARRAY);
246             addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
247                     tx.getWriteTimestamp()));
248 
249         }
250     }
251 
252     /**
253      * Transactional version of {@link Table#delete(Delete delete)}
254      *
255      * @param delete an instance of Delete
256      * @param tx     an instance of transaction to be used
257      * @throws IOException if a remote or network exception occurs.
258      */
259     public void delete(Transaction tx, Delete delete) throws IOException {
260         Put deleteP = deleteInternal(tx, delete);
261         if (!deleteP.isEmpty()) {
262             addMutation(deleteP);
263         }
264     }
265     
266     private Put deleteInternal(Transaction tx, Delete delete) throws IOException {
267 
268         throwExceptionIfOpSetsTimerange(delete);
269 
270         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
271 
272         final long writeTimestamp = transaction.getWriteTimestamp();
273         boolean deleteFamily = false;
274 
275         final Put deleteP = new Put(delete.getRow(), writeTimestamp);
276         final Get deleteG = new Get(delete.getRow());
277         propagateAttributes(delete, deleteP);
278         propagateAttributes(delete, deleteG);
279         Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
280         if (fmap.isEmpty()) {
281             familyQualifierBasedDeletion(transaction, deleteP, deleteG);
282         }
283 
284         for (List<Cell> cells : fmap.values()) {
285             for (Cell cell : cells) {
286                 CellUtils.validateCell(cell, writeTimestamp);
287                 switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
288                     case DeleteColumn:
289                         deleteP.addColumn(CellUtil.cloneFamily(cell),
290                                     CellUtil.cloneQualifier(cell),
291                                     writeTimestamp,
292                                     CellUtils.DELETE_TOMBSTONE);
293                         addWriteSetElement(transaction,
294                             new HBaseCellId(this,
295                                             delete.getRow(),
296                                             CellUtil.cloneFamily(cell),
297                                             CellUtil.cloneQualifier(cell),
298                                             writeTimestamp));
299                         break;
300                     case DeleteFamily:
301                         deleteG.addFamily(CellUtil.cloneFamily(cell));
302                         deleteFamily = true;
303                         break;
304                     case Delete:
305                         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
306                             deleteP.addColumn(CellUtil.cloneFamily(cell),
307                                         CellUtil.cloneQualifier(cell),
308                                         writeTimestamp,
309                                         CellUtils.DELETE_TOMBSTONE);
310                             addWriteSetElement(transaction,
311                                 new HBaseCellId(this,
312                                                 delete.getRow(),
313                                                 CellUtil.cloneFamily(cell),
314                                                 CellUtil.cloneQualifier(cell),
315                                                 writeTimestamp));
316                             break;
317                         } else {
318                             throw new UnsupportedOperationException(
319                                 "Cannot delete specific versions on Snapshot Isolation.");
320                         }
321                     default:
322                         break;
323                 }
324             }
325         }
326         if (deleteFamily) {
327             if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).
328                     getConflictDetectionLevel() == ConflictDetectionLevel.ROW) {
329                 familyQualifierBasedDeletionWithOutRead(transaction, deleteP, deleteG);
330             } else {
331                 familyQualifierBasedDeletion(transaction, deleteP, deleteG);
332             }
333         }
334 
335         return deleteP;
336     }
337 
338     /**
339      * Transactional version of {@link Table#put(Put put)}
340      *
341      * @param put an instance of Put
342      * @param tx  an instance of transaction to be used
343      * @throws IOException if a remote or network exception occurs.
344      */
345     public void put(Transaction tx, Put put) throws IOException {
346         put(tx, put, false);
347     }
348 
349 
350     /**
351      * @param put an instance of Put
352      * @param timestamp  timestamp to be used as cells version
353      * @param commitTimestamp  timestamp to be used as commit timestamp
354      * @throws IOException if a remote or network exception occurs.
355      */
356     static public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
357         final Put tsput = new Put(put.getRow(), timestamp);
358         propagateAttributes(put, tsput);
359 
360         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
361         for (List<Cell> kvl : kvs.values()) {
362             for (Cell c : kvl) {
363                 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
364                 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), timestamp);
365                 try {
366                     tsput.add(kv);
367                 } catch (IOException e) {
368                     // The existing Put has this Cell, so the cloned one
369                     // will never throw an IOException when it's added.
370                     throw new RuntimeException(e);
371                 }
372                 tsput.addColumn(CellUtil.cloneFamily(kv),
373                         CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
374                         kv.getTimestamp(),
375                         Bytes.toBytes(commitTimestamp));
376             }
377         }
378 
379         return tsput;
380     }
381 
382 
383     /**
384      * @param put an instance of Put
385      * @param tx  an instance of transaction to be used
386      * @param addShadowCell  denotes whether to add the shadow cell
387      * @throws IOException if a remote or network exception occurs.
388      */
389     public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
390         Put tsput = putInternal(tx, put, addShadowCell);
391         addMutation(tsput);
392     }
393     
394     private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
395 
396         throwExceptionIfOpSetsTimerange(put);
397 
398         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
399 
400         final long writeTimestamp = transaction.getWriteTimestamp();
401 
402         // create put with correct ts
403         final Put tsput = new Put(put.getRow(), writeTimestamp);
404         propagateAttributes(put, tsput);
405         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
406         for (List<Cell> kvl : kvs.values()) {
407             for (Cell c : kvl) {
408                 CellUtils.validateCell(c, writeTimestamp);
409                 // Reach into keyvalue to update timestamp.
410                 // It's not nice to reach into keyvalue internals,
411                 // but we want to avoid having to copy the whole thing
412                 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
413                 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
414                 tsput.add(kv);
415 
416                 if (addShadowCell) {
417                     tsput.addColumn(CellUtil.cloneFamily(kv),
418                             CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
419                             kv.getTimestamp(),
420                             Bytes.toBytes(kv.getTimestamp()));
421                 } else {
422                     HBaseCellId cellId = new HBaseCellId(this,
423                             CellUtil.cloneRow(kv),
424                             CellUtil.cloneFamily(kv),
425                             CellUtil.cloneQualifier(kv),
426                             kv.getTimestamp());
427 
428                     addWriteSetElement(transaction, cellId);
429                 }
430             }
431         }
432         return tsput;
433     }
434     
435     private void addWriteSetElement(HBaseTransaction transaction, HBaseCellId cellId) {
436         if (conflictFree) {
437             transaction.addConflictFreeWriteSetElement(cellId);
438         } else {
439             transaction.addWriteSetElement(cellId);
440         }
441         
442     }
443 
444     private void addMutation(Mutation m) throws IOException {
445         this.mutations.add(m);
446         if (autoFlush) {
447             flushCommits();
448         }
449     }
450     
451     private void addMutations(List<Mutation> mutations) throws IOException {
452         this.mutations.addAll(mutations);
453         if (autoFlush) {
454             flushCommits();
455         }
456     }
457     
458     /**
459      * Transactional version of {@link Table#getScanner(Scan scan)}
460      *
461      * @param scan an instance of Scan
462      * @param tx   an instance of transaction to be used
463      * @return ResultScanner an instance of ResultScanner
464      * @throws IOException if a remote or network exception occurs.
465      */
466     public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException {
467 
468         throwExceptionIfOpSetsTimerange(scan);
469 
470         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
471 
472         Scan tsscan = new Scan(scan);
473         tsscan.setMaxVersions(1);
474         tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1);
475         propagateAttributes(scan, tsscan);
476         Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
477         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
478             byte[] family = entry.getKey();
479             NavigableSet<byte[]> qualifiers = entry.getValue();
480             if (qualifiers == null) {
481                 continue;
482             }
483             for (byte[] qualifier : qualifiers) {
484                 tsscan.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
485             }
486             if (!qualifiers.isEmpty()) {
487                 tsscan.addColumn(entry.getKey(), CellUtils.FAMILY_DELETE_QUALIFIER);
488             }
489         }
490 
491         return snapshotFilter.getScanner(tsscan, transaction);
492     }
493 
494     /**
495      *
496      * @return array of byte
497      */
498     public byte[] getTableName() {
499         return table.getName().getName();
500     }
501 
502     /**
503      * Delegates to {@link Table#getConfiguration()}
504      *
505      * @return standard configuration object
506      */
507     public Configuration getConfiguration() {
508         return table.getConfiguration();
509     }
510 
511     /**
512      * Delegates to {@link Table#getTableDescriptor()}
513      *
514      * @return HTableDescriptor an instance of HTableDescriptor
515      * @throws IOException if a remote or network exception occurs.
516      */
517     public HTableDescriptor getTableDescriptor() throws IOException {
518         return table.getTableDescriptor();
519     }
520 
521     /**
522      * Transactional version of {@link Table#exists(Get get)}
523      *
524      * @param transaction an instance of transaction to be used
525      * @param get         an instance of Get
526      * @return true if cell exists
527      * @throws IOException if a remote or network exception occurs.
528      */
529     public boolean exists(Transaction transaction, Get get) throws IOException {
530         Result result = get(transaction, get);
531         return !result.isEmpty();
532     }
533 
    /* TODO What should we do with these methods?
535      * @Override public void batch(Transaction transaction, List<? extends Row>
536      * actions, Object[] results) throws IOException, InterruptedException {}
537      *
538      * @Override public Object[] batch(Transaction transaction, List<? extends
539      * Row> actions) throws IOException, InterruptedException {}
540      *
541      * @Override public <R> void batchCallback(Transaction transaction, List<?
542      * extends Row> actions, Object[] results, Callback<R> callback) throws
543      * IOException, InterruptedException {}
544      *
545      * @Override public <R> Object[] batchCallback(List<? extends Row> actions,
546      * Callback<R> callback) throws IOException, InterruptedException {}
547      */
548 
549     /**
550      * Transactional version of {@link Table#get(List gets)}
551      *
552      * @param transaction an instance of transaction to be used
553      * @param gets        list of Get instances
554      * @return array of Results
555      * @throws IOException if a remote or network exception occurs
556      */
557     public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
558         Result[] results = new Result[gets.size()];
559         int i = 0;
560         for (Get get : gets) {
561             results[i++] = get(transaction, get);
562         }
563         return results;
564     }
565 
566     /**
567      * Transactional version of {@link Table#getScanner(byte[] family)}
568      *
569      * @param transaction an instance of transaction to be used
570      * @param family      column family
571      * @return an instance of ResultScanner
572      * @throws IOException if a remote or network exception occurs
573      */
574     public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
575         Scan scan = new Scan();
576         scan.addFamily(family);
577         return getScanner(transaction, scan);
578     }
579 
580     /**
581      * Transactional version of {@link Table#getScanner(byte[] family, byte[] qualifier)}
582      *
583      * @param transaction an instance of transaction to be used
584      * @param family      column family
585      * @param qualifier   column name
586      * @return an instance of ResultScanner
587      * @throws IOException if a remote or network exception occurs
588      */
589     public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier)
590         throws IOException {
591         Scan scan = new Scan();
592         scan.addColumn(family, qualifier);
593         return getScanner(transaction, scan);
594     }
595 
596     /**
597      * Transactional version of {@link Table#put(List puts)}
598      *
599      * @param transaction an instance of transaction to be used
600      * @param puts        List of puts
601      * @param addShadowCell  denotes whether to add the shadow cell
602      * @throws IOException if a remote or network exception occurs
603      */
604     public void put(Transaction transaction, List<Put> puts, boolean addShadowCells) throws IOException {
605         List<Mutation> mutations = new ArrayList<>(puts.size());
606         for (Put put : puts) {
607             mutations.add(putInternal(transaction, put, addShadowCells));
608         }
609         addMutations(mutations);
610     }
611 
612     public void put(Transaction transaction, List<Put> puts) throws IOException {
613         put(transaction, puts, false);
614     }
615 
616     /**
617      * Transactional version of {@link Table#batch(List<? extends Row> rows)}
618      *
619      * @param transaction an instance of transaction to be used
620      * @param rows        List of rows that must be instances of Put or Delete
621      * @param addShadowCell  denotes whether to add the shadow cell
622      * @throws IOException if a remote or network exception occurs
623      */
624     public void batch(Transaction transaction, List<? extends Row> rows, boolean addShadowCells) throws IOException {
625         List<Mutation> mutations = new ArrayList<>(rows.size());
626         for (Row row : rows) {
627             if (row instanceof Put) {
628                 mutations.add(putInternal(transaction, (Put)row, addShadowCells));
629             } else if (row instanceof Delete) {
630                 Put deleteP = deleteInternal(transaction, (Delete)row);
631                 if (!deleteP.isEmpty()) {
632                     mutations.add(deleteP);
633                 }
634             } else {
635                 throw new UnsupportedOperationException("Unsupported mutation: " + row);
636             }
637         }
638         addMutations(mutations);
639     }
640 
641     public void batch(Transaction transaction, List<? extends Row> rows) throws IOException {
642         batch(transaction, rows, false);
643     }
644 
645     /**
646      * Transactional version of {@link Table#delete(List deletes)}
647      *
648      * @param transaction an instance of transaction to be used
649      * @param deletes        List of deletes
650      * @throws IOException if a remote or network exception occurs
651      */
652     public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
653         List<Mutation> mutations = new ArrayList<>(deletes.size());
654         for (Delete delete : deletes) {
655             Put deleteP = deleteInternal(transaction, delete);
656             if (!deleteP.isEmpty()) {
657                 mutations.add(deleteP);
658             }
659         }
660         addMutations(mutations);
661     }
662 
663     /**
664      * Provides access to the underliying Table in order to configure it or to perform unsafe (non-transactional)
665      * operations. The latter would break the transactional guarantees of the whole system.
666      *
667      * @return The underlying Table object
668      */
669     public Table getHTable() {
670         return table;
671     }
672 
673     public void setAutoFlush(boolean autoFlush) {
674         this.autoFlush = autoFlush;
675     }
676 
677     public boolean isAutoFlush() {
678         return autoFlush;
679     }
680 
681     public void flushCommits() throws IOException {
682         try {
683             table.batch(this.mutations, new Object[mutations.size()]);
684         } catch (InterruptedException e) {
685             Thread.interrupted();
686             throw new RuntimeException(e);
687         } finally {
688             this.mutations.clear();
689         }
690     }
691 
692     // ----------------------------------------------------------------------------------------------------------------
693     // Helper methods
694     // ----------------------------------------------------------------------------------------------------------------
695 
696     private void throwExceptionIfOpSetsTimerange(Get getOperation) {
697         TimeRange tr = getOperation.getTimeRange();
698         checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
699     }
700 
701     private void throwExceptionIfOpSetsTimerange(Scan scanOperation) {
702         TimeRange tr = scanOperation.getTimeRange();
703         checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
704     }
705 
706     private void checkTimerangeIsSetToDefaultValuesOrThrowException(TimeRange tr) {
707         if (tr.getMin() != 0L || tr.getMax() != Long.MAX_VALUE) {
708             throw new IllegalArgumentException(
709                 "Timestamp/timerange not allowed in transactional user operations");
710         }
711     }
712 
713     private void throwExceptionIfOpSetsTimerange(Mutation userOperation) {
714         if (userOperation.getTimeStamp() != HConstants.LATEST_TIMESTAMP) {
715             throw new IllegalArgumentException(
716                 "Timestamp not allowed in transactional user operations");
717         }
718     }
719 
720     private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
721         if (tx instanceof HBaseTransaction) {
722             return (HBaseTransaction) tx;
723         } else {
724             throw new IllegalArgumentException(
725                 String.format("The transaction object passed %s is not an instance of HBaseTransaction",
726                               tx.getClass().getName()));
727         }
728     }
729 
730     private HBaseTransactionManager enforceHBaseTransactionManagerAsParam(TransactionManager tm) {
731         if (tm instanceof HBaseTransactionManager) {
732             return (HBaseTransactionManager) tm;
733         } else {
734             throw new IllegalArgumentException(
735                 String.format("The transaction manager object passed %s is not an instance of HBaseTransactionManager ",
736                               tm.getClass().getName()));
737         }
738     }
739 }