1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.collect.Iterators;
import org.apache.phoenix.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.omid.HBaseShims;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.transaction.CellUtils;
import org.apache.omid.transaction.CellInfo;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;

import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
53
54 public class CompactorScanner implements InternalScanner {
55 private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
56 private final InternalScanner internalScanner;
57 private final CommitTable.Client commitTableClient;
58
59 private final boolean isMajorCompaction;
60 private final boolean retainNonTransactionallyDeletedCells;
61 private final long lowWatermark;
62
63 private final Region hRegion;
64
65 private boolean hasMoreRows = false;
66 private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
67 private final LRUMap<Long ,Optional<CommitTimestamp>> commitCache;
68
69 public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
70 InternalScanner internalScanner,
71 Client commitTableClient,
72 boolean isMajorCompaction,
73 boolean preserveNonTransactionallyDeletedCells) throws IOException {
74 this.internalScanner = internalScanner;
75 this.commitTableClient = commitTableClient;
76 this.isMajorCompaction = isMajorCompaction;
77 this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
78 this.lowWatermark = getLowWatermarkFromCommitTable();
79
80 this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
81 commitCache = new LRUMap<>(1000);
82 LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
83 lowWatermark, hRegion.getRegionInfo());
84 }
85
86 @Override
87 public boolean next(List<Cell> results) throws IOException {
88 return next(results, -1);
89 }
90
91 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
92 int limit = scannerContext.getBatchLimit();
93 return next(result, limit);
94 }
95
96 public boolean next(List<Cell> result, int limit) throws IOException {
97
98 if (currentRowWorthValues.isEmpty()) {
99
100 List<Cell> scanResult = new ArrayList<Cell>();
101 hasMoreRows = internalScanner.next(scanResult);
102 if (LOG.isTraceEnabled()) {
103 LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
104 }
105
106
107 SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
108
109
110
111 Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
112 PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
113 = Iterators.peekingIterator(cellToSc.entrySet().iterator());
114 while (iter.hasNext()) {
115 Map.Entry<Cell, Optional<Cell>> entry = iter.next();
116 Cell cell = entry.getKey();
117 Optional<Cell> shadowCellOp = entry.getValue();
118
119 if (cell.getTimestamp() > lowWatermark) {
120 retain(currentRowWorthValues, cell, shadowCellOp);
121 continue;
122 }
123
124 if (shouldRetainNonTransactionallyDeletedCell(cell)) {
125 retain(currentRowWorthValues, cell, shadowCellOp);
126 continue;
127 }
128
129
130
131
132
133
134
135 if (isMajorCompaction) {
136
137 if (CellUtils.isTombstone(cell)) {
138 if (shadowCellOp.isPresent()) {
139 skipToNextColumn(cell, iter);
140 } else {
141 Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
142
143 if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
144 skipToNextColumn(cell, iter);
145 }
146 }
147 continue;
148 }
149 }
150
151 if (shadowCellOp.isPresent()) {
152 saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
153 } else {
154 Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
155 if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
156
157 byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
158 Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
159 saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
160 } else {
161 LOG.trace("Discarding cell {}", cell);
162 }
163 }
164 }
165 retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
166
167
168 Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
169 }
170
171
172 if (currentRowWorthValues.size() <= limit || limit == -1) {
173 result.addAll(currentRowWorthValues);
174 currentRowWorthValues.clear();
175 } else {
176 result.addAll(currentRowWorthValues.subList(0, limit));
177 currentRowWorthValues.subList(0, limit).clear();
178 }
179 LOG.trace("Results to preserve {}", result);
180
181 return hasMoreRows;
182 }
183
184 @Override
185 public void close() throws IOException {
186 internalScanner.close();
187 }
188
189
190
191
192
193 @VisibleForTesting
194 public boolean shouldRetainNonTransactionallyDeletedCell(Cell cell) {
195 return (CellUtil.isDelete(cell) || CellUtil.isDeleteFamily(cell))
196 &&
197 retainNonTransactionallyDeletedCells;
198 }
199
200 private void saveLastTimestampedCell(Map<String, CellInfo> lastCells, Cell cell, Cell shadowCell) {
201 String cellKey = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
202 + ":"
203 + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
204 LOG.trace("Cell Key: {}", cellKey);
205
206 if (!lastCells.containsKey(cellKey)) {
207 lastCells.put(cellKey, new CellInfo(cell, shadowCell));
208 } else {
209 if (lastCells.get(cellKey).getTimestamp() < cell.getTimestamp()) {
210 lastCells.put(cellKey, new CellInfo(cell, shadowCell));
211 } else {
212 LOG.trace("Forgetting old cell {}", cell);
213 }
214 }
215 }
216
217 private long getLowWatermarkFromCommitTable() throws IOException {
218 try {
219 LOG.trace("About to read log watermark from commit table");
220 return commitTableClient.readLowWatermark().get();
221 } catch (InterruptedException ie) {
222 Thread.currentThread().interrupt();
223 LOG.warn("Interrupted getting low watermark from commit table", ie);
224 throw new IOException("Interrupted getting low watermark from commit table");
225 } catch (ExecutionException ee) {
226 LOG.warn("Problem getting low watermark from commit table");
227 throw new IOException("Problem getting low watermark from commit table", ee.getCause());
228 }
229 }
230
231
232 private Result getShadowCell(byte[] row, byte[] family, byte[] qualifier, long timestamp) throws IOException {
233 Get g = new Get(row);
234 g.addColumn(family, qualifier);
235 g.setTimeStamp(timestamp);
236 Result r = hRegion.get(g);
237 return r;
238 }
239
240
241 private Optional<CommitTimestamp> getCommitTimestampWithRaces(Cell cell) throws IOException {
242 try {
243 byte[] family = CellUtil.cloneFamily(cell);
244 byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
245 cell.getQualifierOffset(),
246 cell.getQualifierLength());
247
248 Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
249 if (ct.isPresent()) {
250 if (ct.get().isValid()) {
251 return Optional.of(ct.get());
252 }
253
254 }
255
256
257 Result r = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
258 if (r.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
259 Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
260 Bytes.toLong(r.getValue(family, qualifier)), true));
261 return retval;
262 }
263
264
265
266 Boolean invalidated = commitTableClient.tryInvalidateTransaction(cell.getTimestamp()).get();
267 if (invalidated) {
268
269
270 Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
271 if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
272 Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
273 Bytes.toLong(r2.getValue(family, qualifier)), true));
274 commitTableClient.deleteCommitEntry(cell.getTimestamp());
275 return retval;
276 }
277 return Optional.absent();
278 }
279
280
281 Optional<CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
282 if (ct2.isPresent()) {
283 return Optional.of(ct2.get());
284 }
285
286
287 Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
288 if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
289 Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
290 Bytes.toLong(r2.getValue(family, qualifier)), true));
291 return retval;
292 }
293
294 } catch (InterruptedException e) {
295 Thread.currentThread().interrupt();
296 throw new IOException("Interrupted while getting commit timestamp from commit table");
297 } catch (ExecutionException e) {
298 throw new IOException("Error getting commit timestamp from commit table", e);
299 }
300
301 return Optional.absent();
302 }
303
304 private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
305
306
307 Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
308 if (cachedValue != null) {
309 return cachedValue;
310 }
311 Optional<CommitTimestamp> value = getCommitTimestampWithRaces(cell);
312 commitCache.put(cell.getTimestamp(), value);
313 return value;
314 }
315
316 private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
317 LOG.trace("Retaining cell {}", cell);
318 result.add(cell);
319 if (shadowCell.isPresent()) {
320 LOG.trace("...with shadow cell {}", cell, shadowCell.get());
321 result.add(shadowCell.get());
322 } else {
323 LOG.trace("...without shadow cell! (TS is above Low Watermark)");
324 }
325 }
326
327 private void retainLastTimestampedCellsSaved(List<Cell> result, Map<String, CellInfo> lastTimestampedCellsInRow) {
328 for (CellInfo cellInfo : lastTimestampedCellsInRow.values()) {
329 LOG.trace("Retaining last cell {} with shadow cell {}", cellInfo.getCell(), cellInfo.getShadowCell());
330 result.add(cellInfo.getCell());
331 result.add(cellInfo.getShadowCell());
332 }
333 }
334
335 private void skipToNextColumn(Cell cell, PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter) {
336 boolean isFamilyDelete = CellUtils.isFamilyDeleteCell(cell);
337 while (iter.hasNext()
338 && CellUtil.matchingFamily(iter.peek().getKey(), cell)
339 && (CellUtil.matchingQualifier(iter.peek().getKey(), cell) || isFamilyDelete)) {
340 iter.next();
341 }
342 }
343
344 }