View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.NavigableMap;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.Connection;
39  import org.apache.hadoop.hbase.client.Delete;
40  import org.apache.hadoop.hbase.client.Get;
41  import org.apache.hadoop.hbase.client.Mutation;
42  import org.apache.hadoop.hbase.client.OperationWithAttributes;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.client.ResultScanner;
46  import org.apache.hadoop.hbase.client.Row;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.client.Table;
49  import org.apache.hadoop.hbase.io.TimeRange;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.omid.committable.CommitTable;
52  import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  
56  
57  /**
58   * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link
59   * Transaction} object. It mimics the behavior in {@link org.apache.hadoop.hbase.client.Table}
60   */
61  public class TTable implements Closeable {
62  
63      private static Logger LOG = LoggerFactory.getLogger(TTable.class);
64  
65      private Table table;
66  
67      private SnapshotFilter snapshotFilter;
68  
69      private boolean serverSideFilter;
70      
71      private final List<Mutation> mutations;
72      
73      private boolean autoFlush = true;
74      
75      private final boolean conflictFree;
76      
77      // ----------------------------------------------------------------------------------------------------------------
78      // Construction
79      // ----------------------------------------------------------------------------------------------------------------
80  
81      public TTable(Connection connection, byte[] tableName) throws IOException {
82          this(connection.getTable(TableName.valueOf(tableName)), false);
83      }
84  
85      public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
86          this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
87      }
88  
89      public TTable(Connection connection, String tableName) throws IOException {
90          this(connection.getTable(TableName.valueOf(tableName)), false);
91      }
92  
93      public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
94          this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
95      }
96  
97      public TTable(Table hTable) throws IOException {
98          this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), false);
99      }
100 
101     public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
102         this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
103     }
104 
105     public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
106         this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
107     }
108 
109     public TTable(Connection connection, String tableName, boolean conflictFree) throws IOException {
110         this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
111     }
112 
113     public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
114         this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
115     }
116 
117     public TTable(Table hTable, boolean conflictFree) throws IOException {
118         this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), conflictFree);
119     }
120 
121     public TTable(Table hTable, SnapshotFilter snapshotFilter) throws IOException {
122         this(hTable, snapshotFilter, false);
123     }
124 
125     public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
126         this(hTable, commitTableClient, false);
127     }
128 
129     public TTable(Table hTable, boolean serverSideFilter, boolean conflictFree) throws IOException {
130         this.table = hTable;
131         this.conflictFree = conflictFree;
132         this.mutations = new ArrayList<Mutation>();
133         this.serverSideFilter = serverSideFilter;
134         this.snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
135                 new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable));
136     }
137 
138     public TTable(Table hTable, SnapshotFilter snapshotFilter, boolean conflictFree) throws IOException {
139         this.table = hTable;
140         this.conflictFree = conflictFree;
141         this.mutations = new ArrayList<Mutation>();
142         this.snapshotFilter = snapshotFilter;
143     }
144 
145     public TTable(Table hTable, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
146         this.table = hTable;
147         this.conflictFree = conflictFree;
148         this.mutations = new ArrayList<Mutation>();
149         this.serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
150         this.snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
151                 new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable), commitTableClient);
152     }
153 
154     // ----------------------------------------------------------------------------------------------------------------
155     // Closeable implementation
156     // ----------------------------------------------------------------------------------------------------------------
157 
158     /**
159      * Releases any resources held or pending changes in internal buffers.
160      *
161      * @throws IOException if a remote or network exception occurs.
162      */
163     @Override
164     public void close() throws IOException {
165         table.close();
166         try {
167             snapshotFilter.close();
168         } catch (Exception e) {
169             LOG.warn("Failed to close TTable resources.");
170             e.printStackTrace();
171         }
172     }
173 
174     // ----------------------------------------------------------------------------------------------------------------
175     // Transactional operations
176     // ----------------------------------------------------------------------------------------------------------------
177 
178     /**
179      * Transactional version of {@link Table#get(Get get)}
180      *
181      * @param get an instance of Get
182      * @param tx  an instance of transaction to be used
183      * @return Result an instance of Result
184      * @throws IOException if a remote or network exception occurs.
185      */
186     public Result get(Transaction tx, final Get get) throws IOException {
187 
188         throwExceptionIfOpSetsTimerange(get);
189 
190         flushCommits();
191 
192         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
193 
194         final long readTimestamp = transaction.getReadTimestamp();
195         final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
196         propagateAttributes(get, tsget);
197         TimeRange timeRange = get.getTimeRange();
198         long startTime = timeRange.getMin();
199         long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
200         tsget.setTimeRange(startTime, endTime).setMaxVersions(1);
201         Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
202         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
203             byte[] family = entry.getKey();
204             NavigableSet<byte[]> qualifiers = entry.getValue();
205             if (qualifiers == null || qualifiers.isEmpty()) {
206                 tsget.addFamily(family);
207             } else {
208                 for (byte[] qualifier : qualifiers) {
209                     tsget.addColumn(family, qualifier);
210                     tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
211                 }
212                 tsget.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER);
213                 tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
214             }
215         }
216         LOG.trace("Initial Get = {}", tsget);
217 
218         return snapshotFilter.get(tsget, transaction);
219     }
220 
221     static private void propagateAttributes(OperationWithAttributes from, OperationWithAttributes to) {
222         Map<String,byte[]> attributeMap = from.getAttributesMap();
223 
224         for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
225             to.setAttribute(entry.getKey(), entry.getValue());
226         }
227     }
228 
229     private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException {
230         Result result = this.get(tx, deleteG);
231         if (!result.isEmpty()) {
232             for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
233                     .entrySet()) {
234                 byte[] family = entryF.getKey();
235                 for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
236                     byte[] qualifier = entryQ.getKey();
237                     addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, qualifier,
238                             tx.getWriteTimestamp()));
239                 }
240                 deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
241                         CellUtils.DELETE_TOMBSTONE);
242                 addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
243                                                 tx.getWriteTimestamp()));
244             }
245         }
246     }
247 
248     private void  familyQualifierBasedDeletionWithOutRead(HBaseTransaction tx, Put deleteP, Get deleteG) {
249         Set<byte[]> fset = deleteG.getFamilyMap().keySet();
250 
251         for (byte[] family : fset) {
252             deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
253                     CellUtils.DELETE_TOMBSTONE);
254             addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
255                     tx.getWriteTimestamp()));
256 
257         }
258     }
259 
260     /**
261      * Transactional version of {@link Table#delete(Delete delete)}
262      *
263      * @param delete an instance of Delete
264      * @param tx     an instance of transaction to be used
265      * @throws IOException if a remote or network exception occurs.
266      */
267     public void delete(Transaction tx, Delete delete) throws IOException {
268         Put deleteP = deleteInternal(tx, delete);
269         if (!deleteP.isEmpty()) {
270             addMutation(deleteP);
271         }
272     }
273     
274     private Put deleteInternal(Transaction tx, Delete delete) throws IOException {
275 
276         throwExceptionIfOpSetsTimerange(delete);
277 
278         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
279 
280         final long writeTimestamp = transaction.getWriteTimestamp();
281         boolean deleteFamily = false;
282 
283         final Put deleteP = new Put(delete.getRow(), writeTimestamp);
284         final Get deleteG = new Get(delete.getRow());
285         propagateAttributes(delete, deleteP);
286         propagateAttributes(delete, deleteG);
287         Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
288         if (fmap.isEmpty()) {
289             familyQualifierBasedDeletion(transaction, deleteP, deleteG);
290         }
291 
292         for (List<Cell> cells : fmap.values()) {
293             for (Cell cell : cells) {
294                 CellUtils.validateCell(cell, writeTimestamp);
295                 switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
296                     case DeleteColumn:
297                         deleteP.addColumn(CellUtil.cloneFamily(cell),
298                                     CellUtil.cloneQualifier(cell),
299                                     writeTimestamp,
300                                     CellUtils.DELETE_TOMBSTONE);
301                         addWriteSetElement(transaction,
302                             new HBaseCellId(this,
303                                             delete.getRow(),
304                                             CellUtil.cloneFamily(cell),
305                                             CellUtil.cloneQualifier(cell),
306                                             writeTimestamp));
307                         break;
308                     case DeleteFamily:
309                         deleteG.addFamily(CellUtil.cloneFamily(cell));
310                         deleteFamily = true;
311                         break;
312                     case Delete:
313                         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
314                             deleteP.addColumn(CellUtil.cloneFamily(cell),
315                                         CellUtil.cloneQualifier(cell),
316                                         writeTimestamp,
317                                         CellUtils.DELETE_TOMBSTONE);
318                             addWriteSetElement(transaction,
319                                 new HBaseCellId(this,
320                                                 delete.getRow(),
321                                                 CellUtil.cloneFamily(cell),
322                                                 CellUtil.cloneQualifier(cell),
323                                                 writeTimestamp));
324                             break;
325                         } else {
326                             throw new UnsupportedOperationException(
327                                 "Cannot delete specific versions on Snapshot Isolation.");
328                         }
329                     default:
330                         break;
331                 }
332             }
333         }
334         if (deleteFamily) {
335             if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).
336                     getConflictDetectionLevel() == ConflictDetectionLevel.ROW) {
337                 familyQualifierBasedDeletionWithOutRead(transaction, deleteP, deleteG);
338             } else {
339                 familyQualifierBasedDeletion(transaction, deleteP, deleteG);
340             }
341         }
342 
343         return deleteP;
344     }
345 
346     /**
347      * Transactional version of {@link Table#put(Put put)}
348      *
349      * @param put an instance of Put
350      * @param tx  an instance of transaction to be used
351      * @throws IOException if a remote or network exception occurs.
352      */
353     public void put(Transaction tx, Put put) throws IOException {
354         put(tx, put, false);
355     }
356 
357 
358     /**
359      * @param put an instance of Put
360      * @param timestamp  timestamp to be used as cells version
361      * @param commitTimestamp  timestamp to be used as commit timestamp
362      */
363     static public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
364         final Put tsput = new Put(put.getRow(), timestamp);
365         propagateAttributes(put, tsput);
366 
367         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
368         for (List<Cell> kvl : kvs.values()) {
369             for (Cell c : kvl) {
370                 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
371                 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), timestamp);
372                 try {
373                     tsput.add(kv);
374                 } catch (IOException e) {
375                     // The existing Put has this Cell, so the cloned one
376                     // will never throw an IOException when it's added.
377                     throw new RuntimeException(e);
378                 }
379                 tsput.addColumn(CellUtil.cloneFamily(kv),
380                         CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
381                         kv.getTimestamp(),
382                         Bytes.toBytes(commitTimestamp));
383             }
384         }
385 
386         return tsput;
387     }
388 
389 
390     /**
391      * @param put an instance of Put
392      * @param tx  an instance of transaction to be used
393      * @param addShadowCell  denotes whether to add the shadow cell
394      * @throws IOException if a remote or network exception occurs.
395      */
396     public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
397         Put tsput = putInternal(tx, put, addShadowCell);
398         addMutation(tsput);
399     }
400     
401     private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
402 
403         throwExceptionIfOpSetsTimerange(put);
404 
405         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
406 
407         final long writeTimestamp = transaction.getWriteTimestamp();
408 
409         // create put with correct ts
410         final Put tsput = new Put(put.getRow(), writeTimestamp);
411         propagateAttributes(put, tsput);
412         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
413         for (List<Cell> kvl : kvs.values()) {
414             for (Cell c : kvl) {
415                 CellUtils.validateCell(c, writeTimestamp);
416                 // Reach into keyvalue to update timestamp.
417                 // It's not nice to reach into keyvalue internals,
418                 // but we want to avoid having to copy the whole thing
419                 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
420                 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
421                 tsput.add(kv);
422 
423                 if (addShadowCell) {
424                     tsput.addColumn(CellUtil.cloneFamily(kv),
425                             CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
426                             kv.getTimestamp(),
427                             Bytes.toBytes(kv.getTimestamp()));
428                 } else {
429                     HBaseCellId cellId = new HBaseCellId(this,
430                             CellUtil.cloneRow(kv),
431                             CellUtil.cloneFamily(kv),
432                             CellUtil.cloneQualifier(kv),
433                             kv.getTimestamp());
434 
435                     addWriteSetElement(transaction, cellId);
436                 }
437             }
438         }
439         return tsput;
440     }
441     
442     private void addWriteSetElement(HBaseTransaction transaction, HBaseCellId cellId) {
443         if (conflictFree) {
444             transaction.addConflictFreeWriteSetElement(cellId);
445         } else {
446             transaction.addWriteSetElement(cellId);
447         }
448         
449     }
450 
451     private void addMutation(Mutation m) throws IOException {
452         this.mutations.add(m);
453         if (autoFlush) {
454             flushCommits();
455         }
456     }
457     
458     private void addMutations(List<Mutation> mutations) throws IOException {
459         this.mutations.addAll(mutations);
460         if (autoFlush) {
461             flushCommits();
462         }
463     }
464     
465     /**
466      * Transactional version of {@link Table#getScanner(Scan scan)}
467      *
468      * @param scan an instance of Scan
469      * @param tx   an instance of transaction to be used
470      * @return ResultScanner an instance of ResultScanner
471      * @throws IOException if a remote or network exception occurs.
472      */
473     public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException {
474 
475         throwExceptionIfOpSetsTimerange(scan);
476         flushCommits();
477         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
478 
479         Scan tsscan = new Scan(scan);
480         tsscan.setMaxVersions(1);
481         tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1);
482         propagateAttributes(scan, tsscan);
483         Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
484         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
485             byte[] family = entry.getKey();
486             NavigableSet<byte[]> qualifiers = entry.getValue();
487             if (qualifiers == null) {
488                 continue;
489             }
490             for (byte[] qualifier : qualifiers) {
491                 tsscan.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
492             }
493             if (!qualifiers.isEmpty()) {
494                 tsscan.addColumn(entry.getKey(), CellUtils.FAMILY_DELETE_QUALIFIER);
495             }
496         }
497 
498         return snapshotFilter.getScanner(tsscan, transaction);
499     }
500 
501     /**
502      *
503      * @return array of byte
504      */
505     public byte[] getTableName() {
506         return table.getName().getName();
507     }
508 
509     /**
510      * Delegates to {@link Table#getConfiguration()}
511      *
512      * @return standard configuration object
513      */
514     public Configuration getConfiguration() {
515         return table.getConfiguration();
516     }
517 
518     /**
519      * Delegates to {@link Table#getTableDescriptor()}
520      *
521      * @return HTableDescriptor an instance of HTableDescriptor
522      * @throws IOException if a remote or network exception occurs.
523      */
524     public HTableDescriptor getTableDescriptor() throws IOException {
525         return table.getTableDescriptor();
526     }
527 
528     /**
529      * Transactional version of {@link Table#exists(Get get)}
530      *
531      * @param transaction an instance of transaction to be used
532      * @param get         an instance of Get
533      * @return true if cell exists
534      * @throws IOException if a remote or network exception occurs.
535      */
536     public boolean exists(Transaction transaction, Get get) throws IOException {
537         Result result = get(transaction, get);
538         return !result.isEmpty();
539     }
540 
541     /* TODO What should we do with these methods???
542      * @Override public void batch(Transaction transaction, List<? extends Row>
543      * actions, Object[] results) throws IOException, InterruptedException {}
544      *
545      * @Override public Object[] batch(Transaction transaction, List<? extends
546      * Row> actions) throws IOException, InterruptedException {}
547      *
548      * @Override public <R> void batchCallback(Transaction transaction, List<?
549      * extends Row> actions, Object[] results, Callback<R> callback) throws
550      * IOException, InterruptedException {}
551      *
552      * @Override public <R> Object[] batchCallback(List<? extends Row> actions,
553      * Callback<R> callback) throws IOException, InterruptedException {}
554      */
555 
556     /**
557      * Transactional version of {@link Table#get(List gets)}
558      *
559      * @param transaction an instance of transaction to be used
560      * @param gets        list of Get instances
561      * @return array of Results
562      * @throws IOException if a remote or network exception occurs
563      */
564     public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
565         Result[] results = new Result[gets.size()];
566         int i = 0;
567         for (Get get : gets) {
568             results[i++] = get(transaction, get);
569         }
570         return results;
571     }
572 
573     /**
574      * Transactional version of {@link Table#getScanner(byte[] family)}
575      *
576      * @param transaction an instance of transaction to be used
577      * @param family      column family
578      * @return an instance of ResultScanner
579      * @throws IOException if a remote or network exception occurs
580      */
581     public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
582         Scan scan = new Scan();
583         scan.addFamily(family);
584         return getScanner(transaction, scan);
585     }
586 
587     /**
588      * Transactional version of {@link Table#getScanner(byte[] family, byte[] qualifier)}
589      *
590      * @param transaction an instance of transaction to be used
591      * @param family      column family
592      * @param qualifier   column name
593      * @return an instance of ResultScanner
594      * @throws IOException if a remote or network exception occurs
595      */
596     public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier)
597         throws IOException {
598         Scan scan = new Scan();
599         scan.addColumn(family, qualifier);
600         return getScanner(transaction, scan);
601     }
602 
603     /**
604      * Transactional version of {@link Table#put(List puts)}
605      *
606      * @param transaction an instance of transaction to be used
607      * @param puts        List of puts
608      * @param addShadowCells  denotes whether to add the shadow cell
609      * @throws IOException if a remote or network exception occurs
610      */
611     public void put(Transaction transaction, List<Put> puts, boolean addShadowCells) throws IOException {
612         List<Mutation> mutations = new ArrayList<>(puts.size());
613         for (Put put : puts) {
614             mutations.add(putInternal(transaction, put, addShadowCells));
615         }
616         addMutations(mutations);
617     }
618 
619     public void put(Transaction transaction, List<Put> puts) throws IOException {
620         put(transaction, puts, false);
621     }
622 
623     /**
624      * Transactional version of Table#batch(List rows)
625      *
626      * @param transaction an instance of transaction to be used
627      * @param rows        List of rows that must be instances of Put or Delete
628      * @param addShadowCells  denotes whether to add the shadow cell
629      * @throws IOException if a remote or network exception occurs
630      */
631     public void batch(Transaction transaction, List<? extends Row> rows, boolean addShadowCells) throws IOException {
632         List<Mutation> mutations = new ArrayList<>(rows.size());
633         for (Row row : rows) {
634             if (row instanceof Put) {
635                 mutations.add(putInternal(transaction, (Put)row, addShadowCells));
636             } else if (row instanceof Delete) {
637                 Put deleteP = deleteInternal(transaction, (Delete)row);
638                 if (!deleteP.isEmpty()) {
639                     mutations.add(deleteP);
640                 }
641             } else {
642                 throw new UnsupportedOperationException("Unsupported mutation: " + row);
643             }
644         }
645         addMutations(mutations);
646     }
647 
648     public void batch(Transaction transaction, List<? extends Row> rows) throws IOException {
649         batch(transaction, rows, false);
650     }
651 
652     /**
653      * Transactional version of {@link Table#delete(List deletes)}
654      *
655      * @param transaction an instance of transaction to be used
656      * @param deletes        List of deletes
657      * @throws IOException if a remote or network exception occurs
658      */
659     public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
660         List<Mutation> mutations = new ArrayList<>(deletes.size());
661         for (Delete delete : deletes) {
662             Put deleteP = deleteInternal(transaction, delete);
663             if (!deleteP.isEmpty()) {
664                 mutations.add(deleteP);
665             }
666         }
667         addMutations(mutations);
668     }
669 
670     /**
671      * Provides access to the underliying Table in order to configure it or to perform unsafe (non-transactional)
672      * operations. The latter would break the transactional guarantees of the whole system.
673      *
674      * @return The underlying Table object
675      */
676     public Table getHTable() {
677         return table;
678     }
679 
680     public void setAutoFlush(boolean autoFlush) throws IOException {
681         this.autoFlush = autoFlush;
682         flushCommits();
683     }
684 
685     public boolean isAutoFlush() {
686         return autoFlush;
687     }
688 
689     public void flushCommits() throws IOException {
690         try {
691             if (this.mutations.size() > 0) {
692                 table.batch(this.mutations, new Object[mutations.size()]);
693             }
694         } catch (InterruptedException e) {
695             Thread.interrupted();
696             throw new RuntimeException(e);
697         } finally {
698             this.mutations.clear();
699         }
700     }
701 
702     // ----------------------------------------------------------------------------------------------------------------
703     // Helper methods
704     // ----------------------------------------------------------------------------------------------------------------
705 
706     private void throwExceptionIfOpSetsTimerange(Get getOperation) {
707         TimeRange tr = getOperation.getTimeRange();
708         checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
709     }
710 
711     private void throwExceptionIfOpSetsTimerange(Scan scanOperation) {
712         TimeRange tr = scanOperation.getTimeRange();
713         checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
714     }
715 
716     private void checkTimerangeIsSetToDefaultValuesOrThrowException(TimeRange tr) {
717         if (tr.getMin() != 0L || tr.getMax() != Long.MAX_VALUE) {
718             throw new IllegalArgumentException(
719                 "Timestamp/timerange not allowed in transactional user operations");
720         }
721     }
722 
723     private void throwExceptionIfOpSetsTimerange(Mutation userOperation) {
724         if (userOperation.getTimeStamp() != HConstants.LATEST_TIMESTAMP) {
725             throw new IllegalArgumentException(
726                 "Timestamp not allowed in transactional user operations");
727         }
728     }
729 
730     private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
731         if (tx instanceof HBaseTransaction) {
732             return (HBaseTransaction) tx;
733         } else {
734             throw new IllegalArgumentException(
735                 String.format("The transaction object passed %s is not an instance of HBaseTransaction",
736                               tx.getClass().getName()));
737         }
738     }
739 
740     private HBaseTransactionManager enforceHBaseTransactionManagerAsParam(TransactionManager tm) {
741         if (tm instanceof HBaseTransactionManager) {
742             return (HBaseTransactionManager) tm;
743         } else {
744             throw new IllegalArgumentException(
745                 String.format("The transaction manager object passed %s is not an instance of HBaseTransactionManager ",
746                               tm.getClass().getName()));
747         }
748     }
749 }