View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
21  import org.apache.phoenix.thirdparty.com.google.common.net.HostAndPort;
22  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture;
23  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
24  
25  import org.apache.omid.proto.TSOProto;
26  import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
27  import org.apache.omid.zk.ZKUtils;
28  import org.apache.statemachine.StateMachine;
29  import org.apache.curator.framework.CuratorFramework;
30  import org.apache.curator.framework.recipes.cache.ChildData;
31  import org.apache.curator.framework.recipes.cache.NodeCache;
32  import org.apache.curator.framework.recipes.cache.NodeCacheListener;
33  import org.jboss.netty.bootstrap.ClientBootstrap;
34  import org.jboss.netty.channel.Channel;
35  import org.jboss.netty.channel.ChannelFactory;
36  import org.jboss.netty.channel.ChannelFuture;
37  import org.jboss.netty.channel.ChannelFutureListener;
38  import org.jboss.netty.channel.ChannelHandlerContext;
39  import org.jboss.netty.channel.ChannelPipeline;
40  import org.jboss.netty.channel.ChannelStateEvent;
41  import org.jboss.netty.channel.ExceptionEvent;
42  import org.jboss.netty.channel.MessageEvent;
43  import org.jboss.netty.channel.SimpleChannelHandler;
44  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
45  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
46  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
47  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
48  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
49  import org.jboss.netty.util.HashedWheelTimer;
50  import org.jboss.netty.util.Timeout;
51  import org.jboss.netty.util.TimerTask;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  import java.io.IOException;
56  import java.net.InetSocketAddress;
57  import java.util.ArrayDeque;
58  import java.util.HashMap;
59  import java.util.HashSet;
60  import java.util.Iterator;
61  import java.util.Map;
62  import java.util.Queue;
63  import java.util.Set;
64  import java.util.concurrent.ExecutionException;
65  import java.util.concurrent.Executors;
66  import java.util.concurrent.ScheduledExecutorService;
67  import java.util.concurrent.TimeUnit;
68  
69  
70  /**
71   * Client used to communicate with the TSO server. Implements the {@link TSOProtocol} operations.
72   */
73  public class TSOClient implements TSOProtocol, NodeCacheListener {
74  
75      private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);
76  
77      // Basic configuration constants & defaults TODO: Move DEFAULT_ZK_CLUSTER to a conf class???
78      public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";
79  
80      private static final long DEFAULT_EPOCH = -1L;
81      private volatile long epoch = DEFAULT_EPOCH;
82  
83      // Attributes
84      private CuratorFramework zkClient;
85      private NodeCache currentTSOZNode;
86  
87      private ChannelFactory factory;
88      private ClientBootstrap bootstrap;
89      private Channel currentChannel;
90      private final ScheduledExecutorService fsmExecutor;
91      StateMachine.Fsm fsm;
92  
93      private final int requestTimeoutInMs;
94      private final int requestMaxRetries;
95      private final int tsoReconnectionDelayInSecs;
96      private InetSocketAddress tsoAddr;
97      private String zkCurrentTsoPath;
98  
99      private boolean lowLatency;
100 
101     // Use to extract unique table identifiers from the modified cells list.
102 
103     // Conflict detection level of the entire system. Can either be Row or Cell level.
104     private ConflictDetectionLevel conflictDetectionLevel;
105 
106 
107     // ----------------------------------------------------------------------------------------------------------------
108     // Construction
109     // ----------------------------------------------------------------------------------------------------------------
110 
111     public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
112         return new TSOClient(tsoClientConf);
113     }
114 
115     // Avoid instantiation
116     private TSOClient(OmidClientConfiguration omidConf) throws IOException {
117 
118         // Start client with Nb of active threads = 3 as maximum.
119         int tsoExecutorThreads = omidConf.getExecutorThreads();
120 
121         factory = new NioClientSocketChannelFactory(
122                 Executors.newCachedThreadPool(
123                         new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
124                 Executors.newCachedThreadPool(
125                         new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
126         // Create the bootstrap
127         bootstrap = new ClientBootstrap(factory);
128 
129         requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
130         requestMaxRetries = omidConf.getRequestMaxRetries();
131         tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
132 
133         LOG.info("Connecting to TSO...");
134         HostAndPort hp;
135         switch (omidConf.getConnectionType()) {
136             case HA:
137                 zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
138                                                 omidConf.getZkNamespace(),
139                                                 omidConf.getZkConnectionTimeoutInSecs());
140                 zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
141                 configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
142                 String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
143                 // TSO info includes the new TSO host:port address and epoch
144                 String[] currentTSOAndEpochArray = tsoInfo.split("#");
145                 hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
146                 setTSOAddress(hp.getHost(), hp.getPort());
147                 epoch = Long.parseLong(currentTSOAndEpochArray[1]);
148                 LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
149                 break;
150             case DIRECT:
151             default:
152                 hp = HostAndPort.fromString(omidConf.getConnectionString());
153                 setTSOAddress(hp.getHost(), hp.getPort());
154                 LOG.info("\t* TSO host:port {} will be connected directly", hp);
155                 break;
156         }
157 
158         fsmExecutor = Executors.newSingleThreadScheduledExecutor(
159                 new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
160         fsm = new StateMachine.FsmImpl(fsmExecutor);
161         fsm.setInitState(new DisconnectedState(fsm));
162 
163         ChannelPipeline pipeline = bootstrap.getPipeline();
164         pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
165         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
166         pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
167         pipeline.addLast("protobufencoder", new ProtobufEncoder());
168         pipeline.addLast("handler", new Handler(fsm));
169 
170         bootstrap.setOption("tcpNoDelay", true);
171         bootstrap.setOption("keepAlive", true);
172         bootstrap.setOption("reuseAddress", true);
173         bootstrap.setOption("connectTimeoutMillis", 100);
174         lowLatency = false;
175 
176 
177 
178         conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
179 
180     }
181 
182     // ----------------------------------------------------------------------------------------------------------------
183     // TSOProtocol interface
184     // ----------------------------------------------------------------------------------------------------------------
185 
186     /**
187      * @see TSOProtocol#getNewStartTimestamp()
188      */
189     @Override
190     public TSOFuture<Long> getNewStartTimestamp() {
191         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
192         TSOProto.TimestampRequest.Builder tsreqBuilder = TSOProto.TimestampRequest.newBuilder();
193         builder.setTimestampRequest(tsreqBuilder.build());
194         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
195         fsm.sendEvent(request);
196         return new ForwardingTSOFuture<>(request);
197     }
198 
199     /**
200      * @see TSOProtocol#commit(long, Set)
201      */
202     @Override
203     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
204         return commit(transactionId, cells, new HashSet<CellId>());
205     }
206 
207     /**
208      * @see TSOProtocol#commit(long, Set, Set)
209      */
210     @Override
211     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells, Set<? extends CellId> conflictFreeWriteSet) {
212         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
213         TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
214         commitbuilder.setStartTimestamp(transactionId);
215         HashSet<Long> rowLevelWriteSet = new HashSet<Long>();
216         HashSet<Long> tableIDs = new HashSet<Long>();
217         rowLevelWriteSet.clear();
218         for (CellId cell : cells) {
219             long id;
220 
221             switch (conflictDetectionLevel) {
222             case ROW:
223                 id = cell.getRowId();
224                 if (rowLevelWriteSet.contains(id)) {
225                     continue;
226                 } else {
227                     rowLevelWriteSet.add(id);
228                 }
229                 break;
230             case CELL:
231                 id = cell.getCellId();
232                 break;
233             default:
234                 id = 0;
235                 assert (false);
236             }
237 
238             commitbuilder.addCellId(id);
239             tableIDs.add(cell.getTableId());
240         }
241 
242         for (CellId cell : conflictFreeWriteSet) {
243             tableIDs.add(cell.getTableId());
244         }
245 
246         commitbuilder.addAllTableId(tableIDs);
247         tableIDs.clear();
248         builder.setCommitRequest(commitbuilder.build());
249         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
250         fsm.sendEvent(request);
251         return new ForwardingTSOFuture<>(request);
252     }
253 
254     /**
255      * @see TSOProtocol#getFence(long)
256      */
257     @Override
258     public TSOFuture<Long> getFence(long tableId) {
259         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
260         TSOProto.FenceRequest.Builder fenceReqBuilder = TSOProto.FenceRequest.newBuilder();
261         fenceReqBuilder.setTableId(tableId);
262         builder.setFenceRequest(fenceReqBuilder.build());
263         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
264         fsm.sendEvent(request);
265         return new ForwardingTSOFuture<>(request);
266     }
267 
268     /**
269      * @see TSOProtocol#close()
270      */
271     @Override
272     public TSOFuture<Void> close() {
273         final CloseEvent closeEvent = new CloseEvent();
274         fsm.sendEvent(closeEvent);
275         closeEvent.addListener(new Runnable() {
276             @Override
277             public void run() {
278                 try {
279                     closeEvent.get();
280                 } catch (InterruptedException e) {
281                     Thread.currentThread().interrupt();
282                     e.printStackTrace();
283                 } catch (ExecutionException e) {
284                     e.printStackTrace();
285                 } finally {
286                     fsmExecutor.shutdown();
287                     if (currentTSOZNode != null) {
288                         try {
289                             currentTSOZNode.close();
290                         } catch (IOException e) {
291                             e.printStackTrace();
292                         }
293                     }
294                     if (zkClient != null) {
295                         zkClient.close();
296                     }
297                 }
298 
299             }
300         }, fsmExecutor);
301         return new ForwardingTSOFuture<>(closeEvent);
302     }
303 
304     // ----------------------------------------------------------------------------------------------------------------
305     // High availability related interface
306     // ----------------------------------------------------------------------------------------------------------------
307 
308     /**
309      * @see TSOProtocol#getEpoch()
310      */
311     @Override
312     public long getEpoch() {
313         return epoch;
314     }
315 
316     /**
317      * Used for family deletion
318      * @return the conflict detection level.
319      */
320     public ConflictDetectionLevel getConflictDetectionLevel() {
321         return conflictDetectionLevel;
322     }
323 
324     /**
325      * Used for family deletion testing
326      */
327     public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
328         this.conflictDetectionLevel = conflictDetectionLevel;
329     }
330 
331     // ----------------------------------------------------------------------------------------------------------------
332     // NodeCacheListener interface
333     // ----------------------------------------------------------------------------------------------------------------
334 
335     @Override
336     public void nodeChanged() throws Exception {
337 
338         String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
339         // TSO info includes the new TSO host:port address and epoch
340         String[] currentTSOAndEpochArray = tsoInfo.split("#");
341         HostAndPort hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
342         setTSOAddress(hp.getHost(), hp.getPort());
343         epoch = Long.parseLong(currentTSOAndEpochArray[1]);
344         LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
345         if (currentChannel != null && currentChannel.isConnected()) {
346             LOG.info("\tClosing channel with previous TSO {}", currentChannel);
347             currentChannel.close();
348         }
349 
350     }
351 
352     @Override
353     public boolean isLowLatency() {
354         return lowLatency;
355     }
356 
357     // ****************************************** Finite State Machine ************************************************
358 
359     // ----------------------------------------------------------------------------------------------------------------
360     // FSM: Events
361     // ----------------------------------------------------------------------------------------------------------------
362 
363     private static class ParamEvent<T> implements StateMachine.Event {
364 
365         final T param;
366 
367         ParamEvent(T param) {
368             this.param = param;
369         }
370 
371         T getParam() {
372             return param;
373         }
374     }
375 
376     private static class ErrorEvent extends ParamEvent<Throwable> {
377 
378         ErrorEvent(Throwable t) {
379             super(t);
380         }
381     }
382 
383     private static class ConnectedEvent extends ParamEvent<Channel> {
384 
385         ConnectedEvent(Channel c) {
386             super(c);
387         }
388     }
389 
390     private static class UserEvent<T> extends AbstractFuture<T>
391             implements StateMachine.DeferrableEvent {
392 
393         void success(T value) {
394             set(value);
395         }
396 
397         @Override
398         public void error(Throwable t) {
399             setException(t);
400         }
401     }
402 
403     private static class CloseEvent extends UserEvent<Void> {
404 
405     }
406 
407     private static class ChannelClosedEvent extends ParamEvent<Throwable> {
408 
409         ChannelClosedEvent(Throwable t) {
410             super(t);
411         }
412     }
413 
414     private static class ReconnectEvent implements StateMachine.Event {
415 
416     }
417 
418     private static class HandshakeTimeoutEvent implements StateMachine.Event {
419 
420     }
421 
422     private static class TimestampRequestTimeoutEvent implements StateMachine.Event {
423 
424     }
425 
426     private static class CommitRequestTimeoutEvent implements StateMachine.Event {
427 
428         final long startTimestamp;
429 
430         CommitRequestTimeoutEvent(long startTimestamp) {
431             this.startTimestamp = startTimestamp;
432         }
433 
434         public long getStartTimestamp() {
435             return startTimestamp;
436         }
437     }
438 
439     private static class FenceRequestTimeoutEvent implements StateMachine.Event {
440 
441         final long tableID;
442 
443         FenceRequestTimeoutEvent(long tableID) {
444             this.tableID = tableID;
445         }
446 
447         public long getTableID() {
448             return tableID;
449         }
450     }
451 
452     private static class RequestEvent extends UserEvent<Long> {
453 
454         TSOProto.Request req;
455         int retriesLeft;
456 
457         RequestEvent(TSOProto.Request req, int retriesLeft) {
458             this.req = req;
459             this.retriesLeft = retriesLeft;
460         }
461 
462         TSOProto.Request getRequest() {
463             return req;
464         }
465 
466         void setRequest(TSOProto.Request request) {
467             this.req = request;
468         }
469 
470         int getRetriesLeft() {
471             return retriesLeft;
472         }
473 
474         void decrementRetries() {
475             retriesLeft--;
476         }
477 
478     }
479 
480     private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
481 
482         ResponseEvent(TSOProto.Response r) {
483             super(r);
484         }
485     }
486 
487     // ----------------------------------------------------------------------------------------------------------------
488     // FSM: States
489     // ----------------------------------------------------------------------------------------------------------------
490 
491     class BaseState extends StateMachine.State {
492 
493         BaseState(StateMachine.Fsm fsm) {
494             super(fsm);
495         }
496 
497         public StateMachine.State handleEvent(StateMachine.Event e) {
498             LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
499             return this;
500         }
501     }
502 
503     class DisconnectedState extends BaseState {
504 
505         DisconnectedState(StateMachine.Fsm fsm) {
506             super(fsm);
507             LOG.debug("NEW STATE: DISCONNECTED");
508         }
509 
510         public StateMachine.State handleEvent(RequestEvent e) {
511             fsm.deferEvent(e);
512             return tryToConnectToTSOServer();
513         }
514 
515         public StateMachine.State handleEvent(CloseEvent e) {
516             factory.releaseExternalResources();
517             e.success(null);
518             return this;
519         }
520 
521         private StateMachine.State tryToConnectToTSOServer() {
522             final InetSocketAddress tsoAddress = getAddress();
523             LOG.info("Trying to connect to TSO [{}]", tsoAddress);
524             ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
525             channelFuture.addListener(new ChannelFutureListener() {
526                 @Override
527                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
528                     if (channelFuture.isSuccess()) {
529                         LOG.info("Connection to TSO [{}] established. Channel {}",
530                                  tsoAddress, channelFuture.getChannel());
531                     } else {
532                         LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
533                                   tsoAddress, channelFuture.getChannel());
534                     }
535                 }
536             });
537             return new ConnectingState(fsm);
538         }
539     }
540 
541     private class ConnectingState extends BaseState {
542 
543         ConnectingState(StateMachine.Fsm fsm) {
544             super(fsm);
545             LOG.debug("NEW STATE: CONNECTING");
546         }
547 
548         public StateMachine.State handleEvent(UserEvent e) {
549             fsm.deferEvent(e);
550             return this;
551         }
552 
553         public StateMachine.State handleEvent(ConnectedEvent e) {
554             return new HandshakingState(fsm, e.getParam());
555         }
556 
557         public StateMachine.State handleEvent(ChannelClosedEvent e) {
558             return new ConnectionFailedState(fsm, e.getParam());
559         }
560 
561         public StateMachine.State handleEvent(ErrorEvent e) {
562             return new ConnectionFailedState(fsm, e.getParam());
563         }
564 
565     }
566 
567     private static class RequestAndTimeout {
568 
569         final RequestEvent event;
570         final Timeout timeout;
571 
572         RequestAndTimeout(RequestEvent event, Timeout timeout) {
573             this.event = event;
574             this.timeout = timeout;
575         }
576 
577         RequestEvent getRequest() {
578             return event;
579         }
580 
581         Timeout getTimeout() {
582             return timeout;
583         }
584 
585         public String toString() {
586             String info = "Request type ";
587             if (event.getRequest().hasTimestampRequest()) {
588                 info += "[Timestamp]";
589             } else if (event.getRequest().hasCommitRequest()) {
590                 info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
591             } else {
592                 info += "NONE";
593             }
594             return info;
595         }
596     }
597 
598     private class HandshakingState extends BaseState {
599 
600         final Channel channel;
601 
602         final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
603                 new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
604         final Timeout timeout;
605 
606         HandshakingState(StateMachine.Fsm fsm, Channel channel) {
607             super(fsm);
608             LOG.debug("NEW STATE: HANDSHAKING");
609             this.channel = channel;
610             TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
611             // Add the required handshake capabilities when necessary
612             handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
613             channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
614             timeout = newTimeout();
615         }
616 
617         private Timeout newTimeout() {
618             if (requestTimeoutInMs > 0) {
619                 return timeoutExecutor.newTimeout(new TimerTask() {
620                     @Override
621                     public void run(Timeout timeout) {
622                         fsm.sendEvent(new HandshakeTimeoutEvent());
623                     }
624                 }, 30, TimeUnit.SECONDS);
625             } else {
626                 return null;
627             }
628         }
629 
630         public StateMachine.State handleEvent(UserEvent e) {
631             fsm.deferEvent(e);
632             return this;
633         }
634 
635         public StateMachine.State handleEvent(ResponseEvent e) {
636             lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
637             if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
638                 if (timeout != null) {
639                     timeout.cancel();
640                 }
641                 return new ConnectedState(fsm, channel, timeoutExecutor);
642             } else {
643                 cleanupState();
644                 LOG.error("Client incompatible with server");
645                 return new HandshakeFailedState(fsm, new HandshakeFailedException());
646             }
647         }
648 
649         public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
650             cleanupState();
651             return new ClosingState(fsm);
652         }
653 
654         public StateMachine.State handleEvent(ErrorEvent e) {
655             cleanupState();
656             Throwable exception = e.getParam();
657             LOG.error("Error during handshake", exception);
658             return new HandshakeFailedState(fsm, exception);
659         }
660 
661         private void cleanupState() {
662             timeoutExecutor.stop();
663             channel.close();
664             if (timeout != null) {
665                 timeout.cancel();
666             }
667         }
668 
669     }
670 
671     class ConnectionFailedState extends BaseState {
672 
673         final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
674                 new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
675 
676         Throwable exception;
677 
678         ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
679             super(fsm);
680             LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
681             this.exception = exception;
682             reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
683                 @Override
684                 public void run(Timeout timeout) {
685                     fsm.sendEvent(new ReconnectEvent());
686                 }
687             }, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
688         }
689 
690         public StateMachine.State handleEvent(UserEvent e) {
691             e.error(exception);
692             return this;
693         }
694 
695         public StateMachine.State handleEvent(ErrorEvent e) {
696             return this;
697         }
698 
699         public StateMachine.State handleEvent(ChannelClosedEvent e) {
700             return new DisconnectedState(fsm);
701         }
702 
703         public StateMachine.State handleEvent(ReconnectEvent e) {
704             return new DisconnectedState(fsm);
705         }
706 
707     }
708 
709     private class HandshakeFailedState extends ConnectionFailedState {
710 
711         HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
712             super(fsm, exception);
713             LOG.debug("STATE: HANDSHAKING FAILED");
714         }
715 
716     }
717 
718     class ConnectedState extends BaseState {
719 
720         final Queue<RequestAndTimeout> timestampRequests;
721         final Map<Long, RequestAndTimeout> commitRequests;
722         final Map<Long, RequestAndTimeout> fenceRequests;
723         final Channel channel;
724 
725         final HashedWheelTimer timeoutExecutor;
726 
/**
 * Creates the connected state with empty collections of outstanding requests.
 *
 * @param fsm             state machine this state belongs to
 * @param channel         channel used to write requests
 * @param timeoutExecutor timer used to schedule request timeouts
 */
ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
    super(fsm);
    LOG.debug("NEW STATE: CONNECTED");
    this.timestampRequests = new ArrayDeque<>();
    this.commitRequests = new HashMap<>();
    this.fenceRequests = new HashMap<>();
    this.timeoutExecutor = timeoutExecutor;
    this.channel = channel;
}
736 
737         private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
738             if (requestTimeoutInMs > 0) {
739                 return timeoutExecutor.newTimeout(new TimerTask() {
740                     @Override
741                     public void run(Timeout timeout) {
742                         fsm.sendEvent(timeoutEvent);
743                     }
744                 }, requestTimeoutInMs, TimeUnit.MILLISECONDS);
745             } else {
746                 return null;
747             }
748         }
749 
750         private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
751             TSOProto.Request req = request.getRequest();
752 
753             if (req.hasTimestampRequest()) {
754                 timestampRequests.add(new RequestAndTimeout(request, newTimeout(new TimestampRequestTimeoutEvent())));
755             } else if (req.hasCommitRequest()) {
756                 TSOProto.CommitRequest commitReq = req.getCommitRequest();
757                 commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
758                         request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
759             } else if (req.hasFenceRequest()) {
760                 TSOProto.FenceRequest fenceReq = req.getFenceRequest();
761                 fenceRequests.put(fenceReq.getTableId(), new RequestAndTimeout(
762                         request, newTimeout(new FenceRequestTimeoutEvent(fenceReq.getTableId()))));
763             } else {
764                 request.error(new IllegalArgumentException("Unknown request type"));
765                 return;
766             }
767             ChannelFuture f = channel.write(req);
768 
769             f.addListener(new ChannelFutureListener() {
770                 @Override
771                 public void operationComplete(ChannelFuture future) {
772                     if (!future.isSuccess()) {
773                         fsm.sendEvent(new ErrorEvent(future.getCause()));
774                     }
775                 }
776             });
777         }
778 
779         private void handleResponse(ResponseEvent response) {
780             TSOProto.Response resp = response.getParam();
781             if (resp.hasTimestampResponse()) {
782                 if (timestampRequests.size() == 0) {
783                     LOG.debug("Received timestamp response when no requests outstanding");
784                     return;
785                 }
786                 RequestAndTimeout e = timestampRequests.remove();
787                 e.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
788                 if (e.getTimeout() != null) {
789                     e.getTimeout().cancel();
790                 }
791             } else if (resp.hasCommitResponse()) {
792                 long startTimestamp = resp.getCommitResponse().getStartTimestamp();
793                 RequestAndTimeout e = commitRequests.remove(startTimestamp);
794                 if (e == null) {
795                     LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
796                     return;
797                 }
798                 if (e.getTimeout() != null) {
799                     e.getTimeout().cancel();
800                 }
801                 if (resp.getCommitResponse().getAborted()) {
802                     e.getRequest().error(new AbortException());
803                 } else {
804                     e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
805                 }
806             } else if (resp.hasFenceResponse()) {
807                 long tableID = resp.getFenceResponse().getTableId();
808                 RequestAndTimeout e = fenceRequests.remove(tableID);
809                 if (e == null) {
810                     LOG.debug("Received fence response for request that doesn't exist. Table ID: {}", tableID);
811                     return;
812                 }
813                 if (e.getTimeout() != null) {
814                     e.getTimeout().cancel();
815                 }
816 
817                 e.getRequest().success(resp.getFenceResponse().getFenceId());
818             }
819         }
820 
821         public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
822             if (!timestampRequests.isEmpty()) {
823                 RequestAndTimeout r = timestampRequests.remove();
824                 if (r.getTimeout() != null) {
825                     r.getTimeout().cancel();
826                 }
827                 queueRetryOrError(fsm, r.getRequest());
828             }
829             return this;
830         }
831 
832         public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
833             long startTimestamp = e.getStartTimestamp();
834             if (commitRequests.containsKey(startTimestamp)) {
835                 RequestAndTimeout r = commitRequests.remove(startTimestamp);
836                 if (r.getTimeout() != null) {
837                     r.getTimeout().cancel();
838                 }
839                 queueRetryOrError(fsm, r.getRequest());
840             }
841             return this;
842         }
843 
844         public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
845             long tableID = e.getTableID();
846             if (fenceRequests.containsKey(tableID)) {
847                 RequestAndTimeout r = fenceRequests.remove(tableID);
848                 if (r.getTimeout() != null) {
849                     r.getTimeout().cancel();
850                 }
851                 queueRetryOrError(fsm, r.getRequest());
852             }
853             return this;
854         }
855 
856         public StateMachine.State handleEvent(CloseEvent e) {
857             LOG.debug("CONNECTED STATE: CloseEvent");
858             timeoutExecutor.stop();
859             closeChannelAndErrorRequests();
860             fsm.deferEvent(e);
861             return new ClosingState(fsm);
862         }
863 
864         public StateMachine.State handleEvent(RequestEvent e) {
865             sendRequest(fsm, e);
866             return this;
867         }
868 
869         public StateMachine.State handleEvent(ResponseEvent e) {
870             handleResponse(e);
871             return this;
872         }
873 
874         public StateMachine.State handleEvent(ErrorEvent e) {
875             LOG.debug("CONNECTED STATE: ErrorEvent");
876             timeoutExecutor.stop();
877             handleError(fsm);
878             return new ClosingState(fsm);
879         }
880 
881         private void handleError(StateMachine.Fsm fsm) {
882             LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
883             while (timestampRequests.size() > 0) {
884                 RequestAndTimeout r = timestampRequests.remove();
885                 if (r.getTimeout() != null) {
886                     r.getTimeout().cancel();
887                 }
888                 queueRetryOrError(fsm, r.getRequest());
889             }
890             Iterator<Map.Entry<Long, RequestAndTimeout>> iter = commitRequests.entrySet().iterator();
891             while (iter.hasNext()) {
892                 RequestAndTimeout r = iter.next().getValue();
893                 if (r.getTimeout() != null) {
894                     r.getTimeout().cancel();
895                 }
896                 queueRetryOrError(fsm, r.getRequest());
897                 iter.remove();
898             }
899             iter = fenceRequests.entrySet().iterator();
900             while (iter.hasNext()) {
901                 RequestAndTimeout r = iter.next().getValue();
902                 if (r.getTimeout() != null) {
903                     r.getTimeout().cancel();
904                 }
905                 queueRetryOrError(fsm, r.getRequest());
906                 iter.remove();
907             }
908             channel.close();
909         }
910 
911         private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
912             if (e.getRetriesLeft() > 0) {
913                 e.decrementRetries();
914                 if (e.getRequest().hasCommitRequest()) {
915                     TSOProto.CommitRequest commitRequest = e.getRequest().getCommitRequest();
916                     if (!commitRequest.getIsRetry()) { // Create a new retry for the commit request
917                         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
918                         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
919                         commitBuilder.mergeFrom(commitRequest);
920                         commitBuilder.setIsRetry(true);
921                         builder.setCommitRequest(commitBuilder.build());
922                         e.setRequest(builder.build());
923                     }
924                 }
925                 fsm.sendEvent(e);
926             } else {
927                 e.error(
928                         new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
929             }
930         }
931 
932         private void closeChannelAndErrorRequests() {
933             channel.close();
934             for (RequestAndTimeout r : timestampRequests) {
935                 if (r.getTimeout() != null) {
936                     r.getTimeout().cancel();
937                 }
938                 r.getRequest().error(new ClosingException());
939             }
940             for (RequestAndTimeout r : commitRequests.values()) {
941                 if (r.getTimeout() != null) {
942                     r.getTimeout().cancel();
943                 }
944                 r.getRequest().error(new ClosingException());
945             }
946             for (RequestAndTimeout r : fenceRequests.values()) {
947                 if (r.getTimeout() != null) {
948                     r.getTimeout().cancel();
949                 }
950                 r.getRequest().error(new ClosingException());
951             }
952         }
953     }
954 
955     private class ClosingState extends BaseState {
956 
957         ClosingState(StateMachine.Fsm fsm) {
958             super(fsm);
959             LOG.debug("NEW STATE: CLOSING");
960         }
961 
962         public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
963             // Ignored. They will be retried or errored
964             return this;
965         }
966 
967         public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
968             // Ignored. They will be retried or errored
969             return this;
970         }
971 
972         public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
973             // Ignored. They will be retried or errored
974             return this;
975         }
976 
977         public StateMachine.State handleEvent(ErrorEvent e) {
978             // Ignored. They will be retried or errored
979             return this;
980         }
981 
982         public StateMachine.State handleEvent(ResponseEvent e) {
983             // Ignored. They will be retried or errored
984             return this;
985         }
986 
987         public StateMachine.State handleEvent(UserEvent e) {
988             fsm.deferEvent(e);
989             return this;
990         }
991 
992         public StateMachine.State handleEvent(ChannelClosedEvent e) {
993             return new DisconnectedState(fsm);
994         }
995 
996         public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
997             return this;
998         }
999 
1000     }
1001 
1002     // ----------------------------------------------------------------------------------------------------------------
1003     // Helper classes & methods
1004     // ----------------------------------------------------------------------------------------------------------------
1005 
1006     private class Handler extends SimpleChannelHandler {
1007 
1008         private StateMachine.Fsm fsm;
1009 
1010         Handler(StateMachine.Fsm fsm) {
1011             this.fsm = fsm;
1012         }
1013 
1014         @Override
1015         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
1016             currentChannel = e.getChannel();
1017             LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
1018             fsm.sendEvent(new ConnectedEvent(e.getChannel()));
1019         }
1020 
1021         @Override
1022         public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1023             LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
1024             fsm.sendEvent(new ErrorEvent(new ConnectionException()));
1025         }
1026 
1027         @Override
1028         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1029             LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
1030             fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
1031         }
1032 
1033         @Override
1034         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
1035             if (e.getMessage() instanceof TSOProto.Response) {
1036                 fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
1037             } else {
1038                 LOG.warn("Received unknown message", e.getMessage());
1039             }
1040         }
1041 
1042         @Override
1043         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
1044             LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
1045             fsm.sendEvent(new ErrorEvent(e.getCause()));
1046         }
1047     }
1048 
1049     private synchronized void setTSOAddress(String host, int port) {
1050         tsoAddr = new InetSocketAddress(host, port);
1051     }
1052 
1053     private synchronized InetSocketAddress getAddress() {
1054         return tsoAddr;
1055     }
1056 
1057     private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
1058         try {
1059             currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
1060             currentTSOZNode.getListenable().addListener(this);
1061             currentTSOZNode.start(true);
1062         } catch (Exception e) {
1063             throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage());
1064         }
1065     }
1066 
1067     private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
1068         ChildData currentTSOData = currentTSOZNode.getCurrentData();
1069         if (currentTSOData == null) {
1070             throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
1071         }
1072         byte[] currentTSOAndEpochAsBytes = currentTSOData.getData();
1073         if (currentTSOAndEpochAsBytes == null) {
1074             throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
1075         }
1076         return new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
1077     }
1078 
1079 }