1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.NavigableMap;
27 import java.util.NavigableSet;
28 import java.util.Set;
29
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.Connection;
39 import org.apache.hadoop.hbase.client.Delete;
40 import org.apache.hadoop.hbase.client.Get;
41 import org.apache.hadoop.hbase.client.Mutation;
42 import org.apache.hadoop.hbase.client.OperationWithAttributes;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.client.Result;
45 import org.apache.hadoop.hbase.client.ResultScanner;
46 import org.apache.hadoop.hbase.client.Row;
47 import org.apache.hadoop.hbase.client.Scan;
48 import org.apache.hadoop.hbase.client.Table;
49 import org.apache.hadoop.hbase.io.TimeRange;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.omid.committable.CommitTable;
52 import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56
57
58
59
60
61 public class TTable implements Closeable {
62
63 private static Logger LOG = LoggerFactory.getLogger(TTable.class);
64
65 private Table table;
66
67 private SnapshotFilter snapshotFilter;
68
69 private boolean serverSideFilter;
70
71 private final List<Mutation> mutations;
72
73 private boolean autoFlush = true;
74
75 private final boolean conflictFree;
76
77
78
79
80
/**
 * Creates a transactional view of the table called {@code tableName}, looked up
 * through {@code connection}, with conflict-free mode switched off.
 *
 * @param connection HBase connection used to obtain the table
 * @param tableName  name of the table as raw bytes
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, byte[] tableName) throws IOException {
    // Route through the conflictFree-aware overload; it performs the same table lookup.
    this(connection, tableName, false);
}
84
/**
 * Creates a transactional view of the table called {@code tableName} that uses the
 * supplied commit table client, with conflict-free mode switched off.
 *
 * @param connection        HBase connection used to obtain the table
 * @param tableName         name of the table as raw bytes
 * @param commitTableClient commit table client handed to the snapshot filter
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
    // Route through the conflictFree-aware overload; it performs the same table lookup.
    this(connection, tableName, commitTableClient, false);
}
88
/**
 * Creates a transactional view of the table called {@code tableName}, looked up
 * through {@code connection}, with conflict-free mode switched off.
 *
 * @param connection HBase connection used to obtain the table
 * @param tableName  name of the table
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, String tableName) throws IOException {
    // Route through the conflictFree-aware overload; it performs the same table lookup.
    this(connection, tableName, false);
}
92
/**
 * Creates a transactional view of the table called {@code tableName} that uses the
 * supplied commit table client, with conflict-free mode switched off.
 *
 * @param connection        HBase connection used to obtain the table
 * @param tableName         name of the table
 * @param commitTableClient commit table client handed to the snapshot filter
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
    // Route through the conflictFree-aware overload; it performs the same table lookup.
    this(connection, tableName, commitTableClient, false);
}
96
/**
 * Wraps an already opened HBase table with conflict-free mode switched off. Whether
 * server-side filtering is used is read from the table configuration key
 * {@code omid.server.side.filter} (default {@code false}).
 *
 * @param hTable table to wrap
 * @throws IOException if the snapshot filter cannot be set up
 */
public TTable(Table hTable) throws IOException {
    // The (Table, boolean) overload reads the same configuration key for us.
    this(hTable, false);
}
100
/**
 * Creates a transactional view of the table called {@code tableName}, looked up
 * through {@code connection}.
 *
 * @param connection   HBase connection used to obtain the table
 * @param tableName    name of the table as raw bytes
 * @param conflictFree whether written cells go to the conflict-free write set
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
    this(
        connection.getTable(TableName.valueOf(tableName)),
        conflictFree);
}
104
/**
 * Creates a transactional view of the table called {@code tableName} that uses the
 * supplied commit table client.
 *
 * @param connection        HBase connection used to obtain the table
 * @param tableName         name of the table as raw bytes
 * @param commitTableClient commit table client handed to the snapshot filter
 * @param conflictFree      whether written cells go to the conflict-free write set
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient,
              boolean conflictFree) throws IOException {
    this(
        connection.getTable(TableName.valueOf(tableName)),
        commitTableClient,
        conflictFree);
}
108
/**
 * Creates a transactional view of the table called {@code tableName}, looked up
 * through {@code connection}.
 *
 * @param connection   HBase connection used to obtain the table
 * @param tableName    name of the table
 * @param conflictFree whether written cells go to the conflict-free write set
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, String tableName, boolean conflictFree) throws IOException {
    this(
        connection.getTable(TableName.valueOf(tableName)),
        conflictFree);
}
112
/**
 * Creates a transactional view of the table called {@code tableName} that uses the
 * supplied commit table client.
 *
 * @param connection        HBase connection used to obtain the table
 * @param tableName         name of the table
 * @param commitTableClient commit table client handed to the snapshot filter
 * @param conflictFree      whether written cells go to the conflict-free write set
 * @throws IOException if the table cannot be obtained or wrapped
 */
public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient,
              boolean conflictFree) throws IOException {
    this(
        connection.getTable(TableName.valueOf(tableName)),
        commitTableClient,
        conflictFree);
}
116
/**
 * Wraps an already opened HBase table. Whether server-side filtering is used is read
 * from the table configuration key {@code omid.server.side.filter} (default
 * {@code false}).
 *
 * @param hTable       table to wrap
 * @param conflictFree whether written cells go to the conflict-free write set
 * @throws IOException if the snapshot filter cannot be set up
 */
