View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import org.apache.omid.metrics.NullMetricsProvider;
22  import org.apache.omid.proto.TSOProto;
23  import org.jboss.netty.bootstrap.ClientBootstrap;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelException;
26  import org.jboss.netty.channel.ChannelFactory;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.ExceptionEvent;
32  import org.jboss.netty.channel.SimpleChannelHandler;
33  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
34  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
35  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
36  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
37  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
38  import org.mockito.Mock;
39  import org.mockito.MockitoAnnotations;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  import org.testng.annotations.AfterMethod;
43  import org.testng.annotations.BeforeMethod;
44  import org.testng.annotations.Test;
45  
46  import java.io.IOException;
47  import java.net.InetSocketAddress;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.TimeUnit;
50  
51  import static org.mockito.Matchers.any;
52  import static org.mockito.Matchers.anyBoolean;
53  import static org.mockito.Matchers.anyCollectionOf;
54  import static org.mockito.Matchers.anyLong;
55  import static org.mockito.Matchers.eq;
56  import static org.mockito.Mockito.reset;
57  import static org.mockito.Mockito.timeout;
58  import static org.mockito.Mockito.verify;
59  import static org.testng.Assert.assertEquals;
60  import static org.testng.Assert.assertFalse;
61  import static org.testng.Assert.assertNull;
62  import static org.testng.Assert.assertTrue;
63  
64  @SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
65  public class TestTSOChannelHandlerNetty {
66  
67      private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);
68  
69      @Mock
70      private
71      RequestProcessor requestProcessor;
72  
73      // Component under test
74      private TSOChannelHandler channelHandler;
75  
76      @BeforeMethod
77      public void beforeTestMethod() {
78          MockitoAnnotations.initMocks(this);
79          TSOServerConfig config = new TSOServerConfig();
80          config.setPort(1434);
81          channelHandler = new TSOChannelHandler(config, requestProcessor, new NullMetricsProvider());
82      }
83  
84      @AfterMethod
85      public void afterTestMethod() throws IOException {
86          channelHandler.close();
87      }
88  
89      @Test(timeOut = 10_000)
90      public void testMainAPI() throws Exception {
91  
92          // Check initial state
93          assertNull(channelHandler.listeningChannel);
94          assertNull(channelHandler.channelGroup);
95  
96          // Check initial connection
97          channelHandler.reconnect();
98          assertTrue(channelHandler.listeningChannel.isOpen());
99          assertEquals(channelHandler.channelGroup.size(), 1);
100         assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), 1434);
101 
102         // Check connection close
103         channelHandler.closeConnection();
104         assertFalse(channelHandler.listeningChannel.isOpen());
105         assertEquals(channelHandler.channelGroup.size(), 0);
106 
107         // Check re-closing connection
108         channelHandler.closeConnection();
109         assertFalse(channelHandler.listeningChannel.isOpen());
110         assertEquals(channelHandler.channelGroup.size(), 0);
111 
112         // Check connection after closing
113         channelHandler.reconnect();
114         assertTrue(channelHandler.listeningChannel.isOpen());
115         assertEquals(channelHandler.channelGroup.size(), 1);
116 
117         // Check re-connection
118         channelHandler.reconnect();
119         assertTrue(channelHandler.listeningChannel.isOpen());
120         assertEquals(channelHandler.channelGroup.size(), 1);
121 
122         // Exercise closeable with re-connection trial
123         channelHandler.close();
124         assertFalse(channelHandler.listeningChannel.isOpen());
125         assertEquals(channelHandler.channelGroup.size(), 0);
126         try {
127             channelHandler.reconnect();
128         } catch (ChannelException e) {
129             // Expected: Can't reconnect after closing
130             assertFalse(channelHandler.listeningChannel.isOpen());
131             assertEquals(channelHandler.channelGroup.size(), 0);
132         }
133 
134     }
135 
136     @Test(timeOut = 10_000)
137     public void testNettyConnectionToTSOFromClient() throws Exception {
138 
139         ClientBootstrap nettyClient = createNettyClientBootstrap();
140 
141         ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
142 
143         // ------------------------------------------------------------------------------------------------------------
144         // Test the client can't connect cause the server is not there
145         // ------------------------------------------------------------------------------------------------------------
146         while (!channelF.isDone()) /** do nothing */ ;
147         assertFalse(channelF.isSuccess());
148 
149         // ------------------------------------------------------------------------------------------------------------
150         // Test creation of a server connection
151         // ------------------------------------------------------------------------------------------------------------
152         channelHandler.reconnect();
153         assertTrue(channelHandler.listeningChannel.isOpen());
154         // Eventually the channel group of the server should contain the listening channel
155         assertEquals(channelHandler.channelGroup.size(), 1);
156 
157         // ------------------------------------------------------------------------------------------------------------
158         // Test that a client can connect now
159         // ------------------------------------------------------------------------------------------------------------
160         channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
161         while (!channelF.isDone()) /** do nothing */ ;
162         assertTrue(channelF.isSuccess());
163         assertTrue(channelF.getChannel().isConnected());
164         // Eventually the channel group of the server should have 2 elements
165         while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
166 
167         // ------------------------------------------------------------------------------------------------------------
168         // Close the channel on the client side and test we have one element less in the channel group
169         // ------------------------------------------------------------------------------------------------------------
170         channelF.getChannel().close().await();
171         // Eventually the channel group of the server should have only one element
172         while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
173 
174         // ------------------------------------------------------------------------------------------------------------
175         // Open a new channel and test the connection closing on the server side through the channel handler
176         // ------------------------------------------------------------------------------------------------------------
177         channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
178         while (!channelF.isDone()) /** do nothing */ ;
179         assertTrue(channelF.isSuccess());
180         // Eventually the channel group of the server should have 2 elements again
181         while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
182         channelHandler.closeConnection();
183         assertFalse(channelHandler.listeningChannel.isOpen());
184         assertEquals(channelHandler.channelGroup.size(), 0);
185         // Wait some time and check the channel was closed
186         TimeUnit.SECONDS.sleep(1);
187         assertFalse(channelF.getChannel().isOpen());
188 
189         // ------------------------------------------------------------------------------------------------------------
190         // Test server re-connections with connected clients
191         // ------------------------------------------------------------------------------------------------------------
192         // Connect first time
193         channelHandler.reconnect();
194         assertTrue(channelHandler.listeningChannel.isOpen());
195         // Eventually the channel group of the server should contain the listening channel
196         assertEquals(channelHandler.channelGroup.size(), 1);
197         // Check the client can connect
198         channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
199         while (!channelF.isDone()) /** do nothing */ ;
200         assertTrue(channelF.isSuccess());
201         // Eventually the channel group of the server should have 2 elements
202         while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
203         // Re-connect and check that client connection was gone
204         channelHandler.reconnect();
205         assertTrue(channelHandler.listeningChannel.isOpen());
206         // Eventually the channel group of the server should contain the listening channel
207         assertEquals(channelHandler.channelGroup.size(), 1);
208         // Wait some time and check the channel was closed
209         TimeUnit.SECONDS.sleep(1);
210         assertFalse(channelF.getChannel().isOpen());
211 
212         // ------------------------------------------------------------------------------------------------------------
213         // Test closeable interface with re-connection trial
214         // ------------------------------------------------------------------------------------------------------------
215         channelHandler.close();
216         assertFalse(channelHandler.listeningChannel.isOpen());
217         assertEquals(channelHandler.channelGroup.size(), 0);
218     }
219 
220     @Test(timeOut = 10_000)
221     public void testNettyChannelWriting() throws Exception {
222 
223         // ------------------------------------------------------------------------------------------------------------
224         // Prepare test
225         // ------------------------------------------------------------------------------------------------------------
226 
227         // Connect channel handler
228         channelHandler.reconnect();
229         // Create client and connect it
230         ClientBootstrap nettyClient = createNettyClientBootstrap();
231         ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
232         // Basic checks for connection
233         while (!channelF.isDone()) /** do nothing */ ;
234         assertTrue(channelF.isSuccess());
235         assertTrue(channelF.getChannel().isConnected());
236         Channel channel = channelF.getChannel();
237         // Eventually the channel group of the server should have 2 elements
238         while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
239         // Write first handshake request
240         TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
241         // NOTE: Add here the required handshake capabilities when necessary
242         handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
243         channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
244 
245         // ------------------------------------------------------------------------------------------------------------
246         // Test channel writing
247         // ------------------------------------------------------------------------------------------------------------
248         testWritingTimestampRequest(channel);
249 
250         testWritingCommitRequest(channel);
251     }
252 
253     private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
254         // Reset mock
255         reset(requestProcessor);
256         TSOProto.Request.Builder tsBuilder = TSOProto.Request.newBuilder();
257         TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
258         tsBuilder.setTimestampRequest(tsRequestBuilder.build());
259         // Write into the channel
260         channel.write(tsBuilder.build()).await();
261         verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
262         verify(requestProcessor, timeout(100).never())
263                 .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
264     }
265 
266     private void testWritingCommitRequest(Channel channel) throws InterruptedException {
267         // Reset mock
268         reset(requestProcessor);
269         TSOProto.Request.Builder commitBuilder = TSOProto.Request.newBuilder();
270         TSOProto.CommitRequest.Builder commitRequestBuilder = TSOProto.CommitRequest.newBuilder();
271         commitRequestBuilder.setStartTimestamp(666);
272         commitRequestBuilder.addCellId(666);
273         commitBuilder.setCommitRequest(commitRequestBuilder.build());
274         TSOProto.Request r = commitBuilder.build();
275         assertTrue(r.hasCommitRequest());
276         // Write into the channel
277         channel.write(commitBuilder.build()).await();
278         verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
279         verify(requestProcessor, timeout(100).times(1))
280                 .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
281     }
282 
283     // ----------------------------------------------------------------------------------------------------------------
284     // Helper methods
285     // ----------------------------------------------------------------------------------------------------------------
286 
287     private ClientBootstrap createNettyClientBootstrap() {
288 
289         ChannelFactory factory = new NioClientSocketChannelFactory(
290                 Executors.newCachedThreadPool(
291                         new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
292                 Executors.newCachedThreadPool(
293                         new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()), 1);
294         // Create the bootstrap
295         ClientBootstrap bootstrap = new ClientBootstrap(factory);
296         bootstrap.setOption("tcpNoDelay", true);
297         bootstrap.setOption("keepAlive", true);
298         bootstrap.setOption("reuseAddress", true);
299         bootstrap.setOption("connectTimeoutMillis", 100);
300         ChannelPipeline pipeline = bootstrap.getPipeline();
301         pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
302         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
303         pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
304         pipeline.addLast("protobufencoder", new ProtobufEncoder());
305         pipeline.addLast("handler", new SimpleChannelHandler() {
306 
307             @Override
308             public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
309                 LOG.info("Channel {} connected", ctx.getChannel());
310             }
311 
312             @Override
313             public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
314                 LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
315             }
316 
317         });
318         return bootstrap;
319     }
320 
321 }