1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21
22
23 import org.apache.commons.collections4.map.LRUMap;
24 import org.apache.hadoop.hbase.Cell;
25 import org.apache.hadoop.hbase.CellUtil;
26 import org.apache.hadoop.hbase.client.Get;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.filter.Filter;
29 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.omid.OmidFilterBase;
33
34
35 import java.io.IOException;
36 import java.util.HashMap;
37
38 import java.util.List;
39 import java.util.Map;
40
41
42 public class TransactionVisibilityFilterBase extends OmidFilterBase {
43
44
45 private final Filter userFilter;
46 private final SnapshotFilterImpl snapshotFilter;
47 private final LRUMap<Long ,Long> commitCache;
48 private final HBaseTransaction hbaseTransaction;
49
50
51
52 private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
53
54 public TransactionVisibilityFilterBase(Filter cellFilter,
55 SnapshotFilterImpl snapshotFilter,
56 HBaseTransaction hbaseTransaction) {
57 this.userFilter = cellFilter;
58 this.snapshotFilter = snapshotFilter;
59 commitCache = new LRUMap<>(1000);
60 this.hbaseTransaction = hbaseTransaction;
61 familyDeletionCache = new HashMap<>();
62
63 }
64
65 @Override
66 public ReturnCode filterKeyValue(Cell v) throws IOException {
67 if (CellUtils.isShadowCell(v)) {
68 Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
69 commitCache.put(v.getTimestamp(), commitTs);
70
71 if (hbaseTransaction.getStartTimestamp() >= commitTs) {
72 return ReturnCode.NEXT_COL;
73 } else {
74 return ReturnCode.SKIP;
75 }
76 }
77
78 Optional<Long> commitTS = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
79 if (commitTS.isPresent()) {
80 if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
81 snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
82 return runUserFilter(v, ReturnCode.INCLUDE);
83 }
84 if (CellUtils.isFamilyDeleteCell(v)) {
85 familyDeletionCache.put(createImmutableBytesWritable(v), commitTS.get());
86 if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
87 return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
88 } else {
89 return ReturnCode.NEXT_COL;
90 }
91 }
92 Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v));
93 if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
94 if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
95 return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
96 } else {
97 return ReturnCode.NEXT_COL;
98 }
99 }
100 if (CellUtils.isTombstone(v)) {
101 if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
102 return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
103 } else {
104 return ReturnCode.NEXT_COL;
105 }
106 }
107
108 return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
109 }
110
111 return ReturnCode.SKIP;
112 }
113
114
115 private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
116 return new ImmutableBytesWritable(v.getFamilyArray(),
117 v.getFamilyOffset(),v.getFamilyLength());
118 }
119
120 private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
121 throws IOException {
122 assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE);
123 if (userFilter == null) {
124 return snapshotReturn;
125 }
126
127 ReturnCode userRes = userFilter.filterKeyValue(v);
128 switch (userRes) {
129 case INCLUDE:
130 return snapshotReturn;
131 case SKIP:
132 return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
133 default:
134 return userRes;
135 }
136
137 }
138
139
140 private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
141 Long cachedCommitTS = commitCache.get(v.getTimestamp());
142 if (cachedCommitTS != null) {
143 if (hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
144 return Optional.of(cachedCommitTS);
145 } else {
146 return Optional.absent();
147 }
148 }
149 if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
150 return Optional.of(v.getTimestamp());
151 }
152
153 if (getShadowCellBeforeCT) {
154
155
156 final Get get = new Get(CellUtil.cloneRow(v));
157 get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
158 get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
159 Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get);
160
161 if (!shadowCell.isEmpty()) {
162 long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
163 commitCache.put(v.getTimestamp(), commitTS);
164 if (commitTS <= hbaseTransaction.getStartTimestamp()) {
165 return Optional.of(commitTS);
166 }
167 }
168 }
169
170 Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
171 if (commitTS.isPresent()) {
172 commitCache.put(v.getTimestamp(), commitTS.get());
173 } else {
174 commitCache.put(v.getTimestamp(), Long.MAX_VALUE);
175 }
176 return commitTS;
177 }
178
179
180 @Override
181 public void reset() throws IOException {
182 familyDeletionCache.clear();
183 if (userFilter != null) {
184 userFilter.reset();
185 }
186 }
187
188 @Override
189 public boolean filterRow() throws IOException {
190 if (userFilter != null) {
191 return userFilter.filterRow();
192 }
193 return super.filterRow();
194 }
195
196
197 @Override
198 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
199 if (userFilter != null) {
200 return userFilter.filterRowKey(buffer, offset, length);
201 }
202 return super.filterRowKey(buffer, offset, length);
203 }
204
205 @Override
206 public boolean filterAllRemaining() throws IOException {
207 if (userFilter != null) {
208 return userFilter.filterAllRemaining();
209 }
210 return super.filterAllRemaining();
211 }
212
213 @Override
214 public void filterRowCells(List<Cell> kvs) throws IOException {
215 if (userFilter != null) {
216 userFilter.filterRowCells(kvs);
217 } else {
218 super.filterRowCells(kvs);
219 }
220 }
221
222 @Override
223 public boolean hasFilterRow() {
224 if (userFilter != null) {
225 return userFilter.hasFilterRow();
226 }
227 return super.hasFilterRow();
228 }
229
230 @Override
231 public Cell getNextCellHint(Cell currentKV) throws IOException {
232 if (userFilter != null) {
233 return userFilter.getNextCellHint(currentKV);
234 }
235 return super.getNextCellHint(currentKV);
236 }
237
238 @Override
239 public boolean isFamilyEssential(byte[] name) throws IOException {
240 if (userFilter != null) {
241 return userFilter.isFamilyEssential(name);
242 }
243 return super.isFamilyEssential(name);
244 }
245
246 @Override
247 public byte[] toByteArray() throws IOException {
248 return super.toByteArray();
249 }
250
251 public Filter getInnerFilter() {
252 return userFilter;
253 }
254 }