View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.proto.TSOProto;
24  import org.jboss.netty.bootstrap.ServerBootstrap;
25  import org.jboss.netty.channel.Channel;
26  import org.jboss.netty.channel.ChannelFactory;
27  import org.jboss.netty.channel.ChannelHandler;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelPipelineFactory;
31  import org.jboss.netty.channel.ChannelStateEvent;
32  import org.jboss.netty.channel.Channels;
33  import org.jboss.netty.channel.ExceptionEvent;
34  import org.jboss.netty.channel.MessageEvent;
35  import org.jboss.netty.channel.SimpleChannelHandler;
36  import org.jboss.netty.channel.group.ChannelGroup;
37  import org.jboss.netty.channel.group.DefaultChannelGroup;
38  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
39  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
40  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
41  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
42  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  import javax.inject.Inject;
47  import java.io.Closeable;
48  import java.io.IOException;
49  import java.net.InetSocketAddress;
50  import java.nio.channels.ClosedChannelException;
51  import java.util.concurrent.Executors;
52  
53  /**
54   * ChannelHandler for the TSO Server.
55   *
56   * Incoming requests are processed in this class
57   */
58  public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
59  
60      private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
61  
62      private final ChannelFactory factory;
63  
64      private final ServerBootstrap bootstrap;
65  
66      @VisibleForTesting
67      Channel listeningChannel;
68      @VisibleForTesting
69      ChannelGroup channelGroup;
70  
71      private RequestProcessor requestProcessor;
72  
73      private TSOServerConfig config;
74  
75      private MetricsRegistry metrics;
76  
77      @Inject
78      public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
79  
80          this.config = config;
81          this.metrics = metrics;
82          this.requestProcessor = requestProcessor;
83          // Setup netty listener
84          this.factory = new NioServerSocketChannelFactory(
85                  Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
86                  Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
87                  (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
88  
89          this.bootstrap = new ServerBootstrap(factory);
90          bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
91  
92      }
93  
94      /**
95       * Allows to create and connect the communication channel closing the previous one if existed
96       */
97      void reconnect() {
98          if (listeningChannel == null && channelGroup == null) {
99              LOG.debug("Creating communication channel...");
100         } else {
101             LOG.debug("Reconnecting communication channel...");
102             closeConnection();
103         }
104         // Create the global ChannelGroup
105         channelGroup = new DefaultChannelGroup(TSOChannelHandler.class.getName());
106         LOG.debug("\tCreating channel to listening for incoming connections in port {}", config.getPort());
107         listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort()));
108         channelGroup.add(listeningChannel);
109         LOG.debug("\tListening channel created and connected: {}", listeningChannel);
110     }
111 
112     /**
113      * Allows to close the communication channel
114      */
115     void closeConnection() {
116         LOG.debug("Closing communication channel...");
117         if (listeningChannel != null) {
118             LOG.debug("\tUnbinding listening channel {}", listeningChannel);
119             listeningChannel.unbind().awaitUninterruptibly();
120             LOG.debug("\tListening channel {} unbound", listeningChannel);
121         }
122         if (channelGroup != null) {
123             LOG.debug("\tClosing channel group {}", channelGroup);
124             channelGroup.close().awaitUninterruptibly();
125             LOG.debug("\tChannel group {} closed", channelGroup);
126         }
127     }
128 
129     // ----------------------------------------------------------------------------------------------------------------
130     // Netty SimpleChannelHandler implementation
131     // ----------------------------------------------------------------------------------------------------------------
132 
133     @Override
134     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
135         channelGroup.add(ctx.getChannel());
136         LOG.debug("TSO channel connected: {}", ctx.getChannel());
137     }
138 
139     @Override
140     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
141         channelGroup.remove(ctx.getChannel());
142         LOG.debug("TSO channel disconnected: {}", ctx.getChannel());
143     }
144 
145     @Override
146     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
147         LOG.debug("TSO channel closed: {}", ctx.getChannel());
148     }
149 
150     /**
151      * Handle received messages
152      */
153     @Override
154     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
155         Object msg = e.getMessage();
156         if (msg instanceof TSOProto.Request) {
157             TSOProto.Request request = (TSOProto.Request) msg;
158             if (request.hasHandshakeRequest()) {
159                 checkHandshake(ctx, request.getHandshakeRequest());
160                 return;
161             }
162             if (!handshakeCompleted(ctx)) {
163                 LOG.error("Handshake not completed. Closing channel {}", ctx.getChannel());
164                 ctx.getChannel().close();
165             }
166 
167             if (request.hasTimestampRequest()) {
168                 requestProcessor.timestampRequest(ctx.getChannel(), new MonitoringContext(metrics));
169             } else if (request.hasCommitRequest()) {
170                 TSOProto.CommitRequest cr = request.getCommitRequest();
171                 requestProcessor.commitRequest(cr.getStartTimestamp(),
172                                                cr.getCellIdList(),
173                                                cr.getIsRetry(),
174                                                ctx.getChannel(),
175                                                new MonitoringContext(metrics));
176             } else {
177                 LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
178                 ctx.getChannel().close();
179             }
180         } else {
181             LOG.error("Unknown message type", msg);
182         }
183     }
184 
185     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
186     @Override
187     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
188         if (e.getCause() instanceof ClosedChannelException) {
189             LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
190             return;
191         }
192         LOG.warn("Unexpected exception from downstream. Closing channel {}", ctx.getChannel(), e.getCause());
193         ctx.getChannel().close();
194     }
195 
196     // ----------------------------------------------------------------------------------------------------------------
197     // Closeable implementation
198     // ----------------------------------------------------------------------------------------------------------------
199     @Override
200     public void close() throws IOException {
201         closeConnection();
202         factory.releaseExternalResources();
203     }
204 
205     // ----------------------------------------------------------------------------------------------------------------
206     // Helper methods and classes
207     // ----------------------------------------------------------------------------------------------------------------
208 
209     /**
210      * Contains the required context for handshake
211      */
212     private static class TSOChannelContext {
213 
214         boolean handshakeComplete;
215 
216         TSOChannelContext() {
217             handshakeComplete = false;
218         }
219 
220         boolean getHandshakeComplete() {
221             return handshakeComplete;
222         }
223 
224         void setHandshakeComplete() {
225             handshakeComplete = true;
226         }
227 
228     }
229 
230     private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
231 
232         TSOProto.HandshakeResponse.Builder response = TSOProto.HandshakeResponse.newBuilder();
233         if (request.hasClientCapabilities()) {
234 
235             response.setClientCompatible(true)
236                     .setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
237             TSOChannelContext tsoCtx = new TSOChannelContext();
238             tsoCtx.setHandshakeComplete();
239             ctx.setAttachment(tsoCtx);
240         } else {
241             response.setClientCompatible(false);
242         }
243         ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
244 
245     }
246 
247     private boolean handshakeCompleted(ChannelHandlerContext ctx) {
248 
249         Object o = ctx.getAttachment();
250         if (o instanceof TSOChannelContext) {
251             TSOChannelContext tsoCtx = (TSOChannelContext) o;
252             return tsoCtx.getHandshakeComplete();
253         }
254         return false;
255 
256     }
257 
258     /**
259      * Netty pipeline configuration
260      */
261     static class TSOPipelineFactory implements ChannelPipelineFactory {
262 
263         private final ChannelHandler handler;
264 
265         TSOPipelineFactory(ChannelHandler handler) {
266             this.handler = handler;
267         }
268 
269         public ChannelPipeline getPipeline() throws Exception {
270 
271             ChannelPipeline pipeline = Channels.pipeline();
272             // Max packet length is 10MB. Transactions with so many cells
273             // that the packet is rejected will receive a ServiceUnavailableException.
274             // 10MB is enough for 2 million cells in a transaction though.
275             pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
276             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
277             pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
278             pipeline.addLast("protobufencoder", new ProtobufEncoder());
279             pipeline.addLast("handler", handler);
280 
281             return pipeline;
282         }
283     }
284 
285 }