View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.base.Function;
21  import com.google.common.base.Optional;
22  import com.google.common.base.Predicate;
23  import com.google.common.collect.ImmutableList;
24  import com.google.common.collect.Iterables;
25  import com.google.common.collect.Multimaps;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HTableDescriptor;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.client.Delete;
35  import org.apache.hadoop.hbase.client.Get;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.HTableInterface;
38  import org.apache.hadoop.hbase.client.Mutation;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.io.TimeRange;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.omid.committable.CommitTable.CommitTimestamp;
46  import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  
50  import java.io.Closeable;
51  import java.io.IOException;
52  import java.util.ArrayList;
53  import java.util.Collection;
54  import java.util.Collections;
55  import java.util.HashMap;
56  import java.util.Iterator;
57  import java.util.List;
58  import java.util.Map;
59  import java.util.Map.Entry;
60  import java.util.NavigableMap;
61  import java.util.NavigableSet;
62  
63  /**
64   * Provides transactional methods for accessing and modifying a given snapshot
65   * of data identified by an opaque {@link Transaction} object. It mimics the
66   * behavior in {@link org.apache.hadoop.hbase.client.HTableInterface}
67   */
68  public class TTable implements Closeable {
69  
70      private static Logger LOG = LoggerFactory.getLogger(TTable.class);
71  
72      private final HTableInterface healerTable;
73  
74      private HTableInterface table;
75  
76      // ----------------------------------------------------------------------------------------------------------------
77      // Construction
78      // ----------------------------------------------------------------------------------------------------------------
79  
80      public TTable(Configuration conf, byte[] tableName) throws IOException {
81          this(new HTable(conf, tableName));
82      }
83  
84      public TTable(String tableName) throws IOException {
85          this(HBaseConfiguration.create(), Bytes.toBytes(tableName));
86      }
87  
88      public TTable(Configuration conf, String tableName) throws IOException {
89          this(conf, Bytes.toBytes(tableName));
90      }
91  
92      public TTable(HTableInterface hTable) throws IOException {
93          table = hTable;
94          healerTable = new HTable(table.getConfiguration(), table.getTableName());
95      }
96  
97      public TTable(HTableInterface hTable, HTableInterface healerTable) throws IOException {
98          table = hTable;
99          this.healerTable = healerTable;
100     }
101 
102     // ----------------------------------------------------------------------------------------------------------------
103     // Closeable implementation
104     // ----------------------------------------------------------------------------------------------------------------
105 
106     /**
107      * Releases any resources held or pending changes in internal buffers.
108      *
109      * @throws IOException
110      *             if a remote or network exception occurs.
111      */
112     @Override
113     public void close() throws IOException {
114         table.close();
115         healerTable.close();
116     }
117 
118     // ----------------------------------------------------------------------------------------------------------------
119     // Transactional operations
120     // ----------------------------------------------------------------------------------------------------------------
121 
122     /**
123      * Transactional version of {@link HTableInterface#get(Get get)}
124      */
125     public Result get(Transaction tx, final Get get) throws IOException {
126 
127         throwExceptionIfOpSetsTimerange(get);
128 
129         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
130 
131         final long readTimestamp = transaction.getStartTimestamp();
132         final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
133         TimeRange timeRange = get.getTimeRange();
134         long startTime = timeRange.getMin();
135         long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
136         tsget.setTimeRange(startTime, endTime).setMaxVersions(1);
137         Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
138         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
139             byte[] family = entry.getKey();
140             NavigableSet<byte[]> qualifiers = entry.getValue();
141             if (qualifiers == null || qualifiers.isEmpty()) {
142                 tsget.addFamily(family);
143             } else {
144                 for (byte[] qualifier : qualifiers) {
145                     tsget.addColumn(family, qualifier);
146                     tsget.addColumn(family, CellUtils.addShadowCellSuffix(qualifier));
147                 }
148             }
149         }
150         LOG.trace("Initial Get = {}", tsget);
151 
152         // Return the KVs that belong to the transaction snapshot, ask for more
153         // versions if needed
154         Result result = table.get(tsget);
155         List<Cell> filteredKeyValues = Collections.emptyList();
156         if (!result.isEmpty()) {
157             filteredKeyValues = filterCellsForSnapshot(result.listCells(), transaction, tsget.getMaxVersions());
158         }
159 
160         return Result.create(filteredKeyValues);
161     }
162 
163     /**
164      * Transactional version of {@link HTableInterface#delete(Delete delete)}
165      */
166     public void delete(Transaction tx, Delete delete) throws IOException {
167 
168         throwExceptionIfOpSetsTimerange(delete);
169 
170         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
171 
172         final long startTimestamp = transaction.getStartTimestamp();
173         boolean issueGet = false;
174 
175         final Put deleteP = new Put(delete.getRow(), startTimestamp);
176         final Get deleteG = new Get(delete.getRow());
177         Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
178         if (fmap.isEmpty()) {
179             issueGet = true;
180         }
181         for (List<Cell> cells : fmap.values()) {
182             for (Cell cell : cells) {
183                 CellUtils.validateCell(cell, startTimestamp);
184                 switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
185                     case DeleteColumn:
186                         deleteP.add(CellUtil.cloneFamily(cell),
187                                     CellUtil.cloneQualifier(cell),
188                                     startTimestamp,
189                                     CellUtils.DELETE_TOMBSTONE);
190                         transaction.addWriteSetElement(
191                                 new HBaseCellId(table,
192                                                 delete.getRow(),
193                                                 CellUtil.cloneFamily(cell),
194                                                 CellUtil.cloneQualifier(cell),
195                                                 cell.getTimestamp()));
196                         break;
197                     case DeleteFamily:
198                         deleteG.addFamily(CellUtil.cloneFamily(cell));
199                         issueGet = true;
200                         break;
201                     case Delete:
202                         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
203                             deleteP.add(CellUtil.cloneFamily(cell),
204                                         CellUtil.cloneQualifier(cell),
205                                         startTimestamp,
206                                         CellUtils.DELETE_TOMBSTONE);
207                             transaction.addWriteSetElement(
208                                     new HBaseCellId(table,
209                                                     delete.getRow(),
210                                                     CellUtil.cloneFamily(cell),
211                                                     CellUtil.cloneQualifier(cell),
212                                                     cell.getTimestamp()));
213                             break;
214                         } else {
215                             throw new UnsupportedOperationException(
216                                     "Cannot delete specific versions on Snapshot Isolation.");
217                         }
218                     default:
219                         break;
220                 }
221             }
222         }
223         if (issueGet) {
224             // It's better to perform a transactional get to avoid deleting more
225             // than necessary
226             Result result = this.get(transaction, deleteG);
227             if (!result.isEmpty()) {
228                 for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
229                         .entrySet()) {
230                     byte[] family = entryF.getKey();
231                     for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
232                         byte[] qualifier = entryQ.getKey();
233                         deleteP.add(family, qualifier, CellUtils.DELETE_TOMBSTONE);
234                         transaction.addWriteSetElement(new HBaseCellId(table, delete.getRow(), family, qualifier, transaction.getStartTimestamp()));
235                     }
236                 }
237             }
238         }
239 
240         if (!deleteP.isEmpty()) {
241             table.put(deleteP);
242         }
243 
244     }
245 
246     /**
247      * Transactional version of {@link HTableInterface#put(Put put)}
248      */
249     public void put(Transaction tx, Put put) throws IOException {
250 
251         throwExceptionIfOpSetsTimerange(put);
252 
253         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
254 
255         final long startTimestamp = transaction.getStartTimestamp();
256         // create put with correct ts
257         final Put tsput = new Put(put.getRow(), startTimestamp);
258         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
259         for (List<Cell> kvl : kvs.values()) {
260             for (Cell c : kvl) {
261                 CellUtils.validateCell(c, startTimestamp);
262                 // Reach into keyvalue to update timestamp.
263                 // It's not nice to reach into keyvalue internals,
264                 // but we want to avoid having to copy the whole thing
265                 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
266                 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), startTimestamp);
267                 tsput.add(kv);
268 
269                 transaction.addWriteSetElement(
270                         new HBaseCellId(table,
271                                         CellUtil.cloneRow(kv),
272                                         CellUtil.cloneFamily(kv),
273                                         CellUtil.cloneQualifier(kv),
274                                         kv.getTimestamp()));
275             }
276         }
277 
278         table.put(tsput);
279     }
280 
281     /**
282      * Transactional version of {@link HTableInterface#getScanner(Scan scan)}
283      */
284     public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException {
285 
286         throwExceptionIfOpSetsTimerange(scan);
287 
288         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
289 
290         Scan tsscan = new Scan(scan);
291         tsscan.setMaxVersions(1);
292         tsscan.setTimeRange(0, transaction.getStartTimestamp() + 1);
293         Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
294         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
295             byte[] family = entry.getKey();
296             NavigableSet<byte[]> qualifiers = entry.getValue();
297             if (qualifiers == null) {
298                 continue;
299             }
300             for (byte[] qualifier : qualifiers) {
301                 tsscan.addColumn(family, CellUtils.addShadowCellSuffix(qualifier));
302             }
303         }
304         return new TransactionalClientScanner(transaction, tsscan, 1);
305     }
306 
307     /**
308      * Filters the raw results returned from HBase and returns only those
309      * belonging to the current snapshot, as defined by the transaction
310      * object. If the raw results don't contain enough information for a
311      * particular qualifier, it will request more versions from HBase.
312      *
313      * @param rawCells
314      *            Raw cells that we are going to filter
315      * @param transaction
316      *            Defines the current snapshot
317      * @param versionsToRequest
318      *            Number of versions requested from hbase
319      * @return Filtered KVs belonging to the transaction snapshot
320      * @throws IOException
321      */
322     List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
323                                       int versionsToRequest) throws IOException {
324 
325         assert (rawCells != null && transaction != null && versionsToRequest >= 1);
326 
327         List<Cell> keyValuesInSnapshot = new ArrayList<>();
328         List<Get> pendingGetsList = new ArrayList<>();
329 
330         int numberOfVersionsToFetch = versionsToRequest * 2;
331         if (numberOfVersionsToFetch < 1) {
332             numberOfVersionsToFetch = versionsToRequest;
333         }
334 
335         Map<Long, Long> commitCache = buildCommitCache(rawCells);
336 
337         for (Collection<Cell> columnCells : groupCellsByColumnFilteringShadowCells(rawCells)) {
338             boolean snapshotValueFound = false;
339             Cell oldestCell = null;
340             for (Cell cell : columnCells) {
341                 if (isCellInSnapshot(cell, transaction, commitCache)) {
342                     if (!CellUtil.matchingValue(cell, CellUtils.DELETE_TOMBSTONE)) {
343                         keyValuesInSnapshot.add(cell);
344                     }
345                     snapshotValueFound = true;
346                     break;
347                 }
348                 oldestCell = cell;
349             }
350             if (!snapshotValueFound) {
351                 assert (oldestCell != null);
352                 Get pendingGet = createPendingGet(oldestCell, numberOfVersionsToFetch);
353                 pendingGetsList.add(pendingGet);
354             }
355         }
356 
357         if (!pendingGetsList.isEmpty()) {
358             Result[] pendingGetsResults = table.get(pendingGetsList);
359             for (Result pendingGetResult : pendingGetsResults) {
360                 if (!pendingGetResult.isEmpty()) {
361                     keyValuesInSnapshot.addAll(
362                             filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch));
363                 }
364             }
365         }
366 
367         Collections.sort(keyValuesInSnapshot, KeyValue.COMPARATOR);
368 
369         assert (keyValuesInSnapshot.size() <= rawCells.size());
370         return keyValuesInSnapshot;
371     }
372 
373     private Map<Long, Long> buildCommitCache(List<Cell> rawCells) {
374 
375         Map<Long, Long> commitCache = new HashMap<>();
376 
377         for (Cell cell : rawCells) {
378             if (CellUtils.isShadowCell(cell)) {
379                 commitCache.put(cell.getTimestamp(), Bytes.toLong(CellUtil.cloneValue(cell)));
380             }
381         }
382 
383         return commitCache;
384     }
385 
386     private boolean isCellInSnapshot(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
387             throws IOException {
388 
389         long startTimestamp = transaction.getStartTimestamp();
390 
391         if (kv.getTimestamp() == startTimestamp) {
392             return true;
393         }
394 
395         Optional<Long> commitTimestamp =
396                 tryToLocateCellCommitTimestamp(transaction.getTransactionManager(), transaction.getEpoch(), kv, commitCache);
397 
398         return commitTimestamp.isPresent() && commitTimestamp.get() < startTimestamp;
399     }
400 
401     private Get createPendingGet(Cell cell, int versionCount) throws IOException {
402 
403         Get pendingGet = new Get(CellUtil.cloneRow(cell));
404         pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
405         pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtils.addShadowCellSuffix(cell.getQualifierArray(),
406                                                                                        cell.getQualifierOffset(),
407                                                                                        cell.getQualifierLength()));
408         pendingGet.setMaxVersions(versionCount);
409         pendingGet.setTimeRange(0, cell.getTimestamp());
410 
411         return pendingGet;
412     }
413 
414     private Optional<Long> tryToLocateCellCommitTimestamp(AbstractTransactionManager transactionManager,
415                                                           long epoch,
416                                                           Cell cell,
417                                                           Map<Long, Long> commitCache)
418             throws IOException {
419 
420         CommitTimestamp tentativeCommitTimestamp =
421                 transactionManager.locateCellCommitTimestamp(
422                         cell.getTimestamp(),
423                         epoch,
424                         new CommitTimestampLocatorImpl(
425                                 new HBaseCellId(table,
426                                                 CellUtil.cloneRow(cell),
427                                                 CellUtil.cloneFamily(cell),
428                                                 CellUtil.cloneQualifier(cell),
429                                                 cell.getTimestamp()),
430                                 commitCache));
431 
432         // If transaction that added the cell was invalidated
433         if (!tentativeCommitTimestamp.isValid()) {
434             return Optional.absent();
435         }
436 
437         switch (tentativeCommitTimestamp.getLocation()) {
438             case COMMIT_TABLE:
439                 // If the commit timestamp is found in the persisted commit table,
440                 // that means the writing process of the shadow cell in the post
441                 // commit phase of the client probably failed, so we heal the shadow
442                 // cell with the right commit timestamp for avoiding further reads to
443                 // hit the storage
444                 healShadowCell(cell, tentativeCommitTimestamp.getValue());
445                 return Optional.of(tentativeCommitTimestamp.getValue());
446             case CACHE:
447             case SHADOW_CELL:
448                 return Optional.of(tentativeCommitTimestamp.getValue());
449             case NOT_PRESENT:
450                 return Optional.absent();
451             default:
452                 assert (false);
453                 return Optional.absent();
454         }
455     }
456 
457     void healShadowCell(Cell cell, long commitTimestamp) {
458         Put put = new Put(CellUtil.cloneRow(cell));
459         byte[] family = CellUtil.cloneFamily(cell);
460         byte[] shadowCellQualifier = CellUtils.addShadowCellSuffix(cell.getQualifierArray(),
461                                                                    cell.getQualifierOffset(),
462                                                                    cell.getQualifierLength());
463         put.add(family, shadowCellQualifier, cell.getTimestamp(), Bytes.toBytes(commitTimestamp));
464         try {
465             healerTable.put(put);
466         } catch (IOException e) {
467             LOG.warn("Failed healing shadow cell for kv {}", cell, e);
468         }
469     }
470 
471     protected class TransactionalClientScanner implements ResultScanner {
472         private HBaseTransaction state;
473         private ResultScanner innerScanner;
474         private int maxVersions;
475 
476         TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions)
477                 throws IOException {
478             this.state = state;
479             this.innerScanner = table.getScanner(scan);
480             this.maxVersions = maxVersions;
481         }
482 
483 
484         @Override
485         public Result next() throws IOException {
486             List<Cell> filteredResult = Collections.emptyList();
487             while (filteredResult.isEmpty()) {
488                 Result result = innerScanner.next();
489                 if (result == null) {
490                     return null;
491                 }
492                 if (!result.isEmpty()) {
493                     filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions);
494                 }
495             }
496             return Result.create(filteredResult);
497         }
498 
499         // In principle no need to override, copied from super.next(int) to make
500         // sure it works even if super.next(int)
501         // changes its implementation
502         @Override
503         public Result[] next(int nbRows) throws IOException {
504             // Collect values to be returned here
505             ArrayList<Result> resultSets = new ArrayList<>(nbRows);
506             for (int i = 0; i < nbRows; i++) {
507                 Result next = next();
508                 if (next != null) {
509                     resultSets.add(next);
510                 } else {
511                     break;
512                 }
513             }
514             return resultSets.toArray(new Result[resultSets.size()]);
515         }
516 
517         @Override
518         public void close() {
519             innerScanner.close();
520         }
521 
522         @Override
523         public Iterator<Result> iterator() {
524             return new ResultIterator(this);
525         }
526 
527         // ------------------------------------------------------------------------------------------------------------
528         // --------------------------------- Helper class for TransactionalClientScanner ------------------------------
529         // ------------------------------------------------------------------------------------------------------------
530 
531         class ResultIterator implements Iterator<Result> {
532 
533             TransactionalClientScanner scanner;
534             Result currentResult;
535 
536             ResultIterator(TransactionalClientScanner scanner) {
537                 try {
538                     this.scanner = scanner;
539                     currentResult = scanner.next();
540                 } catch (IOException e) {
541                     throw new RuntimeException(e);
542                 }
543             }
544 
545             @Override
546             public boolean hasNext() {
547                 return currentResult != null && !currentResult.isEmpty();
548             }
549 
550             @Override
551             public Result next() {
552                 try {
553                     Result result = currentResult;
554                     currentResult = scanner.next();
555                     return result;
556                 } catch (IOException e) {
557                     throw new RuntimeException(e);
558                 }
559             }
560 
561             @Override
562             public void remove() {
563                 throw new RuntimeException("Not implemented");
564             }
565 
566         }
567 
568     }
569 
570     /**
571      * Delegates to {@link HTable#getTableName()}
572      */
573     public byte[] getTableName() {
574         return table.getTableName();
575     }
576 
577     /**
578      * Delegates to {@link HTable#getConfiguration()}
579      */
580     public Configuration getConfiguration() {
581         return table.getConfiguration();
582     }
583 
584     /**
585      * Delegates to {@link HTable#getTableDescriptor()}
586      */
587     public HTableDescriptor getTableDescriptor() throws IOException {
588         return table.getTableDescriptor();
589     }
590 
591     /**
592      * Transactional version of {@link HTableInterface#exists(Get get)}
593      */
594     public boolean exists(Transaction transaction, Get get) throws IOException {
595         Result result = get(transaction, get);
596         return !result.isEmpty();
597     }
598 
599     /* TODO What should we do with this methods???
600      * @Override public void batch(Transaction transaction, List<? extends Row>
601      * actions, Object[] results) throws IOException, InterruptedException {}
602      *
603      * @Override public Object[] batch(Transaction transaction, List<? extends
604      * Row> actions) throws IOException, InterruptedException {}
605      *
606      * @Override public <R> void batchCallback(Transaction transaction, List<?
607      * extends Row> actions, Object[] results, Callback<R> callback) throws
608      * IOException, InterruptedException {}
609      *
610      * @Override public <R> Object[] batchCallback(List<? extends Row> actions,
611      * Callback<R> callback) throws IOException, InterruptedException {}
612      */
613 
614     /**
615      * Transactional version of {@link HTableInterface#get(List<Get> gets)}
616      */
617     public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
618         Result[] results = new Result[gets.size()];
619         int i = 0;
620         for (Get get : gets) {
621             results[i++] = get(transaction, get);
622         }
623         return results;
624     }
625 
626     /**
627      * Transactional version of {@link HTableInterface#getScanner(byte[] family)}
628      */
629     public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
630         Scan scan = new Scan();
631         scan.addFamily(family);
632         return getScanner(transaction, scan);
633     }
634 
635     /**
636      * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)}
637      */
638     public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier)
639             throws IOException {
640         Scan scan = new Scan();
641         scan.addColumn(family, qualifier);
642         return getScanner(transaction, scan);
643     }
644 
645     /**
646      * Transactional version of {@link HTableInterface#put(List<Put> puts)}
647      */
648     public void put(Transaction transaction, List<Put> puts) throws IOException {
649         for (Put put : puts) {
650             put(transaction, put);
651         }
652     }
653 
654     /**
655      * Transactional version of {@link HTableInterface#delete(List<Delete> deletes)}
656      */
657     public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
658         for (Delete delete : deletes) {
659             delete(transaction, delete);
660         }
661     }
662 
663     /**
664      * Provides access to the underliying HTable in order to configure it or to
665      * perform unsafe (non-transactional) operations. The latter would break the
666      * transactional guarantees of the whole system.
667      *
668      * @return The underlying HTable object
669      */
670     public HTableInterface getHTable() {
671         return table;
672     }
673 
674     /**
675      * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
676      */
677     public void setAutoFlush(boolean autoFlush) {
678         table.setAutoFlush(autoFlush, true);
679     }
680 
681     /**
682      * Delegates to {@link HTable#isAutoFlush()}
683      */
684     public boolean isAutoFlush() {
685         return table.isAutoFlush();
686     }
687 
688     /**
689      * Delegates to {@link HTable.getWriteBufferSize()}
690      */
691     public long getWriteBufferSize() {
692         return table.getWriteBufferSize();
693     }
694 
695     /**
696      * Delegates to {@link HTable.setWriteBufferSize()}
697      */
698     public void setWriteBufferSize(long writeBufferSize) throws IOException {
699         table.setWriteBufferSize(writeBufferSize);
700     }
701 
702     /**
703      * Delegates to {@link HTable.flushCommits()}
704      */
705     public void flushCommits() throws IOException {
706         table.flushCommits();
707     }
708 
709     // ----------------------------------------------------------------------------------------------------------------
710     // Helper methods
711     // ----------------------------------------------------------------------------------------------------------------
712 
713     private void throwExceptionIfOpSetsTimerange(Get getOperation) {
714         TimeRange tr = getOperation.getTimeRange();
715         checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
716     }
717 
718     private void throwExceptionIfOpSetsTimerange(Scan scanOperation) {
719         TimeRange tr = scanOperation.getTimeRange();
720         checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
721     }
722 
723     private void checkTimerangeIsSetToDefaultValuesOrThrowException(TimeRange tr) {
724         if (tr.getMin() != 0L || tr.getMax() != Long.MAX_VALUE) {
725             throw new IllegalArgumentException(
726                     "Timestamp/timerange not allowed in transactional user operations");
727         }
728     }
729 
730     private void throwExceptionIfOpSetsTimerange(Mutation userOperation) {
731         if (userOperation.getTimeStamp() != HConstants.LATEST_TIMESTAMP) {
732             throw new IllegalArgumentException(
733                     "Timestamp not allowed in transactional user operations");
734         }
735     }
736 
737     private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
738         if (tx instanceof HBaseTransaction) {
739             return (HBaseTransaction) tx;
740         } else {
741             throw new IllegalArgumentException(
742                     String.format("The transaction object passed %s is not an instance of HBaseTransaction",
743                                   tx.getClass().getName()));
744         }
745     }
746 
747     static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCells(List<Cell> rawCells) {
748 
749         Predicate<Cell> shadowCellFilter = new Predicate<Cell>() {
750 
751             @Override
752             public boolean apply(Cell cell) {
753                 return cell != null && !CellUtils.isShadowCell(cell);
754             }
755 
756         };
757 
758         Function<Cell, ColumnWrapper> cellToColumnWrapper = new Function<Cell, ColumnWrapper>() {
759 
760             @Override
761             public ColumnWrapper apply(Cell cell) {
762                 return new ColumnWrapper(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
763             }
764 
765         };
766 
767         return Multimaps.index(Iterables.filter(rawCells, shadowCellFilter), cellToColumnWrapper)
768                 .asMap().values()
769                 .asList();
770     }
771 }