public TTable(Table hTable, boolean conflictFree) throws IOException {
    this(
        hTable,
        hTable.getConfiguration().getBoolean("omid.server.side.filter", false),
        conflictFree);
}
120
/**
 * Wraps an already opened HBase table using a caller-provided snapshot filter, with
 * conflict-free mode switched off.
 *
 * @param hTable         table to wrap
 * @param snapshotFilter filter that reads are delegated to
 * @throws IOException declared for symmetry with the other constructors
 */
public TTable(Table hTable, SnapshotFilter snapshotFilter) throws IOException {
    this(
        hTable,
        snapshotFilter,
        false);
}
124
/**
 * Wraps an already opened HBase table using the supplied commit table client, with
 * conflict-free mode switched off.
 *
 * @param hTable            table to wrap
 * @param commitTableClient commit table client handed to the snapshot filter
 * @throws IOException if the snapshot filter cannot be set up
 */
public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
    this(
        hTable,
        commitTableClient,
        false);
}
128
/**
 * Wraps an already opened HBase table, choosing the snapshot filter implementation
 * from the {@code serverSideFilter} flag.
 *
 * @param hTable           table to wrap
 * @param serverSideFilter when true an {@code AttributeSetSnapshotFilter} is used,
 *                         otherwise a {@code SnapshotFilterImpl} over the table
 * @param conflictFree     whether written cells go to the conflict-free write set
 * @throws IOException if the snapshot filter cannot be set up
 */
public TTable(Table hTable, boolean serverSideFilter, boolean conflictFree) throws IOException {
    this.table = hTable;
    this.conflictFree = conflictFree;
    this.mutations = new ArrayList<>();
    this.serverSideFilter = serverSideFilter;

    if (serverSideFilter) {
        this.snapshotFilter = new AttributeSetSnapshotFilter(hTable);
    } else {
        this.snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable));
    }
}
137
/**
 * Wraps an already opened HBase table using a caller-provided snapshot filter. The
 * server-side-filter flag is left at its default value by this constructor.
 *
 * @param hTable         table to wrap
 * @param snapshotFilter filter that reads are delegated to
 * @param conflictFree   whether written cells go to the conflict-free write set
 * @throws IOException declared for symmetry with the other constructors
 */
public TTable(Table hTable, SnapshotFilter snapshotFilter, boolean conflictFree) throws IOException {
    this.mutations = new ArrayList<>();
    this.conflictFree = conflictFree;
    this.snapshotFilter = snapshotFilter;
    this.table = hTable;
}
144
/**
 * Wraps an already opened HBase table using the supplied commit table client. Whether
 * server-side filtering is used is read from the table configuration key
 * {@code omid.server.side.filter} (default {@code false}); the commit table client is
 * only passed on when the client-side {@code SnapshotFilterImpl} is chosen.
 *
 * @param hTable            table to wrap
 * @param commitTableClient commit table client for the client-side snapshot filter
 * @param conflictFree      whether written cells go to the conflict-free write set
 * @throws IOException if the snapshot filter cannot be set up
 */
