View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import org.apache.omid.proto.TSOProto;
22  import org.apache.omid.tso.ProgrammableTSOServer.Response.ResponseType;
23  import org.jboss.netty.bootstrap.ServerBootstrap;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelFactory;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.ExceptionEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  import org.jboss.netty.channel.SimpleChannelHandler;
32  import org.jboss.netty.channel.group.ChannelGroup;
33  import org.jboss.netty.channel.group.DefaultChannelGroup;
34  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import javax.inject.Inject;
39  import java.net.InetSocketAddress;
40  import java.nio.channels.ClosedChannelException;
41  import java.util.LinkedList;
42  import java.util.Queue;
43  import java.util.concurrent.Executors;
44  
45  /**
46   * Used in tests. Allows to program the set of responses returned by a TSO
47   */
48  public class ProgrammableTSOServer extends SimpleChannelHandler {
49  
50      private static final Logger LOG = LoggerFactory.getLogger(ProgrammableTSOServer.class);
51  
52      private ChannelFactory factory;
53      private ChannelGroup channelGroup;
54  
55      private Queue<Response> responseQueue = new LinkedList<>();
56  
57      @Inject
58      public ProgrammableTSOServer(int port) {
59          // Setup netty listener
60          factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
61                  .setNameFormat("boss-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder()
62                  .setNameFormat("worker-%d").build()), (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
63  
64          // Create the global ChannelGroup
65          channelGroup = new DefaultChannelGroup(ProgrammableTSOServer.class.getName());
66  
67          ServerBootstrap bootstrap = new ServerBootstrap(factory);
68          bootstrap.setPipelineFactory(new TSOChannelHandler.TSOPipelineFactory(this));
69  
70          // Add the parent channel to the group
71          Channel channel = bootstrap.bind(new InetSocketAddress(port));
72          channelGroup.add(channel);
73  
74          LOG.info("********** Dumb TSO Server running on port {} **********", port);
75      }
76  
77      // ************************* Main interface for tests *********************
78  
79      /**
80       * Allows to add response to the queue of responses
81       *
82       * @param r
83       *            the response to add
84       */
85      public void queueResponse(Response r) {
86          responseQueue.add(r);
87      }
88  
89      /**
90       * Removes all the current responses in the queue
91       */
92      public void cleanResponses() {
93          responseQueue.clear();
94      }
95  
96      // ******************** End of Main interface for tests *******************
97  
98      @Override
99      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
100         channelGroup.add(ctx.getChannel());
101     }
102 
103     @Override
104     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
105         channelGroup.remove(ctx.getChannel());
106     }
107 
108     /**
109      * Handle received messages
110      */
111     @Override
112     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
113         Object msg = e.getMessage();
114         if (msg instanceof TSOProto.Request) {
115             TSOProto.Request request = (TSOProto.Request) msg;
116             Channel channel = ctx.getChannel();
117             if (request.hasHandshakeRequest()) {
118                 checkHandshake(ctx, request.getHandshakeRequest());
119                 return;
120             }
121             if (!handshakeCompleted(ctx)) {
122                 LOG.info("handshake not completed");
123                 channel.close();
124             }
125 
126             Response resp = responseQueue.poll();
127             if (request.hasTimestampRequest()) {
128                 if (resp == null || resp.type != ResponseType.TIMESTAMP) {
129                     throw new IllegalStateException("Expecting TS response to send but got " + resp);
130                 }
131                 TimestampResponse tsResp = (TimestampResponse) resp;
132                 sendTimestampResponse(tsResp.startTS, channel);
133             } else if (request.hasCommitRequest()) {
134                 if (resp == null) {
135                     throw new IllegalStateException("Expecting COMMIT response to send but got null");
136                 }
137                 switch (resp.type) {
138                     case COMMIT:
139                         CommitResponse commitResp = (CommitResponse) resp;
140                         sendCommitResponse(commitResp.startTS, commitResp.commitTS, channel);
141                         break;
142                     case ABORT:
143                         AbortResponse abortResp = (AbortResponse) resp;
144                         sendAbortResponse(abortResp.startTS, channel);
145                         break;
146                     default:
147                         throw new IllegalStateException("Expecting COMMIT response to send but got " + resp.type);
148                 }
149             } else {
150                 LOG.error("Invalid request {}", request);
151                 ctx.getChannel().close();
152             }
153         } else {
154             LOG.error("Unknown message type", msg);
155         }
156     }
157 
158     @Override
159     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
160         if (e.getCause() instanceof ClosedChannelException) {
161             return;
162         }
163         LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
164         Channels.close(e.getChannel());
165     }
166 
167     private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
168         TSOProto.HandshakeResponse.Builder response = TSOProto.HandshakeResponse.newBuilder();
169         if (request.hasClientCapabilities()) {
170 
171             response.setClientCompatible(true).setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
172             TSOChannelContext tsoCtx = new TSOChannelContext();
173             tsoCtx.setHandshakeComplete();
174             ctx.setAttachment(tsoCtx);
175         } else {
176             response.setClientCompatible(false);
177         }
178         ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
179     }
180 
181     private boolean handshakeCompleted(ChannelHandlerContext ctx) {
182         Object o = ctx.getAttachment();
183         if (o instanceof TSOChannelContext) {
184             TSOChannelContext tsoCtx = (TSOChannelContext) o;
185             return tsoCtx.getHandshakeComplete();
186         }
187         return false;
188     }
189 
190     private void sendTimestampResponse(long startTimestamp, Channel c) {
191         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
192         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
193         respBuilder.setStartTimestamp(startTimestamp);
194         builder.setTimestampResponse(respBuilder.build());
195         c.write(builder.build());
196     }
197 
198     private void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
199         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
200         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
201         commitBuilder.setAborted(false).setStartTimestamp(startTimestamp).setCommitTimestamp(commitTimestamp);
202         builder.setCommitResponse(commitBuilder.build());
203         c.write(builder.build());
204     }
205 
206     private void sendAbortResponse(long startTimestamp, Channel c) {
207         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
208         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
209         commitBuilder.setAborted(true).setStartTimestamp(startTimestamp);
210         builder.setCommitResponse(commitBuilder.build());
211         c.write(builder.build());
212     }
213 
214     private static class TSOChannelContext {
215         boolean handshakeComplete;
216 
217         TSOChannelContext() {
218             handshakeComplete = false;
219         }
220 
221         boolean getHandshakeComplete() {
222             return handshakeComplete;
223         }
224 
225         void setHandshakeComplete() {
226             handshakeComplete = true;
227         }
228     }
229 
230     public static class TimestampResponse extends Response {
231 
232         final long startTS;
233 
234         public TimestampResponse(long startTS) {
235             super(ResponseType.TIMESTAMP);
236             this.startTS = startTS;
237         }
238 
239     }
240 
241     public static class CommitResponse extends Response {
242 
243         final long startTS;
244         final long commitTS;
245 
246         public CommitResponse(long startTS, long commitTS) {
247             super(ResponseType.COMMIT);
248             this.startTS = startTS;
249             this.commitTS = commitTS;
250         }
251 
252     }
253 
254     public static class AbortResponse extends Response {
255 
256         final long startTS;
257 
258         public AbortResponse(long startTS) {
259             super(ResponseType.ABORT);
260             this.startTS = startTS;
261         }
262 
263     }
264 
265     abstract static class Response {
266 
267         enum ResponseType {
268             TIMESTAMP, COMMIT, ABORT
269         }
270 
271         final ResponseType type;
272 
273         public Response(ResponseType type) {
274             this.type = type;
275         }
276 
277     }
278 
279 }