View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.base.Charsets;
21  import com.google.common.net.HostAndPort;
22  import com.google.common.util.concurrent.AbstractFuture;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  import org.apache.omid.proto.TSOProto;
25  import org.apache.omid.zk.ZKUtils;
26  import org.apache.statemachine.StateMachine;
27  import org.apache.curator.framework.CuratorFramework;
28  import org.apache.curator.framework.recipes.cache.ChildData;
29  import org.apache.curator.framework.recipes.cache.NodeCache;
30  import org.apache.curator.framework.recipes.cache.NodeCacheListener;
31  import org.jboss.netty.bootstrap.ClientBootstrap;
32  import org.jboss.netty.channel.Channel;
33  import org.jboss.netty.channel.ChannelFactory;
34  import org.jboss.netty.channel.ChannelFuture;
35  import org.jboss.netty.channel.ChannelFutureListener;
36  import org.jboss.netty.channel.ChannelHandlerContext;
37  import org.jboss.netty.channel.ChannelPipeline;
38  import org.jboss.netty.channel.ChannelStateEvent;
39  import org.jboss.netty.channel.ExceptionEvent;
40  import org.jboss.netty.channel.MessageEvent;
41  import org.jboss.netty.channel.SimpleChannelHandler;
42  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
43  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
44  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
45  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
46  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
47  import org.jboss.netty.util.HashedWheelTimer;
48  import org.jboss.netty.util.Timeout;
49  import org.jboss.netty.util.TimerTask;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  
53  import java.io.IOException;
54  import java.net.InetSocketAddress;
55  import java.util.ArrayDeque;
56  import java.util.HashMap;
57  import java.util.Iterator;
58  import java.util.Map;
59  import java.util.Queue;
60  import java.util.Set;
61  import java.util.concurrent.ExecutionException;
62  import java.util.concurrent.Executors;
63  import java.util.concurrent.ScheduledExecutorService;
64  import java.util.concurrent.TimeUnit;
65  
66  /**
67   * Describes the abstract methods to communicate to the TSO server
68   */
69  public class TSOClient implements TSOProtocol, NodeCacheListener {
70  
71      private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);
72  
73      // Basic configuration constants & defaults TODO: Move DEFAULT_ZK_CLUSTER to a conf class???
74      public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";
75  
76      private static final long DEFAULT_EPOCH = -1L;
77      private volatile long epoch = DEFAULT_EPOCH;
78  
79      // Attributes
80      private CuratorFramework zkClient;
81      private NodeCache currentTSOZNode;
82  
83      private ChannelFactory factory;
84      private ClientBootstrap bootstrap;
85      private Channel currentChannel;
86      private final ScheduledExecutorService fsmExecutor;
87      StateMachine.Fsm fsm;
88  
89      private final int requestTimeoutInMs;
90      private final int requestMaxRetries;
91      private final int tsoReconnectionDelayInSecs;
92      private InetSocketAddress tsoAddr;
93      private String zkCurrentTsoPath;
94  
95      // ----------------------------------------------------------------------------------------------------------------
96      // Construction
97      // ----------------------------------------------------------------------------------------------------------------
98  
99      public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
100         return new TSOClient(tsoClientConf);
101     }
102 
103     // Avoid instantiation
104     private TSOClient(OmidClientConfiguration omidConf) throws IOException {
105 
106         // Start client with Nb of active threads = 3 as maximum.
107         int tsoExecutorThreads = omidConf.getExecutorThreads();
108 
109         factory = new NioClientSocketChannelFactory(
110                 Executors.newCachedThreadPool(
111                         new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
112                 Executors.newCachedThreadPool(
113                         new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
114         // Create the bootstrap
115         bootstrap = new ClientBootstrap(factory);
116 
117         requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
118         requestMaxRetries = omidConf.getRequestMaxRetries();
119         tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
120 
121         LOG.info("Connecting to TSO...");
122         HostAndPort hp;
123         switch (omidConf.getConnectionType()) {
124             case HA:
125                 zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
126                                                 omidConf.getZkNamespace(),
127                                                 omidConf.getZkConnectionTimeoutInSecs());
128                 zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
129                 configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
130                 String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
131                 // TSO info includes the new TSO host:port address and epoch
132                 String[] currentTSOAndEpochArray = tsoInfo.split("#");
133                 hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
134                 setTSOAddress(hp.getHostText(), hp.getPort());
135                 epoch = Long.parseLong(currentTSOAndEpochArray[1]);
136                 LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
137                 break;
138             case DIRECT:
139             default:
140                 hp = HostAndPort.fromString(omidConf.getConnectionString());
141                 setTSOAddress(hp.getHostText(), hp.getPort());
142                 LOG.info("\t* TSO host:port {} will be connected directly", hp);
143                 break;
144         }
145 
146         fsmExecutor = Executors.newSingleThreadScheduledExecutor(
147                 new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
148         fsm = new StateMachine.FsmImpl(fsmExecutor);
149         fsm.setInitState(new DisconnectedState(fsm));
150 
151         ChannelPipeline pipeline = bootstrap.getPipeline();
152         pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
153         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
154         pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
155         pipeline.addLast("protobufencoder", new ProtobufEncoder());
156         pipeline.addLast("handler", new Handler(fsm));
157 
158         bootstrap.setOption("tcpNoDelay", true);
159         bootstrap.setOption("keepAlive", true);
160         bootstrap.setOption("reuseAddress", true);
161         bootstrap.setOption("connectTimeoutMillis", 100);
162     }
163 
164     // ----------------------------------------------------------------------------------------------------------------
165     // TSOProtocol interface
166     // ----------------------------------------------------------------------------------------------------------------
167 
168     /**
169      * @see TSOProtocol#getNewStartTimestamp()
170      */
171     @Override
172     public TSOFuture<Long> getNewStartTimestamp() {
173         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
174         TSOProto.TimestampRequest.Builder tsreqBuilder = TSOProto.TimestampRequest.newBuilder();
175         builder.setTimestampRequest(tsreqBuilder.build());
176         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
177         fsm.sendEvent(request);
178         return new ForwardingTSOFuture<>(request);
179     }
180 
181     /**
182      * @see TSOProtocol#commit(long, Set)
183      */
184     @Override
185     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
186         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
187         TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
188         commitbuilder.setStartTimestamp(transactionId);
189         for (CellId cell : cells) {
190             commitbuilder.addCellId(cell.getCellId());
191         }
192         builder.setCommitRequest(commitbuilder.build());
193         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
194         fsm.sendEvent(request);
195         return new ForwardingTSOFuture<>(request);
196     }
197 
198     /**
199      * @see TSOProtocol#close()
200      */
201     @Override
202     public TSOFuture<Void> close() {
203         final CloseEvent closeEvent = new CloseEvent();
204         fsm.sendEvent(closeEvent);
205         closeEvent.addListener(new Runnable() {
206             @Override
207             public void run() {
208                 try {
209                     closeEvent.get();
210                 } catch (InterruptedException e) {
211                     Thread.currentThread().interrupt();
212                     e.printStackTrace();
213                 } catch (ExecutionException e) {
214                     e.printStackTrace();
215                 } finally {
216                     fsmExecutor.shutdown();
217                     if (currentTSOZNode != null) {
218                         try {
219                             currentTSOZNode.close();
220                         } catch (IOException e) {
221                             e.printStackTrace();
222                         }
223                     }
224                     if (zkClient != null) {
225                         zkClient.close();
226                     }
227                 }
228 
229             }
230         }, fsmExecutor);
231         return new ForwardingTSOFuture<>(closeEvent);
232     }
233 
234     // ----------------------------------------------------------------------------------------------------------------
235     // High availability related interface
236     // ----------------------------------------------------------------------------------------------------------------
237 
238     /**
239      * Returns the epoch of the TSO server that initialized this transaction.
240      * Used for high availability support.
241      */
242     public long getEpoch() {
243         return epoch;
244     }
245 
246     // ----------------------------------------------------------------------------------------------------------------
247     // NodeCacheListener interface
248     // ----------------------------------------------------------------------------------------------------------------
249 
250     @Override
251     public void nodeChanged() throws Exception {
252 
253         String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
254         // TSO info includes the new TSO host:port address and epoch
255         String[] currentTSOAndEpochArray = tsoInfo.split("#");
256         HostAndPort hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
257         setTSOAddress(hp.getHostText(), hp.getPort());
258         epoch = Long.parseLong(currentTSOAndEpochArray[1]);
259         LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
260         if (currentChannel != null && currentChannel.isConnected()) {
261             LOG.info("\tClosing channel with previous TSO {}", currentChannel);
262             currentChannel.close();
263         }
264 
265     }
266 
267     // ****************************************** Finite State Machine ************************************************
268 
269     // ----------------------------------------------------------------------------------------------------------------
270     // FSM: Events
271     // ----------------------------------------------------------------------------------------------------------------
272 
273     private static class ParamEvent<T> implements StateMachine.Event {
274 
275         final T param;
276 
277         ParamEvent(T param) {
278             this.param = param;
279         }
280 
281         T getParam() {
282             return param;
283         }
284     }
285 
286     private static class ErrorEvent extends ParamEvent<Throwable> {
287 
288         ErrorEvent(Throwable t) {
289             super(t);
290         }
291     }
292 
293     private static class ConnectedEvent extends ParamEvent<Channel> {
294 
295         ConnectedEvent(Channel c) {
296             super(c);
297         }
298     }
299 
300     private static class UserEvent<T> extends AbstractFuture<T>
301             implements StateMachine.DeferrableEvent {
302 
303         void success(T value) {
304             set(value);
305         }
306 
307         @Override
308         public void error(Throwable t) {
309             setException(t);
310         }
311     }
312 
313     private static class CloseEvent extends UserEvent<Void> {
314 
315     }
316 
317     private static class ChannelClosedEvent extends ParamEvent<Throwable> {
318 
319         ChannelClosedEvent(Throwable t) {
320             super(t);
321         }
322     }
323 
324     private static class ReconnectEvent implements StateMachine.Event {
325 
326     }
327 
328     private static class HandshakeTimeoutEvent implements StateMachine.Event {
329 
330     }
331 
332     private static class TimestampRequestTimeoutEvent implements StateMachine.Event {
333 
334     }
335 
336     private static class CommitRequestTimeoutEvent implements StateMachine.Event {
337 
338         final long startTimestamp;
339 
340         CommitRequestTimeoutEvent(long startTimestamp) {
341             this.startTimestamp = startTimestamp;
342         }
343 
344         public long getStartTimestamp() {
345             return startTimestamp;
346         }
347     }
348 
349     private static class RequestEvent extends UserEvent<Long> {
350 
351         TSOProto.Request req;
352         int retriesLeft;
353 
354         RequestEvent(TSOProto.Request req, int retriesLeft) {
355             this.req = req;
356             this.retriesLeft = retriesLeft;
357         }
358 
359         TSOProto.Request getRequest() {
360             return req;
361         }
362 
363         void setRequest(TSOProto.Request request) {
364             this.req = request;
365         }
366 
367         int getRetriesLeft() {
368             return retriesLeft;
369         }
370 
371         void decrementRetries() {
372             retriesLeft--;
373         }
374 
375     }
376 
377     private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
378 
379         ResponseEvent(TSOProto.Response r) {
380             super(r);
381         }
382     }
383 
384     // ----------------------------------------------------------------------------------------------------------------
385     // FSM: States
386     // ----------------------------------------------------------------------------------------------------------------
387 
388     class BaseState extends StateMachine.State {
389 
390         BaseState(StateMachine.Fsm fsm) {
391             super(fsm);
392         }
393 
394         public StateMachine.State handleEvent(StateMachine.Event e) {
395             LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
396             return this;
397         }
398     }
399 
400     class DisconnectedState extends BaseState {
401 
402         DisconnectedState(StateMachine.Fsm fsm) {
403             super(fsm);
404             LOG.debug("NEW STATE: DISCONNECTED");
405         }
406 
407         public StateMachine.State handleEvent(RequestEvent e) {
408             fsm.deferEvent(e);
409             return tryToConnectToTSOServer();
410         }
411 
412         public StateMachine.State handleEvent(CloseEvent e) {
413             factory.releaseExternalResources();
414             e.success(null);
415             return this;
416         }
417 
418         private StateMachine.State tryToConnectToTSOServer() {
419             final InetSocketAddress tsoAddress = getAddress();
420             LOG.info("Trying to connect to TSO [{}]", tsoAddress);
421             ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
422             channelFuture.addListener(new ChannelFutureListener() {
423                 @Override
424                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
425                     if (channelFuture.isSuccess()) {
426                         LOG.info("Connection to TSO [{}] established. Channel {}",
427                                  tsoAddress, channelFuture.getChannel());
428                     } else {
429                         LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
430                                   tsoAddress, channelFuture.getChannel());
431                     }
432                 }
433             });
434             return new ConnectingState(fsm);
435         }
436     }
437 
438     private class ConnectingState extends BaseState {
439 
440         ConnectingState(StateMachine.Fsm fsm) {
441             super(fsm);
442             LOG.debug("NEW STATE: CONNECTING");
443         }
444 
445         public StateMachine.State handleEvent(UserEvent e) {
446             fsm.deferEvent(e);
447             return this;
448         }
449 
450         public StateMachine.State handleEvent(ConnectedEvent e) {
451             return new HandshakingState(fsm, e.getParam());
452         }
453 
454         public StateMachine.State handleEvent(ChannelClosedEvent e) {
455             return new ConnectionFailedState(fsm, e.getParam());
456         }
457 
458         public StateMachine.State handleEvent(ErrorEvent e) {
459             return new ConnectionFailedState(fsm, e.getParam());
460         }
461 
462     }
463 
464     private static class RequestAndTimeout {
465 
466         final RequestEvent event;
467         final Timeout timeout;
468 
469         RequestAndTimeout(RequestEvent event, Timeout timeout) {
470             this.event = event;
471             this.timeout = timeout;
472         }
473 
474         RequestEvent getRequest() {
475             return event;
476         }
477 
478         Timeout getTimeout() {
479             return timeout;
480         }
481 
482         public String toString() {
483             String info = "Request type ";
484             if (event.getRequest().hasTimestampRequest()) {
485                 info += "[Timestamp]";
486             } else if (event.getRequest().hasCommitRequest()) {
487                 info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
488             } else {
489                 info += "NONE";
490             }
491             return info;
492         }
493     }
494 
495     private class HandshakingState extends BaseState {
496 
497         final Channel channel;
498 
499         final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
500                 new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
501         final Timeout timeout;
502 
503         HandshakingState(StateMachine.Fsm fsm, Channel channel) {
504             super(fsm);
505             LOG.debug("NEW STATE: HANDSHAKING");
506             this.channel = channel;
507             TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
508             // Add the required handshake capabilities when necessary
509             handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
510             channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
511             timeout = newTimeout();
512         }
513 
514         private Timeout newTimeout() {
515             if (requestTimeoutInMs > 0) {
516                 return timeoutExecutor.newTimeout(new TimerTask() {
517                     @Override
518                     public void run(Timeout timeout) {
519                         fsm.sendEvent(new HandshakeTimeoutEvent());
520                     }
521                 }, 30, TimeUnit.SECONDS);
522             } else {
523                 return null;
524             }
525         }
526 
527         public StateMachine.State handleEvent(UserEvent e) {
528             fsm.deferEvent(e);
529             return this;
530         }
531 
532         public StateMachine.State handleEvent(ResponseEvent e) {
533             if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
534                 if (timeout != null) {
535                     timeout.cancel();
536                 }
537                 return new ConnectedState(fsm, channel, timeoutExecutor);
538             } else {
539                 cleanupState();
540                 LOG.error("Client incompatible with server");
541                 return new HandshakeFailedState(fsm, new HandshakeFailedException());
542             }
543         }
544 
545         public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
546             cleanupState();
547             return new ClosingState(fsm);
548         }
549 
550         public StateMachine.State handleEvent(ErrorEvent e) {
551             cleanupState();
552             Throwable exception = e.getParam();
553             LOG.error("Error during handshake", exception);
554             return new HandshakeFailedState(fsm, exception);
555         }
556 
557         private void cleanupState() {
558             timeoutExecutor.stop();
559             channel.close();
560             if (timeout != null) {
561                 timeout.cancel();
562             }
563         }
564 
565     }
566 
567     class ConnectionFailedState extends BaseState {
568 
569         final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
570                 new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
571 
572         Throwable exception;
573 
574         ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
575             super(fsm);
576             LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
577             this.exception = exception;
578             reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
579                 @Override
580                 public void run(Timeout timeout) {
581                     fsm.sendEvent(new ReconnectEvent());
582                 }
583             }, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
584         }
585 
586         public StateMachine.State handleEvent(UserEvent e) {
587             e.error(exception);
588             return this;
589         }
590 
591         public StateMachine.State handleEvent(ErrorEvent e) {
592             return this;
593         }
594 
595         public StateMachine.State handleEvent(ChannelClosedEvent e) {
596             return new DisconnectedState(fsm);
597         }
598 
599         public StateMachine.State handleEvent(ReconnectEvent e) {
600             return new DisconnectedState(fsm);
601         }
602 
603     }
604 
605     private class HandshakeFailedState extends ConnectionFailedState {
606 
607         HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
608             super(fsm, exception);
609             LOG.debug("STATE: HANDSHAKING FAILED");
610         }
611 
612     }
613 
614     class ConnectedState extends BaseState {
615 
616         final Queue<RequestAndTimeout> timestampRequests;
617         final Map<Long, RequestAndTimeout> commitRequests;
618         final Channel channel;
619 
620         final HashedWheelTimer timeoutExecutor;
621 
622         ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
623             super(fsm);
624             LOG.debug("NEW STATE: CONNECTED");
625             this.channel = channel;
626             this.timeoutExecutor = timeoutExecutor;
627             timestampRequests = new ArrayDeque<>();
628             commitRequests = new HashMap<>();
629         }
630 
631         private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
632             if (requestTimeoutInMs > 0) {
633                 return timeoutExecutor.newTimeout(new TimerTask() {
634                     @Override
635                     public void run(Timeout timeout) {
636                         fsm.sendEvent(timeoutEvent);
637                     }
638                 }, requestTimeoutInMs, TimeUnit.MILLISECONDS);
639             } else {
640                 return null;
641             }
642         }
643 
644         private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
645             TSOProto.Request req = request.getRequest();
646 
647             if (req.hasTimestampRequest()) {
648                 timestampRequests.add(new RequestAndTimeout(request, newTimeout(new TimestampRequestTimeoutEvent())));
649             } else if (req.hasCommitRequest()) {
650                 TSOProto.CommitRequest commitReq = req.getCommitRequest();
651                 commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
652                         request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
653             } else {
654                 request.error(new IllegalArgumentException("Unknown request type"));
655                 return;
656             }
657             ChannelFuture f = channel.write(req);
658 
659             f.addListener(new ChannelFutureListener() {
660                 @Override
661                 public void operationComplete(ChannelFuture future) {
662                     if (!future.isSuccess()) {
663                         fsm.sendEvent(new ErrorEvent(future.getCause()));
664                     }
665                 }
666             });
667         }
668 
669         private void handleResponse(ResponseEvent response) {
670             TSOProto.Response resp = response.getParam();
671             if (resp.hasTimestampResponse()) {
672                 if (timestampRequests.size() == 0) {
673                     LOG.debug("Received timestamp response when no requests outstanding");
674                     return;
675                 }
676                 RequestAndTimeout e = timestampRequests.remove();
677                 e.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
678                 if (e.getTimeout() != null) {
679                     e.getTimeout().cancel();
680                 }
681             } else if (resp.hasCommitResponse()) {
682                 long startTimestamp = resp.getCommitResponse().getStartTimestamp();
683                 RequestAndTimeout e = commitRequests.remove(startTimestamp);
684                 if (e == null) {
685                     LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
686                     return;
687                 }
688                 if (e.getTimeout() != null) {
689                     e.getTimeout().cancel();
690                 }
691                 if (resp.getCommitResponse().getAborted()) {
692                     e.getRequest().error(new AbortException());
693                 } else {
694                     e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
695                 }
696             }
697         }
698 
699         public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
700             if (!timestampRequests.isEmpty()) {
701                 RequestAndTimeout r = timestampRequests.remove();
702                 if (r.getTimeout() != null) {
703                     r.getTimeout().cancel();
704                 }
705                 queueRetryOrError(fsm, r.getRequest());
706             }
707             return this;
708         }
709 
710         public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
711             long startTimestamp = e.getStartTimestamp();
712             if (commitRequests.containsKey(startTimestamp)) {
713                 RequestAndTimeout r = commitRequests.remove(startTimestamp);
714                 if (r.getTimeout() != null) {
715                     r.getTimeout().cancel();
716                 }
717                 queueRetryOrError(fsm, r.getRequest());
718             }
719             return this;
720         }
721 
722         public StateMachine.State handleEvent(CloseEvent e) {
723             LOG.debug("CONNECTED STATE: CloseEvent");
724             timeoutExecutor.stop();
725             closeChannelAndErrorRequests();
726             fsm.deferEvent(e);
727             return new ClosingState(fsm);
728         }
729 
730         public StateMachine.State handleEvent(RequestEvent e) {
731             sendRequest(fsm, e);
732             return this;
733         }
734 
735         public StateMachine.State handleEvent(ResponseEvent e) {
736             handleResponse(e);
737             return this;
738         }
739 
740         public StateMachine.State handleEvent(ErrorEvent e) {
741             LOG.debug("CONNECTED STATE: ErrorEvent");
742             timeoutExecutor.stop();
743             handleError(fsm);
744             return new ClosingState(fsm);
745         }
746 
747         private void handleError(StateMachine.Fsm fsm) {
748             LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
749             while (timestampRequests.size() > 0) {
750                 RequestAndTimeout r = timestampRequests.remove();
751                 if (r.getTimeout() != null) {
752                     r.getTimeout().cancel();
753                 }
754                 queueRetryOrError(fsm, r.getRequest());
755             }
756             Iterator<Map.Entry<Long, RequestAndTimeout>> iter = commitRequests.entrySet().iterator();
757             while (iter.hasNext()) {
758                 RequestAndTimeout r = iter.next().getValue();
759                 if (r.getTimeout() != null) {
760                     r.getTimeout().cancel();
761                 }
762                 queueRetryOrError(fsm, r.getRequest());
763                 iter.remove();
764             }
765             channel.close();
766         }
767 
768         private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
769             if (e.getRetriesLeft() > 0) {
770                 e.decrementRetries();
771                 if (e.getRequest().hasCommitRequest()) {
772                     TSOProto.CommitRequest commitRequest = e.getRequest().getCommitRequest();
773                     if (!commitRequest.getIsRetry()) { // Create a new retry for the commit request
774                         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
775                         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
776                         commitBuilder.mergeFrom(commitRequest);
777                         commitBuilder.setIsRetry(true);
778                         builder.setCommitRequest(commitBuilder.build());
779                         e.setRequest(builder.build());
780                     }
781                 }
782                 fsm.sendEvent(e);
783             } else {
784                 e.error(
785                         new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
786             }
787         }
788 
789         private void closeChannelAndErrorRequests() {
790             channel.close();
791             for (RequestAndTimeout r : timestampRequests) {
792                 if (r.getTimeout() != null) {
793                     r.getTimeout().cancel();
794                 }
795                 r.getRequest().error(new ClosingException());
796             }
797             for (RequestAndTimeout r : commitRequests.values()) {
798                 if (r.getTimeout() != null) {
799                     r.getTimeout().cancel();
800                 }
801                 r.getRequest().error(new ClosingException());
802             }
803         }
804     }
805 
806     private class ClosingState extends BaseState {
807 
808         ClosingState(StateMachine.Fsm fsm) {
809             super(fsm);
810             LOG.debug("NEW STATE: CLOSING");
811         }
812 
813         public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
814             // Ignored. They will be retried or errored
815             return this;
816         }
817 
818         public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
819             // Ignored. They will be retried or errored
820             return this;
821         }
822 
823         public StateMachine.State handleEvent(ErrorEvent e) {
824             // Ignored. They will be retried or errored
825             return this;
826         }
827 
828         public StateMachine.State handleEvent(ResponseEvent e) {
829             // Ignored. They will be retried or errored
830             return this;
831         }
832 
833         public StateMachine.State handleEvent(UserEvent e) {
834             fsm.deferEvent(e);
835             return this;
836         }
837 
838         public StateMachine.State handleEvent(ChannelClosedEvent e) {
839             return new DisconnectedState(fsm);
840         }
841 
842         public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
843             return this;
844         }
845 
846     }
847 
848     // ----------------------------------------------------------------------------------------------------------------
849     // Helper classes & methods
850     // ----------------------------------------------------------------------------------------------------------------
851 
852     private class Handler extends SimpleChannelHandler {
853 
854         private StateMachine.Fsm fsm;
855 
856         Handler(StateMachine.Fsm fsm) {
857             this.fsm = fsm;
858         }
859 
860         @Override
861         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
862             currentChannel = e.getChannel();
863             LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
864             fsm.sendEvent(new ConnectedEvent(e.getChannel()));
865         }
866 
867         @Override
868         public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
869             LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
870             fsm.sendEvent(new ErrorEvent(new ConnectionException()));
871         }
872 
873         @Override
874         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
875             LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
876             fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
877         }
878 
879         @Override
880         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
881             if (e.getMessage() instanceof TSOProto.Response) {
882                 fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
883             } else {
884                 LOG.warn("Received unknown message", e.getMessage());
885             }
886         }
887 
888         @Override
889         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
890             LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
891             fsm.sendEvent(new ErrorEvent(e.getCause()));
892         }
893     }
894 
895     private synchronized void setTSOAddress(String host, int port) {
896         tsoAddr = new InetSocketAddress(host, port);
897     }
898 
899     private synchronized InetSocketAddress getAddress() {
900         return tsoAddr;
901     }
902 
903     private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
904         try {
905             currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
906             currentTSOZNode.getListenable().addListener(this);
907             currentTSOZNode.start(true);
908         } catch (Exception e) {
909             throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage());
910         }
911     }
912 
913     private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
914         ChildData currentTSOData = currentTSOZNode.getCurrentData();
915         if (currentTSOData == null) {
916             throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
917         }
918         byte[] currentTSOAndEpochAsBytes = currentTSOData.getData();
919         if (currentTSOAndEpochAsBytes == null) {
920             throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
921         }
922         return new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
923     }
924 
925 }