View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  
22  
23  import org.apache.commons.collections4.map.LRUMap;
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellUtil;
26  import org.apache.hadoop.hbase.client.Get;
27  import org.apache.hadoop.hbase.client.Result;
28  import org.apache.hadoop.hbase.filter.Filter;
29  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30  
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.omid.OmidFilterBase;
33  
34  
35  import java.io.IOException;
36  import java.util.HashMap;
37  
38  import java.util.List;
39  import java.util.Map;
40  
41  
42  public class TransactionVisibilityFilterBase extends OmidFilterBase {
43  
44      // optional sub-filter to apply to visible cells
45      private final Filter userFilter;
46      private final SnapshotFilterImpl snapshotFilter;
47      private final LRUMap<Long ,Long> commitCache;
48      private final HBaseTransaction hbaseTransaction;
49  
50      // This cache is cleared when moving to the next row
51      // So no need to keep row name
52      private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
53  
54      public TransactionVisibilityFilterBase(Filter cellFilter,
55                                             SnapshotFilterImpl snapshotFilter,
56                                             HBaseTransaction hbaseTransaction) {
57          this.userFilter = cellFilter;
58          this.snapshotFilter = snapshotFilter;
59          commitCache = new LRUMap<>(1000);
60          this.hbaseTransaction = hbaseTransaction;
61          familyDeletionCache = new HashMap<>();
62  
63      }
64  
65      @Override
66      public ReturnCode filterKeyValue(Cell v) throws IOException {
67          if (CellUtils.isShadowCell(v)) {
68              Long commitTs =  Bytes.toLong(CellUtil.cloneValue(v));
69              commitCache.put(v.getTimestamp(), commitTs);
70              // Continue getting shadow cells until one of them fits this transaction
71              if (hbaseTransaction.getStartTimestamp() >= commitTs) {
72                  return ReturnCode.NEXT_COL;
73              } else {
74                  return ReturnCode.SKIP;
75              }
76          }
77  
78          Optional<Long> commitTS = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
79          if (commitTS.isPresent()) {
80              if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
81                      snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
82                  return runUserFilter(v, ReturnCode.INCLUDE);
83              }
84              if (CellUtils.isFamilyDeleteCell(v)) {
85                  familyDeletionCache.put(createImmutableBytesWritable(v), commitTS.get());
86                  if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
87                      return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
88                  } else {
89                      return ReturnCode.NEXT_COL;
90                  }
91              }
92              Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v));
93              if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
94                  if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
95                      return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
96                  } else {
97                      return ReturnCode.NEXT_COL;
98                  }
99              }
100             if (CellUtils.isTombstone(v)) {
101                 if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
102                     return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
103                 } else {
104                     return ReturnCode.NEXT_COL;
105                 }
106             }
107 
108             return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
109         }
110 
111         return ReturnCode.SKIP;
112     }
113 
114 
115     private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
116         return new ImmutableBytesWritable(v.getFamilyArray(),
117                 v.getFamilyOffset(),v.getFamilyLength());
118     }
119 
120     private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
121             throws IOException {
122         assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE);
123         if (userFilter == null) {
124             return snapshotReturn;
125         }
126 
127         ReturnCode userRes = userFilter.filterKeyValue(v);
128         switch (userRes) {
129             case INCLUDE:
130                 return snapshotReturn;
131             case SKIP:
132                 return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
133             default:
134                 return userRes;
135         }
136 
137     }
138 
139     // For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct
140     private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
141         Long cachedCommitTS = commitCache.get(v.getTimestamp());
142         if (cachedCommitTS != null) {
143             if (hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
144                 return Optional.of(cachedCommitTS);
145             } else {
146                 return Optional.absent();
147             }
148         }
149         if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
150             return Optional.of(v.getTimestamp());
151         }
152 
153         if (getShadowCellBeforeCT) {
154 
155             // Try to get shadow cell from region
156             final Get get = new Get(CellUtil.cloneRow(v));
157             get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
158             get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
159             Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get);
160 
161             if (!shadowCell.isEmpty()) {
162                 long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
163                 commitCache.put(v.getTimestamp(), commitTS);
164                 if (commitTS <= hbaseTransaction.getStartTimestamp()) {
165                     return Optional.of(commitTS);
166                 }
167             }
168         }
169 
170         Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
171         if (commitTS.isPresent()) {
172             commitCache.put(v.getTimestamp(), commitTS.get());
173         } else {
174             commitCache.put(v.getTimestamp(), Long.MAX_VALUE);
175         }
176         return commitTS;
177     }
178 
179 
180     @Override
181     public void reset() throws IOException {
182         familyDeletionCache.clear();
183         if (userFilter != null) {
184             userFilter.reset();
185         }
186     }
187 
188     @Override
189     public boolean filterRow() throws IOException {
190         if (userFilter != null) {
191             return userFilter.filterRow();
192         }
193         return super.filterRow();
194     }
195 
196 
197     @Override
198     public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
199         if (userFilter != null) {
200             return userFilter.filterRowKey(buffer, offset, length);
201         }
202         return super.filterRowKey(buffer, offset, length);
203     }
204 
205     @Override
206     public boolean filterAllRemaining() throws IOException {
207         if (userFilter != null) {
208             return userFilter.filterAllRemaining();
209         }
210         return super.filterAllRemaining();
211     }
212 
213     @Override
214     public void filterRowCells(List<Cell> kvs) throws IOException {
215         if (userFilter != null) {
216             userFilter.filterRowCells(kvs);
217         } else {
218             super.filterRowCells(kvs);
219         }
220     }
221 
222     @Override
223     public boolean hasFilterRow() {
224         if (userFilter != null) {
225             return userFilter.hasFilterRow();
226         }
227         return super.hasFilterRow();
228     }
229 
230     @Override
231     public Cell getNextCellHint(Cell currentKV) throws IOException {
232         if (userFilter != null) {
233             return userFilter.getNextCellHint(currentKV);
234         }
235         return super.getNextCellHint(currentKV);
236     }
237 
238     @Override
239     public boolean isFamilyEssential(byte[] name) throws IOException {
240         if (userFilter != null) {
241             return userFilter.isFamilyEssential(name);
242         }
243         return super.isFamilyEssential(name);
244     }
245 
246     @Override
247     public byte[] toByteArray() throws IOException {
248         return super.toByteArray();
249     }
250 
251     public Filter getInnerFilter() {
252         return userFilter;
253     }
254 }