View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.proto.TSOProto;
24  import org.jboss.netty.bootstrap.ServerBootstrap;
25  import org.jboss.netty.channel.Channel;
26  import org.jboss.netty.channel.ChannelFactory;
27  import org.jboss.netty.channel.ChannelHandler;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelPipelineFactory;
31  import org.jboss.netty.channel.ChannelStateEvent;
32  import org.jboss.netty.channel.Channels;
33  import org.jboss.netty.channel.ExceptionEvent;
34  import org.jboss.netty.channel.MessageEvent;
35  import org.jboss.netty.channel.SimpleChannelHandler;
36  import org.jboss.netty.channel.group.ChannelGroup;
37  import org.jboss.netty.channel.group.DefaultChannelGroup;
38  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
39  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
40  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
41  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
42  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  import javax.inject.Inject;
47  import java.io.Closeable;
48  import java.io.IOException;
49  import java.net.InetSocketAddress;
50  import java.nio.channels.ClosedChannelException;
51  import java.util.concurrent.Executors;
52  
53  /**
54   * ChannelHandler for the TSO Server.
55   *
56   * Incoming requests are processed in this class
57   */
58  public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
59  
60      private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
61  
62      private final ChannelFactory factory;
63  
64      private final ServerBootstrap bootstrap;
65  
66      @VisibleForTesting
67      Channel listeningChannel;
68      @VisibleForTesting
69      ChannelGroup channelGroup;
70  
71      private RequestProcessor requestProcessor;
72  
73      private TSOServerConfig config;
74  
75      private MetricsRegistry metrics;
76  
77      @Inject
78      public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
79  
80          this.config = config;
81          this.metrics = metrics;
82          this.requestProcessor = requestProcessor;
83          // Setup netty listener
84          this.factory = new NioServerSocketChannelFactory(
85                  Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
86                  Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
87                  (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
88  
89          this.bootstrap = new ServerBootstrap(factory);
90          bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
91  
92      }
93  
94      /**
95       * Allows to create and connect the communication channel closing the previous one if existed
96       */
97      void reconnect() {
98          if (listeningChannel == null && channelGroup == null) {
99              LOG.debug("Creating communication channel...");
100         } else {
101             LOG.debug("Reconnecting communication channel...");
102             closeConnection();
103         }
104         // Create the global ChannelGroup
105         channelGroup = new DefaultChannelGroup(TSOChannelHandler.class.getName());
106         LOG.debug("\tCreating channel to listening for incoming connections in port {}", config.getPort());
107         listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort()));
108         channelGroup.add(listeningChannel);
109         LOG.debug("\tListening channel created and connected: {}", listeningChannel);
110     }
111 
112     /**
113      * Allows to close the communication channel
114      */
115     void closeConnection() {
116         LOG.debug("Closing communication channel...");
117         if (listeningChannel != null) {
118             LOG.debug("\tUnbinding listening channel {}", listeningChannel);
119             listeningChannel.unbind().awaitUninterruptibly();
120             LOG.debug("\tListening channel {} unbound", listeningChannel);
121         }
122         if (channelGroup != null) {
123             LOG.debug("\tClosing channel group {}", channelGroup);
124             channelGroup.close().awaitUninterruptibly();
125             LOG.debug("\tChannel group {} closed", channelGroup);
126         }
127     }
128 
129     // ----------------------------------------------------------------------------------------------------------------
130     // Netty SimpleChannelHandler implementation
131     // ----------------------------------------------------------------------------------------------------------------
132 
133     @Override
134     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
135         channelGroup.add(ctx.getChannel());
136         LOG.debug("TSO channel connected: {}", ctx.getChannel());
137     }
138 
139     @Override
140     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
141         channelGroup.remove(ctx.getChannel());
142         LOG.debug("TSO channel disconnected: {}", ctx.getChannel());
143     }
144 
145     @Override
146     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
147         LOG.debug("TSO channel closed: {}", ctx.getChannel());
148     }
149 
150     /**
151      * Handle received messages
152      */
153     @Override
154     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
155         Object msg = e.getMessage();
156         if (msg instanceof TSOProto.Request) {
157             TSOProto.Request request = (TSOProto.Request) msg;
158             if (request.hasHandshakeRequest()) {
159                 checkHandshake(ctx, request.getHandshakeRequest());
160                 return;
161             }
162             if (!handshakeCompleted(ctx)) {
163                 LOG.error("Handshake not completed. Closing channel {}", ctx.getChannel());
164                 ctx.getChannel().close();
165             }
166 
167             if (request.hasTimestampRequest()) {
168                 requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
169             } else if (request.hasCommitRequest()) {
170                 TSOProto.CommitRequest cr = request.getCommitRequest();
171                 requestProcessor.commitRequest(cr.getStartTimestamp(),
172                                                cr.getCellIdList(),
173                                                cr.getTableIdList(),
174                                                cr.getIsRetry(),
175                                                ctx.getChannel(),
176                                                MonitoringContextFactory.getInstance(config,metrics));
177             } else if (request.hasFenceRequest()) {
178                 TSOProto.FenceRequest fr = request.getFenceRequest();
179                 requestProcessor.fenceRequest(fr.getTableId(),
180                         ctx.getChannel(),
181                         MonitoringContextFactory.getInstance(config,metrics));
182             } else {
183                 LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
184                 ctx.getChannel().close();
185             }
186         } else {
187             LOG.error("Unknown message type", msg);
188         }
189     }
190 
191     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
192     @Override
193     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
194         if (e.getCause() instanceof ClosedChannelException) {
195             LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
196             return;
197         }
198         LOG.warn("Unexpected exception from downstream. Closing channel {} {}", ctx.getChannel(), e.getCause());
199         ctx.getChannel().close();
200     }
201 
202     // ----------------------------------------------------------------------------------------------------------------
203     // Closeable implementation
204     // ----------------------------------------------------------------------------------------------------------------
205     @Override
206     public void close() throws IOException {
207         closeConnection();
208         factory.releaseExternalResources();
209     }
210 
211     // ----------------------------------------------------------------------------------------------------------------
212     // Helper methods and classes
213     // ----------------------------------------------------------------------------------------------------------------
214 
215     /**
216      * Contains the required context for handshake
217      */
218     private static class TSOChannelContext {
219 
220         boolean handshakeComplete;
221 
222         TSOChannelContext() {
223             handshakeComplete = false;
224         }
225 
226         boolean getHandshakeComplete() {
227             return handshakeComplete;
228         }
229 
230         void setHandshakeComplete() {
231             handshakeComplete = true;
232         }
233 
234     }
235 
236     private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
237 
238         TSOProto.HandshakeResponse.Builder response = TSOProto.HandshakeResponse.newBuilder();
239         if (request.hasClientCapabilities()) {
240 
241             response.setClientCompatible(true)
242                     .setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
243             TSOChannelContext tsoCtx = new TSOChannelContext();
244             tsoCtx.setHandshakeComplete();
245             ctx.setAttachment(tsoCtx);
246         } else {
247             response.setClientCompatible(false);
248         }
249         response.setLowLatency(config.getLowLatency());
250         ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
251 
252     }
253 
254     private boolean handshakeCompleted(ChannelHandlerContext ctx) {
255 
256         Object o = ctx.getAttachment();
257         if (o instanceof TSOChannelContext) {
258             TSOChannelContext tsoCtx = (TSOChannelContext) o;
259             return tsoCtx.getHandshakeComplete();
260         }
261         return false;
262 
263     }
264 
265     /**
266      * Netty pipeline configuration
267      */
268     static class TSOPipelineFactory implements ChannelPipelineFactory {
269 
270         private final ChannelHandler handler;
271 
272         TSOPipelineFactory(ChannelHandler handler) {
273             this.handler = handler;
274         }
275 
276         public ChannelPipeline getPipeline() throws Exception {
277 
278             ChannelPipeline pipeline = Channels.pipeline();
279             // Max packet length is 10MB. Transactions with so many cells
280             // that the packet is rejected will receive a ServiceUnavailableException.
281             // 10MB is enough for 2 million cells in a transaction though.
282             pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
283             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
284             pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
285             pipeline.addLast("protobufencoder", new ProtobufEncoder());
286             pipeline.addLast("handler", handler);
287 
288             return pipeline;
289         }
290     }
291 
292 }