public TTable(Table hTable, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
    this.table = hTable;
    this.conflictFree = conflictFree;
    this.mutations = new ArrayList<>();
    this.serverSideFilter = hTable.getConfiguration().getBoolean("omid.server.side.filter", false);

    if (this.serverSideFilter) {
        this.snapshotFilter = new AttributeSetSnapshotFilter(hTable);
    } else {
        this.snapshotFilter =
                new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable), commitTableClient);
    }
}
153
154
155
156
157
158
159
160
161
162
163 @Override
164 public void close() throws IOException {
165 table.close();
166 try {
167 snapshotFilter.close();
168 } catch (Exception e) {
169 LOG.warn("Failed to close TTable resources.");
170 e.printStackTrace();
171 }
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186 public Result get(Transaction tx, final Get get) throws IOException {
187
188 throwExceptionIfOpSetsTimerange(get);
189
190 flushCommits();
191
192 HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
193
194 final long readTimestamp = transaction.getReadTimestamp();
195 final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
196 propagateAttributes(get, tsget);
197 TimeRange timeRange = get.getTimeRange();
198 long startTime = timeRange.getMin();
199 long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
200 tsget.setTimeRange(startTime, endTime).setMaxVersions(1);
201 Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
202 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
203 byte[] family = entry.getKey();
204 NavigableSet<byte[]> qualifiers = entry.getValue();
205 if (qualifiers == null || qualifiers.isEmpty()) {
206 tsget.addFamily(family);
207 } else {
208 for (byte[] qualifier : qualifiers) {
209 tsget.addColumn(family, qualifier);
210 tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
211 }
212 tsget.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER);
213 tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
214 }
215 }
216 LOG.trace("Initial Get = {}", tsget);
217
218 return snapshotFilter.get(tsget, transaction);
219 }
220
221 static private void propagateAttributes(OperationWithAttributes from, OperationWithAttributes to) {
222 Map<String,byte[]> attributeMap = from.getAttributesMap();
223
224 for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
225 to.setAttribute(entry.getKey(), entry.getValue());
226 }
227 }
228
229 private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException {
230 Result result = this.get(tx, deleteG);
231 if (!result.isEmpty()) {
232 for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
233 .entrySet()) {
234 byte[] family = entryF.getKey();
235 for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
236 byte[] qualifier = entryQ.getKey();
237 addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, qualifier,
238 tx.getWriteTimestamp()));
239 }
240 deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
241 CellUtils.DELETE_TOMBSTONE);
242 addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
243 tx.getWriteTimestamp()));
244 }
245 }
246 }
247
248 private void familyQualifierBasedDeletionWithOutRead(HBaseTransaction tx, Put deleteP, Get deleteG) {
249 Set<byte[]> fset = deleteG.getFamilyMap().keySet();
250
251 for (byte[] family : fset) {
252 deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
253 CellUtils.DELETE_TOMBSTONE);
254 addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
255 tx.getWriteTimestamp()));
256
257 }
258 }
259
260
261
262
263
264
265
266
267 public void delete(Transaction tx, Delete delete) throws IOException {
268 Put deleteP = deleteInternal(tx, delete);
269 if (!deleteP.isEmpty()) {
270 addMutation(deleteP);
271 }
272 }
273
274 private Put deleteInternal(Transaction tx, Delete delete) throws IOException {
275
276 throwExceptionIfOpSetsTimerange(delete);
277
278 HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
279
280 final long writeTimestamp = transaction.getWriteTimestamp();
281 boolean deleteFamily = false;
282
283 final Put deleteP = new Put(delete.getRow(), writeTimestamp);
284 final Get deleteG = new Get(delete.getRow());
285 propagateAttributes(delete, deleteP);
286 propagateAttributes(delete, deleteG);
287 Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
288 if (fmap.isEmpty()) {
289 familyQualifierBasedDeletion(transaction, deleteP, deleteG);
290 }
291
292 for (List<Cell> cells : fmap.values()) {
293 for (Cell cell : cells) {
294 CellUtils.validateCell(cell, writeTimestamp);
295 switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
296 case DeleteColumn:
297 deleteP.addColumn(CellUtil.cloneFamily(cell),
298 CellUtil.cloneQualifier(cell),
299 writeTimestamp,
300 CellUtils.DELETE_TOMBSTONE);
301 addWriteSetElement(transaction,
302 new HBaseCellId(this,
303 delete.getRow(),
304 CellUtil.cloneFamily(cell),
305 CellUtil.cloneQualifier(cell),
306 writeTimestamp));
307 break;
308 case DeleteFamily:
309 deleteG.addFamily(CellUtil.cloneFamily(cell));
310 deleteFamily = true;
311 break;
312 case Delete:
313 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
314 deleteP.addColumn(CellUtil.cloneFamily(cell),
315 CellUtil.cloneQualifier(cell),
316 writeTimestamp,
317 CellUtils.DELETE_TOMBSTONE);
318 addWriteSetElement(transaction,
319 new HBaseCellId(this,
320 delete.getRow(),
321 CellUtil.cloneFamily(cell),
322 CellUtil.cloneQualifier(cell),
323 writeTimestamp));
324 break;
325 } else {
326 throw new UnsupportedOperationException(
327 "Cannot delete specific versions on Snapshot Isolation.");
328 }
329 default:
330 break;
331 }
332 }
333 }
334 if (deleteFamily) {
335 if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).
336 getConflictDetectionLevel() == ConflictDetectionLevel.ROW) {
337 familyQualifierBasedDeletionWithOutRead(transaction, deleteP, deleteG);
338 } else {
339 familyQualifierBasedDeletion(transaction, deleteP, deleteG);
340 }
341 }
342
343 return deleteP;
344 }
345
346
347
348
349
350
351
352
353 public void put(Transaction tx, Put put) throws IOException {
354 put(tx, put, false);
355 }
356
357
358
359
360
361
362
363 static public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
364 final Put tsput = new Put(put.getRow(), timestamp);
365 propagateAttributes(put, tsput);
366
367 Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
368 for (List<Cell> kvl : kvs.values()) {
369 for (Cell c : kvl) {
370 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
371 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), timestamp);
372 try {
373 tsput.add(kv);
374 } catch (IOException e) {
375
376
377 throw new RuntimeException(e);
378 }
379 tsput.addColumn(CellUtil.cloneFamily(kv),
380 CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
381 kv.getTimestamp(),
382 Bytes.toBytes(commitTimestamp));
383 }
384 }
385
386 return tsput;
387 }
388
389
390
391
392
393
394
395
396 public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
397 Put tsput = putInternal(tx, put, addShadowCell);
398 addMutation(tsput);
399 }
400
401 private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
402
403 throwExceptionIfOpSetsTimerange(put);
404
405 HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
406
407 final long writeTimestamp = transaction.getWriteTimestamp();
408
409
410 final Put tsput = new Put(put.getRow(), writeTimestamp);
411 propagateAttributes(put, tsput);
412 Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
413 for (List<Cell> kvl : kvs.values()) {
414 for (Cell c : kvl) {
415 CellUtils.validateCell(c, writeTimestamp);
416
417
418
419 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
420 Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
421 tsput.add(kv);
422
423 if (addShadowCell) {
424 tsput.addColumn(CellUtil.cloneFamily(kv),
425 CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
426 kv.getTimestamp(),
427 Bytes.toBytes(kv.getTimestamp()));
428 } else {
429 HBaseCellId cellId = new HBaseCellId(this,
430 CellUtil.cloneRow(kv),
431 CellUtil.cloneFamily(kv),
432 CellUtil.cloneQualifier(kv),
433 kv.getTimestamp());
434
435 addWriteSetElement(transaction, cellId);
436 }
437 }
438 }
439 return tsput;
440 }
441
442 private void addWriteSetElement(HBaseTransaction transaction, HBaseCellId cellId) {
443 if (conflictFree) {
444 transaction.addConflictFreeWriteSetElement(cellId);
445 } else {
446 transaction.addWriteSetElement(cellId);
447 }
448
449 }
450
451 private void addMutation(Mutation m) throws IOException {
452 this.mutations.add(m);
453 if (autoFlush) {
454 flushCommits();
455 }
456 }
457
458 private void addMutations(List<Mutation> mutations) throws IOException {
459 this.mutations.addAll(mutations);
460 if (autoFlush) {
461 flushCommits();
462 }
463 }
464
465
466
467
468
469
470
471
472
473 public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException {
474
475 throwExceptionIfOpSetsTimerange(scan);
476 flushCommits();
477 HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
478
479 Scan tsscan = new Scan(scan);
480 tsscan.setMaxVersions(1);
481 tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1);
482 propagateAttributes(scan, tsscan);
483 Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
484 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
485 byte[] family = entry.getKey();
486 NavigableSet<byte[]> qualifiers = entry.getValue();
487 if (qualifiers == null) {
488 continue;
489 }
490 for (byte[] qualifier : qualifiers) {
491 tsscan.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
492 }
493 if (!qualifiers.isEmpty()) {
494 tsscan.addColumn(entry.getKey(), CellUtils.FAMILY_DELETE_QUALIFIER);
495 }
496 }
497
498 return snapshotFilter.getScanner(tsscan, transaction);
499 }
500
501
502
503
504
505 public byte[] getTableName() {
506 return table.getName().getName();
507 }
508
509
510
511
512
513
514 public Configuration getConfiguration() {
515 return table.getConfiguration();
516 }
517
518
519
520
521
522
523
524 public HTableDescriptor getTableDescriptor() throws IOException {
525 return table.getTableDescriptor();
526 }
527
528
529
530
531
532
533
534
535
536 public boolean exists(Transaction transaction, Get get) throws IOException {
537 Result result = get(transaction, get);
538 return !result.isEmpty();
539 }
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564 public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
565 Result[] results = new Result[gets.size()];
566 int i = 0;
567 for (Get get : gets) {
568 results[i++] = get(transaction, get);
569 }
570 return results;
571 }
572
573
574
575
576
577
578
579
580
581 public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
582 Scan scan = new Scan();
583 scan.addFamily(family);
584 return getScanner(transaction, scan);
585 }
586
587
588
589
590
591
592
593
594
595
596 public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier)
597 throws IOException {
598 Scan scan = new Scan();
599 scan.addColumn(family, qualifier);
600 return getScanner(transaction, scan);
601 }
602
603
604
605
606
607
608
609
610
611 public void put(Transaction transaction, List<Put> puts, boolean addShadowCells) throws IOException {
612 List<Mutation> mutations = new ArrayList<>(puts.size());
613 for (Put put : puts) {
614 mutations.add(putInternal(transaction, put, addShadowCells));
615 }
616 addMutations(mutations);
617 }
618
619 public void put(Transaction transaction, List<Put> puts) throws IOException {
620 put(transaction, puts, false);
621 }
622
623
624
625
626
627
628
629
630
631 public void batch(Transaction transaction, List<? extends Row> rows, boolean addShadowCells) throws IOException {
632 List<Mutation> mutations = new ArrayList<>(rows.size());
633 for (Row row : rows) {
634 if (row instanceof Put) {
635 mutations.add(putInternal(transaction, (Put)row, addShadowCells));
636 } else if (row instanceof Delete) {
637 Put deleteP = deleteInternal(transaction, (Delete)row);
638 if (!deleteP.isEmpty()) {
639 mutations.add(deleteP);
640 }
641 } else {
642 throw new UnsupportedOperationException("Unsupported mutation: " + row);
643 }
644 }
645 addMutations(mutations);
646 }
647
648 public void batch(Transaction transaction, List<? extends Row> rows) throws IOException {
649 batch(transaction, rows, false);
650 }
651
652
653
654
655
656
657
658
659 public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
660 List<Mutation> mutations = new ArrayList<>(deletes.size());
661 for (Delete delete : deletes) {
662 Put deleteP = deleteInternal(transaction, delete);
663 if (!deleteP.isEmpty()) {
664 mutations.add(deleteP);
665 }
666 }
667 addMutations(mutations);
668 }
669
670
671
672
673
674
675
676 public Table getHTable() {
677 return table;
678 }
679
680 public void setAutoFlush(boolean autoFlush) throws IOException {
681 this.autoFlush = autoFlush;
682 flushCommits();
683 }
684
685 public boolean isAutoFlush() {
686 return autoFlush;
687 }
688
689 public void flushCommits() throws IOException {
690 try {
691 if (this.mutations.size() > 0) {
692 table.batch(this.mutations, new Object[mutations.size()]);
693 }
694 } catch (InterruptedException e) {
695 Thread.interrupted();
696 throw new RuntimeException(e);
697 } finally {
698 this.mutations.clear();
699 }
700 }
701
702
703
704
705
706 private void throwExceptionIfOpSetsTimerange(Get getOperation) {
707 TimeRange tr = getOperation.getTimeRange();
708 checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
709 }
710
711 private void throwExceptionIfOpSetsTimerange(Scan scanOperation) {
712 TimeRange tr = scanOperation.getTimeRange();
713 checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
714 }
715
716 private void checkTimerangeIsSetToDefaultValuesOrThrowException(TimeRange tr) {
717 if (tr.getMin() != 0L || tr.getMax() != Long.MAX_VALUE) {
718 throw new IllegalArgumentException(
719 "Timestamp/timerange not allowed in transactional user operations");
720 }
721 }
722
723 private void throwExceptionIfOpSetsTimerange(Mutation userOperation) {
724 if (userOperation.getTimeStamp() != HConstants.LATEST_TIMESTAMP) {
725 throw new IllegalArgumentException(
726 "Timestamp not allowed in transactional user operations");
727 }
728 }
729
730 private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
731 if (tx instanceof HBaseTransaction) {
732 return (HBaseTransaction) tx;
733 } else {
734 throw new IllegalArgumentException(
735 String.format("The transaction object passed %s is not an instance of HBaseTransaction",
736 tx.getClass().getName()));
737 }
738 }
739
740 private HBaseTransactionManager enforceHBaseTransactionManagerAsParam(TransactionManager tm) {
741 if (tm instanceof HBaseTransactionManager) {
742 return (HBaseTransactionManager) tm;
743 } else {
744 throw new IllegalArgumentException(
745 String.format("The transaction manager object passed %s is not an instance of HBaseTransactionManager ",
746 tm.getClass().getName()));
747 }
748 }
749 }