View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import org.apache.omid.proto.TSOProto;
23  import org.apache.omid.proto.TSOProto.Response;
24  import org.jboss.netty.bootstrap.ClientBootstrap;
25  import org.jboss.netty.channel.Channel;
26  import org.jboss.netty.channel.ChannelFactory;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.ExceptionEvent;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.channel.SimpleChannelHandler;
34  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
35  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
36  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
37  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
38  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import java.net.InetSocketAddress;
43  import java.util.concurrent.ArrayBlockingQueue;
44  import java.util.concurrent.BlockingQueue;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.Executors;
47  import java.util.concurrent.Future;
48  
49  /**
50   * Raw client for communicating with tso server directly with protobuf messages
51   */
52  public class TSOClientRaw {
53  
54      private static final Logger LOG = LoggerFactory.getLogger(TSOClientRaw.class);
55  
56      private final BlockingQueue<SettableFuture<Response>> responseQueue
57              = new ArrayBlockingQueue<SettableFuture<Response>>(5);
58      private final Channel channel;
59  
60      public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
61          // Start client with Nb of active threads = 3 as maximum.
62          ChannelFactory factory = new NioClientSocketChannelFactory(
63                  Executors.newCachedThreadPool(
64                          new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
65                  Executors.newCachedThreadPool(
66                          new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
67          // Create the bootstrap
68          ClientBootstrap bootstrap = new ClientBootstrap(factory);
69  
70          InetSocketAddress addr = new InetSocketAddress(host, port);
71  
72          ChannelPipeline pipeline = bootstrap.getPipeline();
73          pipeline.addLast("lengthbaseddecoder",
74                  new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
75          pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
76          pipeline.addLast("protobufdecoder",
77                  new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
78          pipeline.addLast("protobufencoder", new ProtobufEncoder());
79  
80          Handler handler = new Handler();
81          pipeline.addLast("handler", handler);
82  
83          bootstrap.setOption("tcpNoDelay", true);
84          bootstrap.setOption("keepAlive", true);
85          bootstrap.setOption("reuseAddress", true);
86          bootstrap.setOption("connectTimeoutMillis", 100);
87  
88          ChannelFuture channelFuture = bootstrap.connect(addr).await();
89          channel = channelFuture.getChannel();
90      }
91  
92      public void write(TSOProto.Request request) {
93          channel.write(request);
94      }
95  
96      public Future<Response> getResponse() throws InterruptedException {
97          SettableFuture<Response> future = SettableFuture.create();
98          responseQueue.put(future);
99          return future;
100     }
101 
102     public void close() throws InterruptedException {
103         responseQueue.put(SettableFuture.<Response>create());
104         channel.close();
105     }
106 
107     private class Handler extends SimpleChannelHandler {
108         @Override
109         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
110             LOG.info("Message received", e);
111             if (e.getMessage() instanceof Response) {
112                 Response resp = (Response) e.getMessage();
113                 try {
114                     SettableFuture<Response> future = responseQueue.take();
115                     future.set(resp);
116                 } catch (InterruptedException ie) {
117                     Thread.currentThread().interrupt();
118                     LOG.warn("Interrupted in handler", ie);
119                 }
120             } else {
121                 LOG.warn("Received unknown message", e.getMessage());
122             }
123         }
124 
125         @Override
126         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
127             LOG.info("Exception received", e.getCause());
128             try {
129                 SettableFuture<Response> future = responseQueue.take();
130                 future.setException(e.getCause());
131             } catch (InterruptedException ie) {
132                 Thread.currentThread().interrupt();
133                 LOG.warn("Interrupted handling exception", ie);
134             }
135         }
136 
137         @Override
138         public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
139                 throws Exception {
140             LOG.info("Disconnected");
141             try {
142                 SettableFuture<Response> future = responseQueue.take();
143                 future.setException(new ConnectionException());
144             } catch (InterruptedException ie) {
145                 Thread.currentThread().interrupt();
146                 LOG.warn("Interrupted handling exception", ie);
147             }
148         }
149     }
150 }