View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.committable.hbase.HBaseCommitTable;
23  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.CoprocessorEnvironment;
26  import org.apache.hadoop.hbase.HColumnDescriptor;
27  import org.apache.hadoop.hbase.HTableDescriptor;
28  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
29  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
30  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
31  import org.apache.hadoop.hbase.regionserver.CompactorScanner;
32  import org.apache.hadoop.hbase.regionserver.InternalScanner;
33  import org.apache.hadoop.hbase.regionserver.ScanType;
34  import org.apache.hadoop.hbase.regionserver.Store;
35  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import java.io.IOException;
41  import java.util.Queue;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  
44  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
45  
46  /**
47   * Garbage collector for stale data: triggered upon HBase
48   * compactions, it removes data from uncommitted transactions
49   * older than the low watermark using a special scanner
50   */
51  public class OmidCompactor extends BaseRegionObserver {
52  
53      private static final Logger LOG = LoggerFactory.getLogger(OmidCompactor.class);
54  
55      private static final String HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY
56              = "omid.hbase.compactor.retain.tombstones";
57      private static final boolean HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT = true;
58  
59      final static String OMID_COMPACTABLE_CF_FLAG = "OMID_ENABLED";
60  
61      private HBaseCommitTableConfig commitTableConf = null;
62      private Configuration conf = null;
63      @VisibleForTesting
64      Queue<CommitTable.Client> commitTableClientQueue = new ConcurrentLinkedQueue<>();
65  
66      // When compacting, if a cell which has been marked by HBase as Delete or
67      // Delete Family (that is, non-transactionally deleted), we allow the user
68      // to decide what the compactor scanner should do with it: retain it or not
69      // If retained, the deleted cell will appear after a minor compaction, but
70      // will be deleted anyways after a major one
71      private boolean retainNonTransactionallyDeletedCells;
72  
73      public OmidCompactor() {
74          LOG.info("Compactor coprocessor initialized via empty constructor");
75      }
76  
77      @Override
78      public void start(CoprocessorEnvironment env) throws IOException {
79          LOG.info("Starting compactor coprocessor");
80          conf = env.getConfiguration();
81          commitTableConf = new HBaseCommitTableConfig();
82          String commitTableName = conf.get(COMMIT_TABLE_NAME_KEY);
83          if (commitTableName != null) {
84              commitTableConf.setTableName(commitTableName);
85          }
86          retainNonTransactionallyDeletedCells =
87                  conf.getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
88                          HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
89          LOG.info("Compactor coprocessor started");
90      }
91  
92      @Override
93      public void stop(CoprocessorEnvironment e) throws IOException {
94          LOG.info("Stopping compactor coprocessor");
95          if (commitTableClientQueue != null) {
96              for (CommitTable.Client commitTableClient : commitTableClientQueue) {
97                  commitTableClient.close();
98              }
99          }
100         LOG.info("Compactor coprocessor stopped");
101     }
102 
103     @Override
104     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
105                                       Store store,
106                                       InternalScanner scanner,
107                                       ScanType scanType,
108                                       CompactionRequest request) throws IOException {
109         HTableDescriptor desc = e.getEnvironment().getRegion().getTableDesc();
110         HColumnDescriptor famDesc
111                 = desc.getFamily(Bytes.toBytes(store.getColumnFamilyName()));
112         boolean omidCompactable = Boolean.valueOf(famDesc.getValue(OMID_COMPACTABLE_CF_FLAG));
113         // only column families tagged as compactable are compacted
114         // with omid compactor
115         if (!omidCompactable) {
116             return scanner;
117         } else {
118             CommitTable.Client commitTableClient = commitTableClientQueue.poll();
119             if (commitTableClient == null) {
120                 commitTableClient = initAndGetCommitTableClient();
121             }
122             boolean isMajorCompaction = request.isMajor();
123             return new CompactorScanner(e,
124                     scanner,
125                     commitTableClient,
126                     commitTableClientQueue,
127                     isMajorCompaction,
128                     retainNonTransactionallyDeletedCells);
129         }
130     }
131 
132     private CommitTable.Client initAndGetCommitTableClient() throws IOException {
133         LOG.info("Trying to get the commit table client");
134         CommitTable commitTable = new HBaseCommitTable(conf, commitTableConf);
135         CommitTable.Client commitTableClient = commitTable.getClient();
136         LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
137         return commitTableClient;
138     }
139 
140 }