1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import org.apache.omid.proto.TSOProto;
23 import org.apache.omid.proto.TSOProto.Response;
24 import org.jboss.netty.bootstrap.ClientBootstrap;
25 import org.jboss.netty.channel.Channel;
26 import org.jboss.netty.channel.ChannelFactory;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelHandler;
34 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
35 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
36 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
37 import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
38 import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import java.net.InetSocketAddress;
43 import java.util.concurrent.ArrayBlockingQueue;
44 import java.util.concurrent.BlockingQueue;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.Executors;
47 import java.util.concurrent.Future;
48
49
50
51
52 public class TSOClientRaw {
53
54 private static final Logger LOG = LoggerFactory.getLogger(TSOClientRaw.class);
55
56 private final BlockingQueue<SettableFuture<Response>> responseQueue
57 = new ArrayBlockingQueue<SettableFuture<Response>>(5);
58 private final Channel channel;
59
60 public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
61
62 ChannelFactory factory = new NioClientSocketChannelFactory(
63 Executors.newCachedThreadPool(
64 new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
65 Executors.newCachedThreadPool(
66 new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
67
68 ClientBootstrap bootstrap = new ClientBootstrap(factory);
69
70 InetSocketAddress addr = new InetSocketAddress(host, port);
71
72 ChannelPipeline pipeline = bootstrap.getPipeline();
73 pipeline.addLast("lengthbaseddecoder",
74 new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
75 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
76 pipeline.addLast("protobufdecoder",
77 new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
78 pipeline.addLast("protobufencoder", new ProtobufEncoder());
79
80 Handler handler = new Handler();
81 pipeline.addLast("handler", handler);
82
83 bootstrap.setOption("tcpNoDelay", true);
84 bootstrap.setOption("keepAlive", true);
85 bootstrap.setOption("reuseAddress", true);
86 bootstrap.setOption("connectTimeoutMillis", 100);
87
88 ChannelFuture channelFuture = bootstrap.connect(addr).await();
89 channel = channelFuture.getChannel();
90 }
91
92 public void write(TSOProto.Request request) {
93 channel.write(request);
94 }
95
96 public Future<Response> getResponse() throws InterruptedException {
97 SettableFuture<Response> future = SettableFuture.create();
98 responseQueue.put(future);
99 return future;
100 }
101
102 public void close() throws InterruptedException {
103 responseQueue.put(SettableFuture.<Response>create());
104 channel.close();
105 }
106
107 private class Handler extends SimpleChannelHandler {
108 @Override
109 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
110 LOG.info("Message received", e);
111 if (e.getMessage() instanceof Response) {
112 Response resp = (Response) e.getMessage();
113 try {
114 SettableFuture<Response> future = responseQueue.take();
115 future.set(resp);
116 } catch (InterruptedException ie) {
117 Thread.currentThread().interrupt();
118 LOG.warn("Interrupted in handler", ie);
119 }
120 } else {
121 LOG.warn("Received unknown message", e.getMessage());
122 }
123 }
124
125 @Override
126 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
127 LOG.info("Exception received", e.getCause());
128 try {
129 SettableFuture<Response> future = responseQueue.take();
130 future.setException(e.getCause());
131 } catch (InterruptedException ie) {
132 Thread.currentThread().interrupt();
133 LOG.warn("Interrupted handling exception", ie);
134 }
135 }
136
137 @Override
138 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
139 throws Exception {
140 LOG.info("Disconnected");
141 try {
142 SettableFuture<Response> future = responseQueue.take();
143 future.setException(new ConnectionException());
144 } catch (InterruptedException ie) {
145 Thread.currentThread().interrupt();
146 LOG.warn("Interrupted handling exception", ie);
147 }
148 }
149 }
150 }