1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import org.apache.omid.metrics.NullMetricsProvider;
22 import org.apache.omid.proto.TSOProto;
23 import org.jboss.netty.bootstrap.ClientBootstrap;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelException;
26 import org.jboss.netty.channel.ChannelFactory;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.SimpleChannelHandler;
33 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
34 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
35 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
36 import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
37 import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
38 import org.mockito.Mock;
39 import org.mockito.MockitoAnnotations;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.testng.annotations.AfterMethod;
43 import org.testng.annotations.BeforeMethod;
44 import org.testng.annotations.Test;
45
46 import java.io.IOException;
47 import java.net.InetSocketAddress;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.TimeUnit;
50
51 import static org.mockito.Matchers.any;
52 import static org.mockito.Matchers.anyBoolean;
53 import static org.mockito.Matchers.anyCollectionOf;
54 import static org.mockito.Matchers.anyLong;
55 import static org.mockito.Matchers.eq;
56 import static org.mockito.Mockito.reset;
57 import static org.mockito.Mockito.timeout;
58 import static org.mockito.Mockito.verify;
59 import static org.testng.Assert.assertEquals;
60 import static org.testng.Assert.assertFalse;
61 import static org.testng.Assert.assertNull;
62 import static org.testng.Assert.assertTrue;
63
64 @SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
65 public class TestTSOChannelHandlerNetty {
66
67 private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);
68
69 @Mock
70 private
71 RequestProcessor requestProcessor;
72
73
74 private TSOChannelHandler channelHandler;
75
76 @BeforeMethod
77 public void beforeTestMethod() {
78 MockitoAnnotations.initMocks(this);
79 TSOServerConfig config = new TSOServerConfig();
80 config.setPort(1434);
81 channelHandler = new TSOChannelHandler(config, requestProcessor, new NullMetricsProvider());
82 }
83
84 @AfterMethod
85 public void afterTestMethod() throws IOException {
86 channelHandler.close();
87 }
88
89 @Test(timeOut = 10_000)
90 public void testMainAPI() throws Exception {
91
92
93 assertNull(channelHandler.listeningChannel);
94 assertNull(channelHandler.channelGroup);
95
96
97 channelHandler.reconnect();
98 assertTrue(channelHandler.listeningChannel.isOpen());
99 assertEquals(channelHandler.channelGroup.size(), 1);
100 assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), 1434);
101
102
103 channelHandler.closeConnection();
104 assertFalse(channelHandler.listeningChannel.isOpen());
105 assertEquals(channelHandler.channelGroup.size(), 0);
106
107
108 channelHandler.closeConnection();
109 assertFalse(channelHandler.listeningChannel.isOpen());
110 assertEquals(channelHandler.channelGroup.size(), 0);
111
112
113 channelHandler.reconnect();
114 assertTrue(channelHandler.listeningChannel.isOpen());
115 assertEquals(channelHandler.channelGroup.size(), 1);
116
117
118 channelHandler.reconnect();
119 assertTrue(channelHandler.listeningChannel.isOpen());
120 assertEquals(channelHandler.channelGroup.size(), 1);
121
122
123 channelHandler.close();
124 assertFalse(channelHandler.listeningChannel.isOpen());
125 assertEquals(channelHandler.channelGroup.size(), 0);
126 try {
127 channelHandler.reconnect();
128 } catch (ChannelException e) {
129
130 assertFalse(channelHandler.listeningChannel.isOpen());
131 assertEquals(channelHandler.channelGroup.size(), 0);
132 }
133
134 }
135
136 @Test(timeOut = 10_000)
137 public void testNettyConnectionToTSOFromClient() throws Exception {
138
139 ClientBootstrap nettyClient = createNettyClientBootstrap();
140
141 ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
142
143
144
145
146 while (!channelF.isDone()) ;
147 assertFalse(channelF.isSuccess());
148
149
150
151
152 channelHandler.reconnect();
153 assertTrue(channelHandler.listeningChannel.isOpen());
154
155 assertEquals(channelHandler.channelGroup.size(), 1);
156
157
158
159
160 channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
161 while (!channelF.isDone()) ;
162 assertTrue(channelF.isSuccess());
163 assertTrue(channelF.getChannel().isConnected());
164
165 while (channelHandler.channelGroup.size() != 2) ;
166
167
168
169
170 channelF.getChannel().close().await();
171
172 while (channelHandler.channelGroup.size() != 1) ;
173
174
175
176
177 channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
178 while (!channelF.isDone()) ;
179 assertTrue(channelF.isSuccess());
180
181 while (channelHandler.channelGroup.size() != 2) ;
182 channelHandler.closeConnection();
183 assertFalse(channelHandler.listeningChannel.isOpen());
184 assertEquals(channelHandler.channelGroup.size(), 0);
185
186 TimeUnit.SECONDS.sleep(1);
187 assertFalse(channelF.getChannel().isOpen());
188
189
190
191
192
193 channelHandler.reconnect();
194 assertTrue(channelHandler.listeningChannel.isOpen());
195
196 assertEquals(channelHandler.channelGroup.size(), 1);
197
198 channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
199 while (!channelF.isDone()) ;
200 assertTrue(channelF.isSuccess());
201
202 while (channelHandler.channelGroup.size() != 2) ;
203
204 channelHandler.reconnect();
205 assertTrue(channelHandler.listeningChannel.isOpen());
206
207 assertEquals(channelHandler.channelGroup.size(), 1);
208
209 TimeUnit.SECONDS.sleep(1);
210 assertFalse(channelF.getChannel().isOpen());
211
212
213
214
215 channelHandler.close();
216 assertFalse(channelHandler.listeningChannel.isOpen());
217 assertEquals(channelHandler.channelGroup.size(), 0);
218 }
219
220 @Test(timeOut = 10_000)
221 public void testNettyChannelWriting() throws Exception {
222
223
224
225
226
227
228 channelHandler.reconnect();
229
230 ClientBootstrap nettyClient = createNettyClientBootstrap();
231 ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
232
233 while (!channelF.isDone()) ;
234 assertTrue(channelF.isSuccess());
235 assertTrue(channelF.getChannel().isConnected());
236 Channel channel = channelF.getChannel();
237
238 while (channelHandler.channelGroup.size() != 2) ;
239
240 TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
241
242 handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
243 channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
244
245
246
247
248 testWritingTimestampRequest(channel);
249
250 testWritingCommitRequest(channel);
251
252 testWritingFenceRequest(channel);
253 }
254
255 private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
256
257 reset(requestProcessor);
258 TSOProto.Request.Builder tsBuilder = TSOProto.Request.newBuilder();
259 TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
260 tsBuilder.setTimestampRequest(tsRequestBuilder.build());
261
262 channel.write(tsBuilder.build()).await();
263 verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
264 verify(requestProcessor, timeout(100).never())
265 .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
266 }
267
268 private void testWritingCommitRequest(Channel channel) throws InterruptedException {
269
270 reset(requestProcessor);
271 TSOProto.Request.Builder commitBuilder = TSOProto.Request.newBuilder();
272 TSOProto.CommitRequest.Builder commitRequestBuilder = TSOProto.CommitRequest.newBuilder();
273 commitRequestBuilder.setStartTimestamp(666);
274 commitRequestBuilder.addCellId(666);
275 commitBuilder.setCommitRequest(commitRequestBuilder.build());
276 TSOProto.Request r = commitBuilder.build();
277 assertTrue(r.hasCommitRequest());
278
279 channel.write(commitBuilder.build()).await();
280 verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
281 verify(requestProcessor, timeout(100).times(1))
282 .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
283 }
284
285 private void testWritingFenceRequest(Channel channel) throws InterruptedException {
286
287 reset(requestProcessor);
288 TSOProto.Request.Builder fenceBuilder = TSOProto.Request.newBuilder();
289 TSOProto.FenceRequest.Builder fenceRequestBuilder = TSOProto.FenceRequest.newBuilder();
290 fenceRequestBuilder.setTableId(666);
291 fenceBuilder.setFenceRequest(fenceRequestBuilder.build());
292 TSOProto.Request r = fenceBuilder.build();
293 assertTrue(r.hasFenceRequest());
294
295 channel.write(fenceBuilder.build()).await();
296 verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
297 verify(requestProcessor, timeout(100).times(1))
298 .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContextImpl.class));
299 }
300
301
302
303
304
305 private ClientBootstrap createNettyClientBootstrap() {
306
307 ChannelFactory factory = new NioClientSocketChannelFactory(
308 Executors.newCachedThreadPool(
309 new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
310 Executors.newCachedThreadPool(
311 new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()), 1);
312
313 ClientBootstrap bootstrap = new ClientBootstrap(factory);
314 bootstrap.setOption("tcpNoDelay", true);
315 bootstrap.setOption("keepAlive", true);
316 bootstrap.setOption("reuseAddress", true);
317 bootstrap.setOption("connectTimeoutMillis", 100);
318 ChannelPipeline pipeline = bootstrap.getPipeline();
319 pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
320 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
321 pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
322 pipeline.addLast("protobufencoder", new ProtobufEncoder());
323 pipeline.addLast("handler", new SimpleChannelHandler() {
324
325 @Override
326 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
327 LOG.info("Channel {} connected", ctx.getChannel());
328 }
329
330 @Override
331 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
332 LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
333 }
334
335 });
336 return bootstrap;
337 }
338
339 }