View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.base.Optional;
22  import com.google.common.collect.Iterators;
23  import com.google.common.collect.PeekingIterator;
24  import org.apache.omid.HBaseShims;
25  import org.apache.omid.committable.CommitTable;
26  import org.apache.omid.committable.CommitTable.Client;
27  import org.apache.omid.committable.CommitTable.CommitTimestamp;
28  import org.apache.omid.transaction.CellUtils;
29  import org.apache.omid.transaction.CellInfo;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.client.Get;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
36  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import java.io.IOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.HashMap;
45  import java.util.List;
46  import java.util.Map;
47  import java.util.Queue;
48  import java.util.SortedMap;
49  import java.util.concurrent.ExecutionException;
50  
51  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
52  
53  public class CompactorScanner implements InternalScanner {
54      private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
55      private final InternalScanner internalScanner;
56      private final CommitTable.Client commitTableClient;
57      private final Queue<CommitTable.Client> commitTableClientQueue;
58      private final boolean isMajorCompaction;
59      private final boolean retainNonTransactionallyDeletedCells;
60      private final long lowWatermark;
61  
62      private final Region hRegion;
63  
64      private boolean hasMoreRows = false;
65      private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
66  
67      public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
68                              InternalScanner internalScanner,
69                              Client commitTableClient,
70                              Queue<CommitTable.Client> commitTableClientQueue,
71                              boolean isMajorCompaction,
72                              boolean preserveNonTransactionallyDeletedCells) throws IOException {
73          this.internalScanner = internalScanner;
74          this.commitTableClient = commitTableClient;
75          this.commitTableClientQueue = commitTableClientQueue;
76          this.isMajorCompaction = isMajorCompaction;
77          this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
78          this.lowWatermark = getLowWatermarkFromCommitTable();
79          // Obtain the table in which the scanner is going to operate
80          this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
81          LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
82                  lowWatermark, hRegion.getRegionInfo());
83      }
84  
85      @Override
86      public boolean next(List<Cell> results) throws IOException {
87          return next(results, -1);
88      }
89  
90      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
91          int limit = scannerContext.getBatchLimit();
92          return next(result, limit);
93      }
94  
95      public boolean next(List<Cell> result, int limit) throws IOException {
96  
97          if (currentRowWorthValues.isEmpty()) {
98              // 1) Read next row
99              List<Cell> scanResult = new ArrayList<Cell>();
100             hasMoreRows = internalScanner.next(scanResult);
101             if (LOG.isTraceEnabled()) {
102                 LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
103             }
104             // 2) Traverse result list separating normal cells from shadow
105             // cells and building a map to access easily the shadow cells.
106             SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
107 
108             // 3) traverse the list of row key values isolated before and
109             // check which ones should be discarded
110             Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
111             PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
112                     = Iterators.peekingIterator(cellToSc.entrySet().iterator());
113             while (iter.hasNext()) {
114                 Map.Entry<Cell, Optional<Cell>> entry = iter.next();
115                 Cell cell = entry.getKey();
116                 Optional<Cell> shadowCellOp = entry.getValue();
117 
118                 if (cell.getTimestamp() > lowWatermark) {
119                     retain(currentRowWorthValues, cell, shadowCellOp);
120                     continue;
121                 }
122 
123                 if (shouldRetainNonTransactionallyDeletedCell(cell)) {
124                     retain(currentRowWorthValues, cell, shadowCellOp);
125                     continue;
126                 }
127 
128                 // During a minor compaction the coprocessor may only see a
129                 // subset of store files and may not have the all the versions
130                 // of a cell available for consideration. Therefore, if it
131                 // deletes a cell with a tombstone during a minor compaction,
132                 // an older version of the cell may become visible again. So,
133                 // we have to remove tombstones only in major compactions.
134                 if (isMajorCompaction) {
135                     if (CellUtils.isTombstone(cell)) {
136                         if (shadowCellOp.isPresent()) {
137                             skipToNextColumn(cell, iter);
138                         } else {
139                             Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
140                             // Clean the cell only if it is valid
141                             if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
142                                 skipToNextColumn(cell, iter);
143                             }
144                         }
145                         continue;
146                     }
147                 }
148 
149                 if (shadowCellOp.isPresent()) {
150                     saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
151                 } else {
152                     Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
153                     if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
154                         // Build the missing shadow cell...
155                         byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
156                         Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
157                         saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
158                     } else {
159                         LOG.trace("Discarding cell {}", cell);
160                     }
161                 }
162             }
163             retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
164 
165             // 4) Sort the list
166             Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
167         }
168 
169         // Chomp current row worth values up to the limit
170         if (currentRowWorthValues.size() <= limit) {
171             result.addAll(currentRowWorthValues);
172             currentRowWorthValues.clear();
173         } else {
174             result.addAll(currentRowWorthValues.subList(0, limit));
175             currentRowWorthValues.subList(0, limit).clear();
176         }
177         LOG.trace("Results to preserve {}", result);
178 
179         return hasMoreRows;
180     }
181 
182     @Override
183     public void close() throws IOException {
184         internalScanner.close();
185         commitTableClientQueue.add(commitTableClient);
186     }
187 
188     // ----------------------------------------------------------------------------------------------------------------
189     // Helper methods
190     // ----------------------------------------------------------------------------------------------------------------
191 
192     @VisibleForTesting
193     public boolean shouldRetainNonTransactionallyDeletedCell(Cell cell) {
194         return (CellUtil.isDelete(cell) || CellUtil.isDeleteFamily(cell))
195                 &&
196                 retainNonTransactionallyDeletedCells;
197     }
198 
199     private void saveLastTimestampedCell(Map<String, CellInfo> lastCells, Cell cell, Cell shadowCell) {
200         String cellKey = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
201                 + ":"
202                 + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
203         LOG.trace("Cell Key: {}", cellKey);
204 
205         if (!lastCells.containsKey(cellKey)) {
206             lastCells.put(cellKey, new CellInfo(cell, shadowCell));
207         } else {
208             if (lastCells.get(cellKey).getTimestamp() < cell.getTimestamp()) {
209                 lastCells.put(cellKey, new CellInfo(cell, shadowCell));
210             } else {
211                 LOG.trace("Forgetting old cell {}", cell);
212             }
213         }
214     }
215 
216     private long getLowWatermarkFromCommitTable() throws IOException {
217         try {
218             LOG.trace("About to read log watermark from commit table");
219             return commitTableClient.readLowWatermark().get();
220         } catch (InterruptedException ie) {
221             Thread.currentThread().interrupt();
222             LOG.warn("Interrupted getting low watermark from commit table", ie);
223             throw new IOException("Interrupted getting low watermark from commit table");
224         } catch (ExecutionException ee) {
225             LOG.warn("Problem getting low watermark from commit table");
226             throw new IOException("Problem getting low watermark from commit table", ee.getCause());
227         }
228     }
229 
230     private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
231         try {
232             Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
233             if (ct.isPresent()) {
234                 return Optional.of(ct.get());
235             } else {
236                 Get g = new Get(CellUtil.cloneRow(cell));
237                 byte[] family = CellUtil.cloneFamily(cell);
238                 byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
239                         cell.getQualifierOffset(),
240                         cell.getQualifierLength());
241                 g.addColumn(family, qualifier);
242                 g.setTimeStamp(cell.getTimestamp());
243                 Result r = hRegion.get(g);
244                 if (r.containsColumn(family, qualifier)) {
245                     return Optional.of(new CommitTimestamp(SHADOW_CELL,
246                             Bytes.toLong(r.getValue(family, qualifier)), true));
247                 }
248             }
249         } catch (InterruptedException e) {
250             Thread.currentThread().interrupt();
251             throw new IOException("Interrupted while getting commit timestamp from commit table");
252         } catch (ExecutionException e) {
253             throw new IOException("Error getting commit timestamp from commit table", e);
254         }
255 
256         return Optional.absent();
257     }
258 
259     private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
260         LOG.trace("Retaining cell {}", cell);
261         result.add(cell);
262         if (shadowCell.isPresent()) {
263             LOG.trace("...with shadow cell {}", cell, shadowCell.get());
264             result.add(shadowCell.get());
265         } else {
266             LOG.trace("...without shadow cell! (TS is above Low Watermark)");
267         }
268     }
269 
270     private void retainLastTimestampedCellsSaved(List<Cell> result, Map<String, CellInfo> lastTimestampedCellsInRow) {
271         for (CellInfo cellInfo : lastTimestampedCellsInRow.values()) {
272             LOG.trace("Retaining last cell {} with shadow cell {}", cellInfo.getCell(), cellInfo.getShadowCell());
273             result.add(cellInfo.getCell());
274             result.add(cellInfo.getShadowCell());
275         }
276     }
277 
278     private void skipToNextColumn(Cell cell, PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter) {
279         while (iter.hasNext()
280                 && CellUtil.matchingFamily(iter.peek().getKey(), cell)
281                 && CellUtil.matchingQualifier(iter.peek().getKey(), cell)) {
282             iter.next();
283         }
284     }
285 
286 }