1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.Iterators;
23 import com.google.common.collect.PeekingIterator;
24 import org.apache.omid.HBaseShims;
25 import org.apache.omid.committable.CommitTable;
26 import org.apache.omid.committable.CommitTable.Client;
27 import org.apache.omid.committable.CommitTable.CommitTimestamp;
28 import org.apache.omid.transaction.CellUtils;
29 import org.apache.omid.transaction.CellInfo;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.client.Get;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
36 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import java.io.IOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.Queue;
48 import java.util.SortedMap;
49 import java.util.concurrent.ExecutionException;
50
51 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
52
53 public class CompactorScanner implements InternalScanner {
54 private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
55 private final InternalScanner internalScanner;
56 private final CommitTable.Client commitTableClient;
57 private final Queue<CommitTable.Client> commitTableClientQueue;
58 private final boolean isMajorCompaction;
59 private final boolean retainNonTransactionallyDeletedCells;
60 private final long lowWatermark;
61
62 private final Region hRegion;
63
64 private boolean hasMoreRows = false;
65 private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
66
67 public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
68 InternalScanner internalScanner,
69 Client commitTableClient,
70 Queue<CommitTable.Client> commitTableClientQueue,
71 boolean isMajorCompaction,
72 boolean preserveNonTransactionallyDeletedCells) throws IOException {
73 this.internalScanner = internalScanner;
74 this.commitTableClient = commitTableClient;
75 this.commitTableClientQueue = commitTableClientQueue;
76 this.isMajorCompaction = isMajorCompaction;
77 this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
78 this.lowWatermark = getLowWatermarkFromCommitTable();
79
80 this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
81 LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
82 lowWatermark, hRegion.getRegionInfo());
83 }
84
85 @Override
86 public boolean next(List<Cell> results) throws IOException {
87 return next(results, -1);
88 }
89
90 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
91 int limit = scannerContext.getBatchLimit();
92 return next(result, limit);
93 }
94
95 public boolean next(List<Cell> result, int limit) throws IOException {
96
97 if (currentRowWorthValues.isEmpty()) {
98
99 List<Cell> scanResult = new ArrayList<Cell>();
100 hasMoreRows = internalScanner.next(scanResult);
101 if (LOG.isTraceEnabled()) {
102 LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
103 }
104
105
106 SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
107
108
109
110 Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
111 PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
112 = Iterators.peekingIterator(cellToSc.entrySet().iterator());
113 while (iter.hasNext()) {
114 Map.Entry<Cell, Optional<Cell>> entry = iter.next();
115 Cell cell = entry.getKey();
116 Optional<Cell> shadowCellOp = entry.getValue();
117
118 if (cell.getTimestamp() > lowWatermark) {
119 retain(currentRowWorthValues, cell, shadowCellOp);
120 continue;
121 }
122
123 if (shouldRetainNonTransactionallyDeletedCell(cell)) {
124 retain(currentRowWorthValues, cell, shadowCellOp);
125 continue;
126 }
127
128
129
130
131
132
133
134 if (isMajorCompaction) {
135 if (CellUtils.isTombstone(cell)) {
136 if (shadowCellOp.isPresent()) {
137 skipToNextColumn(cell, iter);
138 } else {
139 Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
140
141 if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
142 skipToNextColumn(cell, iter);
143 }
144 }
145 continue;
146 }
147 }
148
149 if (shadowCellOp.isPresent()) {
150 saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
151 } else {
152 Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
153 if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
154
155 byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
156 Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
157 saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
158 } else {
159 LOG.trace("Discarding cell {}", cell);
160 }
161 }
162 }
163 retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
164
165
166 Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
167 }
168
169
170 if (currentRowWorthValues.size() <= limit) {
171 result.addAll(currentRowWorthValues);
172 currentRowWorthValues.clear();
173 } else {
174 result.addAll(currentRowWorthValues.subList(0, limit));
175 currentRowWorthValues.subList(0, limit).clear();
176 }
177 LOG.trace("Results to preserve {}", result);
178
179 return hasMoreRows;
180 }
181
182 @Override
183 public void close() throws IOException {
184 internalScanner.close();
185 commitTableClientQueue.add(commitTableClient);
186 }
187
188
189
190
191
192 @VisibleForTesting
193 public boolean shouldRetainNonTransactionallyDeletedCell(Cell cell) {
194 return (CellUtil.isDelete(cell) || CellUtil.isDeleteFamily(cell))
195 &&
196 retainNonTransactionallyDeletedCells;
197 }
198
199 private void saveLastTimestampedCell(Map<String, CellInfo> lastCells, Cell cell, Cell shadowCell) {
200 String cellKey = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
201 + ":"
202 + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
203 LOG.trace("Cell Key: {}", cellKey);
204
205 if (!lastCells.containsKey(cellKey)) {
206 lastCells.put(cellKey, new CellInfo(cell, shadowCell));
207 } else {
208 if (lastCells.get(cellKey).getTimestamp() < cell.getTimestamp()) {
209 lastCells.put(cellKey, new CellInfo(cell, shadowCell));
210 } else {
211 LOG.trace("Forgetting old cell {}", cell);
212 }
213 }
214 }
215
216 private long getLowWatermarkFromCommitTable() throws IOException {
217 try {
218 LOG.trace("About to read log watermark from commit table");
219 return commitTableClient.readLowWatermark().get();
220 } catch (InterruptedException ie) {
221 Thread.currentThread().interrupt();
222 LOG.warn("Interrupted getting low watermark from commit table", ie);
223 throw new IOException("Interrupted getting low watermark from commit table");
224 } catch (ExecutionException ee) {
225 LOG.warn("Problem getting low watermark from commit table");
226 throw new IOException("Problem getting low watermark from commit table", ee.getCause());
227 }
228 }
229
230 private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
231 try {
232 Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
233 if (ct.isPresent()) {
234 return Optional.of(ct.get());
235 } else {
236 Get g = new Get(CellUtil.cloneRow(cell));
237 byte[] family = CellUtil.cloneFamily(cell);
238 byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
239 cell.getQualifierOffset(),
240 cell.getQualifierLength());
241 g.addColumn(family, qualifier);
242 g.setTimeStamp(cell.getTimestamp());
243 Result r = hRegion.get(g);
244 if (r.containsColumn(family, qualifier)) {
245 return Optional.of(new CommitTimestamp(SHADOW_CELL,
246 Bytes.toLong(r.getValue(family, qualifier)), true));
247 }
248 }
249 } catch (InterruptedException e) {
250 Thread.currentThread().interrupt();
251 throw new IOException("Interrupted while getting commit timestamp from commit table");
252 } catch (ExecutionException e) {
253 throw new IOException("Error getting commit timestamp from commit table", e);
254 }
255
256 return Optional.absent();
257 }
258
259 private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
260 LOG.trace("Retaining cell {}", cell);
261 result.add(cell);
262 if (shadowCell.isPresent()) {
263 LOG.trace("...with shadow cell {}", cell, shadowCell.get());
264 result.add(shadowCell.get());
265 } else {
266 LOG.trace("...without shadow cell! (TS is above Low Watermark)");
267 }
268 }
269
270 private void retainLastTimestampedCellsSaved(List<Cell> result, Map<String, CellInfo> lastTimestampedCellsInRow) {
271 for (CellInfo cellInfo : lastTimestampedCellsInRow.values()) {
272 LOG.trace("Retaining last cell {} with shadow cell {}", cellInfo.getCell(), cellInfo.getShadowCell());
273 result.add(cellInfo.getCell());
274 result.add(cellInfo.getShadowCell());
275 }
276 }
277
278 private void skipToNextColumn(Cell cell, PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter) {
279 while (iter.hasNext()
280 && CellUtil.matchingFamily(iter.peek().getKey(), cell)
281 && CellUtil.matchingQualifier(iter.peek().getKey(), cell)) {
282 iter.next();
283 }
284 }
285
286 }