View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  
22  
23  import org.apache.omid.committable.CommitTable;
24  import org.apache.omid.committable.hbase.HBaseCommitTable;
25  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
26  import org.apache.hadoop.hbase.CoprocessorEnvironment;
27  import org.apache.hadoop.hbase.DoNotRetryIOException;
28  import org.apache.omid.HBaseShims;
29  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
30  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
31  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
32  import org.apache.hadoop.hbase.regionserver.CompactorScanner;
33  import org.apache.hadoop.hbase.regionserver.InternalScanner;
34  import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
35  import org.apache.hadoop.hbase.regionserver.ScanType;
36  import org.apache.hadoop.hbase.regionserver.Store;
37  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
38  
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import java.io.IOException;
43  import java.util.Queue;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  
46  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
47  
48  /**
49   * Garbage collector for stale data: triggered upon HBase
50   * compactions, it removes data from uncommitted transactions
51   * older than the low watermark using a special scanner
52   */
53  public class OmidCompactor extends BaseRegionObserver {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(OmidCompactor.class);
56  
57      private static final String HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY
58              = "omid.hbase.compactor.retain.tombstones";
59      private static final boolean HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT = true;
60  
61      final static String OMID_COMPACTABLE_CF_FLAG = "OMID_ENABLED";
62  
63      private boolean enableCompactorForAllFamilies = false;
64  
65      private HBaseCommitTableConfig commitTableConf = null;
66      private RegionCoprocessorEnvironment env = null;
67      @VisibleForTesting
68      Queue<CommitTable.Client> commitTableClientQueue = new ConcurrentLinkedQueue<>();
69  
70      // When compacting, if a cell which has been marked by HBase as Delete or
71      // Delete Family (that is, non-transactionally deleted), we allow the user
72      // to decide what the compactor scanner should do with it: retain it or not
73      // If retained, the deleted cell will appear after a minor compaction, but
74      // will be deleted anyways after a major one
75      private boolean retainNonTransactionallyDeletedCells;
76  
77      public OmidCompactor() {
78          this(false);
79      }
80  
81      public OmidCompactor(boolean enableCompactorForAllFamilies) {
82          LOG.info("Compactor coprocessor initialized");
83          this.enableCompactorForAllFamilies = enableCompactorForAllFamilies;
84      }
85  
86      @Override
87      public void start(CoprocessorEnvironment env) throws IOException {
88          LOG.info("Starting compactor coprocessor");
89          this.env = (RegionCoprocessorEnvironment) env;
90          commitTableConf = new HBaseCommitTableConfig();
91          String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
92          if (commitTableName != null) {
93              commitTableConf.setTableName(commitTableName);
94          }
95          retainNonTransactionallyDeletedCells =
96                  env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
97                          HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
98          LOG.info("Compactor coprocessor started");
99      }
100 
101     @Override
102     public void stop(CoprocessorEnvironment e) throws IOException {
103         LOG.info("Stopping compactor coprocessor");
104         if (commitTableClientQueue != null) {
105             for (CommitTable.Client commitTableClient : commitTableClientQueue) {
106                 commitTableClient.close();
107             }
108         }
109         LOG.info("Compactor coprocessor stopped");
110     }
111 
112 
113 
114     @Override
115     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
116                                       Store store,
117                                       InternalScanner scanner,
118                                       ScanType scanType,
119                                       CompactionRequest request) throws IOException {
120         boolean omidCompactable;
121         try {
122             if (enableCompactorForAllFamilies) {
123                 omidCompactable = true;
124             } else {
125 
126                 omidCompactable = HBaseShims.OmidCompactionEnabled(env, store, OMID_COMPACTABLE_CF_FLAG);
127             }
128 
129             // only column families tagged as compactable are compacted
130             // with omid compactor
131             if (!omidCompactable) {
132                 return scanner;
133             } else {
134                 CommitTable.Client commitTableClient = commitTableClientQueue.poll();
135                 if (commitTableClient == null) {
136                     commitTableClient = initAndGetCommitTableClient();
137                 }
138                 boolean isMajorCompaction = request.isMajor();
139                 return new CompactorScanner(env,
140                         scanner,
141                         commitTableClient,
142                         commitTableClientQueue,
143                         isMajorCompaction,
144                         retainNonTransactionallyDeletedCells);
145             }
146         } catch (IOException e) {
147             throw e;
148         } catch (Exception e) {
149             throw new DoNotRetryIOException(e);
150         }
151     }
152 
153     private CommitTable.Client initAndGetCommitTableClient() throws IOException {
154         LOG.info("Trying to get the commit table client");
155         CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, env), commitTableConf);
156         CommitTable.Client commitTableClient = commitTable.getClient();
157         LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
158         return commitTableClient;
159     }
160 
161 }