View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import com.google.protobuf.InvalidProtocolBufferException;
21  
22  import org.apache.hadoop.hbase.client.Connection;
23  import org.apache.hadoop.hbase.client.Scan;
24  
25  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
26  import org.apache.hadoop.hbase.filter.Filter;
27  import org.apache.hadoop.hbase.regionserver.RegionScanner;
28  
29  
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.omid.committable.CommitTable;
32  import org.apache.omid.committable.hbase.HBaseCommitTable;
33  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
34  import org.apache.omid.proto.TSOProto;
35  import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
36  import org.apache.omid.HBaseShims;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CoprocessorEnvironment;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
42  import org.apache.hadoop.hbase.regionserver.RegionAccessWrapper;
43  import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  import java.io.IOException;
48  
49  import java.util.HashSet;
50  import java.util.List;
51  import java.util.Map;
52  import java.util.Queue;
53  import java.util.concurrent.ConcurrentHashMap;
54  import java.util.concurrent.ConcurrentLinkedQueue;
55  
56  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
57  
58  /**
59   * Server side filtering to identify the transaction snapshot.
60   */
61  public class OmidSnapshotFilter extends BaseRegionObserver {
62  
63      private static final Logger LOG = LoggerFactory.getLogger(OmidSnapshotFilter.class);
64  
65      private HBaseCommitTableConfig commitTableConf = null;
66      private RegionCoprocessorEnvironment env = null;
67      private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
68      private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
69      private CommitTable.Client inMemoryCommitTable = null;
70      private CommitTable.Client commitTableClient;
71      private Connection connection;
72  
73      public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
74          LOG.info("Compactor coprocessor initialized");
75          this.inMemoryCommitTable = commitTableClient;
76      }
77  
78      public OmidSnapshotFilter() {
79          LOG.info("Compactor coprocessor initialized via empty constructor");
80      }
81  
82      @Override
83      public void start(CoprocessorEnvironment env) throws IOException {
84          LOG.info("Starting snapshot filter coprocessor");
85          this.env = (RegionCoprocessorEnvironment)env;
86          commitTableConf = new HBaseCommitTableConfig();
87          String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
88          if (commitTableName != null) {
89              commitTableConf.setTableName(commitTableName);
90          }
91          connection = RegionConnectionFactory
92                  .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env);
93          commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
94          LOG.info("Snapshot filter started");
95      }
96  
97      @Override
98      public void stop(CoprocessorEnvironment e) throws IOException {
99          LOG.info("stopping Snapshot filter");
100         LOG.info("Snapshot filter stopped");
101     }
102 
103 
104     // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
105     public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) {
106         SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(get);
107         if (snapshotFilter != null) {
108             snapshotFilterQueue.add(snapshotFilter);
109         }
110     }
111 
112 
113     // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
114     public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
115             throws IOException {
116 
117         if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return;
118         boolean isLowLatency = Bytes.toBoolean(get.getAttribute(CellUtils.LL_ATTRIBUTE));
119         HBaseTransaction hbaseTransaction = getHBaseTransaction(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE),
120                 isLowLatency);
121         SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
122         snapshotFilterMap.put(get, snapshotFilter);
123 
124         get.setMaxVersions();
125         Filter newFilter = TransactionFilters.getVisibilityFilter(get.getFilter(),
126                 snapshotFilter, hbaseTransaction);
127         get.setFilter(newFilter);
128     }
129 
130     private SnapshotFilterImpl getSnapshotFilter(ObserverContext<RegionCoprocessorEnvironment> e)
131             throws IOException {
132         SnapshotFilterImpl snapshotFilter= snapshotFilterQueue.poll();
133         if (snapshotFilter == null) {
134             RegionAccessWrapper regionAccessWrapper =
135                     new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(e.getEnvironment()));
136             snapshotFilter = new SnapshotFilterImpl(regionAccessWrapper, initAndGetCommitTableClient());
137         }
138         return snapshotFilter;
139     }
140 
141 
142     // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
143     public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
144                                         Scan scan,
145                                         RegionScanner s) throws IOException {
146         preScannerOpen(e,scan);
147         return s;
148     }
149 
150 
151     // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
152     public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
153                                Scan scan) throws IOException {
154         byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
155 
156         if (byteTransaction == null) {
157             return;
158         }
159         boolean isLowLatency = Bytes.toBoolean(scan.getAttribute(CellUtils.LL_ATTRIBUTE));
160         HBaseTransaction hbaseTransaction = getHBaseTransaction(byteTransaction, isLowLatency);
161         SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
162 
163         scan.setMaxVersions();
164         Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(),
165                 snapshotFilter, hbaseTransaction);
166         scan.setFilter(newFilter);
167         return;
168     }
169 
170     private HBaseTransaction getHBaseTransaction(byte[] byteTransaction, boolean isLowLatency)
171             throws InvalidProtocolBufferException {
172         TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);
173         long id = transaction.getTimestamp();
174         long readTs = transaction.getReadTimestamp();
175         long epoch = transaction.getEpoch();
176         VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel());
177 
178         return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null,
179                 isLowLatency);
180 
181     }
182 
183     private CommitTable.Client initAndGetCommitTableClient() throws IOException {
184         if (inMemoryCommitTable != null) {
185             return inMemoryCommitTable;
186         }
187         return commitTableClient;
188     }
189 
190 }