View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.base.Charsets;
21  import com.google.common.net.HostAndPort;
22  import com.google.common.util.concurrent.AbstractFuture;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  
25  import org.apache.omid.proto.TSOProto;
26  import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
27  import org.apache.omid.zk.ZKUtils;
28  import org.apache.statemachine.StateMachine;
29  import org.apache.curator.framework.CuratorFramework;
30  import org.apache.curator.framework.recipes.cache.ChildData;
31  import org.apache.curator.framework.recipes.cache.NodeCache;
32  import org.apache.curator.framework.recipes.cache.NodeCacheListener;
33  import org.jboss.netty.bootstrap.ClientBootstrap;
34  import org.jboss.netty.channel.Channel;
35  import org.jboss.netty.channel.ChannelFactory;
36  import org.jboss.netty.channel.ChannelFuture;
37  import org.jboss.netty.channel.ChannelFutureListener;
38  import org.jboss.netty.channel.ChannelHandlerContext;
39  import org.jboss.netty.channel.ChannelPipeline;
40  import org.jboss.netty.channel.ChannelStateEvent;
41  import org.jboss.netty.channel.ExceptionEvent;
42  import org.jboss.netty.channel.MessageEvent;
43  import org.jboss.netty.channel.SimpleChannelHandler;
44  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
45  import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
46  import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
47  import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
48  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
49  import org.jboss.netty.util.HashedWheelTimer;
50  import org.jboss.netty.util.Timeout;
51  import org.jboss.netty.util.TimerTask;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  import java.io.IOException;
56  import java.net.InetSocketAddress;
57  import java.util.ArrayDeque;
58  import java.util.HashMap;
59  import java.util.HashSet;
60  import java.util.Iterator;
61  import java.util.Map;
62  import java.util.Queue;
63  import java.util.Set;
64  import java.util.concurrent.ExecutionException;
65  import java.util.concurrent.Executors;
66  import java.util.concurrent.ScheduledExecutorService;
67  import java.util.concurrent.TimeUnit;
68  
69  
70  /**
 * Netty-based client implementing {@link TSOProtocol} to communicate with the TSO server
72   */
73  public class TSOClient implements TSOProtocol, NodeCacheListener {
74  
    private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);

    // Basic configuration constants & defaults TODO: Move DEFAULT_ZK_CLUSTER to a conf class???
    public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";

    // Epoch reported until a real one is parsed from the TSO info stored in ZooKeeper (HA connections only)
    private static final long DEFAULT_EPOCH = -1L;
    private volatile long epoch = DEFAULT_EPOCH;

    // Attributes
    // ZooKeeper client; the constructor only initializes it for HA connections
    private CuratorFramework zkClient;
    // NOTE(review): presumably the cache watching the current-TSO znode; it is assigned outside this view
    private NodeCache currentTSOZNode;

    // Netty channel factory and bootstrap used to open connections to the TSO
    private ChannelFactory factory;
    private ClientBootstrap bootstrap;
    // Channel closed by nodeChanged() when the current TSO changes; assigned outside this view
    private Channel currentChannel;
    // Single-threaded executor backing the finite state machine below
    private final ScheduledExecutorService fsmExecutor;
    StateMachine.Fsm fsm;

    // Values read once from the client configuration in the constructor
    private final int requestTimeoutInMs;
    private final int requestMaxRetries;
    private final int tsoReconnectionDelayInSecs;
    private InetSocketAddress tsoAddr;
    // Path of the znode holding the current TSO info; only set for HA connections
    private String zkCurrentTsoPath;

    // Taken from the server's handshake response; false until a handshake response is processed
    private boolean lowLatency;

    // Used to extract unique table identifiers from the modified cells list.
    private final Set<Long> tableIDs;
    // Conflict detection level of the entire system. Can either be Row or Cell level.
    private ConflictDetectionLevel conflictDetectionLevel;
    private Set<Long> rowLevelWriteSet;
107     // ----------------------------------------------------------------------------------------------------------------
108     // Construction
109     // ----------------------------------------------------------------------------------------------------------------
110 
111     public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
112         return new TSOClient(tsoClientConf);
113     }
114 
115     // Avoid instantiation
116     private TSOClient(OmidClientConfiguration omidConf) throws IOException {
117 
118         // Start client with Nb of active threads = 3 as maximum.
119         int tsoExecutorThreads = omidConf.getExecutorThreads();
120 
121         factory = new NioClientSocketChannelFactory(
122                 Executors.newCachedThreadPool(
123                         new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
124                 Executors.newCachedThreadPool(
125                         new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
126         // Create the bootstrap
127         bootstrap = new ClientBootstrap(factory);
128 
129         requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
130         requestMaxRetries = omidConf.getRequestMaxRetries();
131         tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
132 
133         LOG.info("Connecting to TSO...");
134         HostAndPort hp;
135         switch (omidConf.getConnectionType()) {
136             case HA:
137                 zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
138                                                 omidConf.getZkNamespace(),
139                                                 omidConf.getZkConnectionTimeoutInSecs());
140                 zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
141                 configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
142                 String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
143                 // TSO info includes the new TSO host:port address and epoch
144                 String[] currentTSOAndEpochArray = tsoInfo.split("#");
145                 hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
146                 setTSOAddress(hp.getHostText(), hp.getPort());
147                 epoch = Long.parseLong(currentTSOAndEpochArray[1]);
148                 LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
149                 break;
150             case DIRECT:
151             default:
152                 hp = HostAndPort.fromString(omidConf.getConnectionString());
153                 setTSOAddress(hp.getHostText(), hp.getPort());
154                 LOG.info("\t* TSO host:port {} will be connected directly", hp);
155                 break;
156         }
157 
158         fsmExecutor = Executors.newSingleThreadScheduledExecutor(
159                 new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
160         fsm = new StateMachine.FsmImpl(fsmExecutor);
161         fsm.setInitState(new DisconnectedState(fsm));
162 
163         ChannelPipeline pipeline = bootstrap.getPipeline();
164         pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
165         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
166         pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
167         pipeline.addLast("protobufencoder", new ProtobufEncoder());
168         pipeline.addLast("handler", new Handler(fsm));
169 
170         bootstrap.setOption("tcpNoDelay", true);
171         bootstrap.setOption("keepAlive", true);
172         bootstrap.setOption("reuseAddress", true);
173         bootstrap.setOption("connectTimeoutMillis", 100);
174         lowLatency = false;
175 
176         this.tableIDs = new HashSet<Long>();
177 
178         conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
179         rowLevelWriteSet = new HashSet<Long>();
180     }
181 
182     // ----------------------------------------------------------------------------------------------------------------
183     // TSOProtocol interface
184     // ----------------------------------------------------------------------------------------------------------------
185 
186     /**
187      * @see TSOProtocol#getNewStartTimestamp()
188      */
189     @Override
190     public TSOFuture<Long> getNewStartTimestamp() {
191         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
192         TSOProto.TimestampRequest.Builder tsreqBuilder = TSOProto.TimestampRequest.newBuilder();
193         builder.setTimestampRequest(tsreqBuilder.build());
194         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
195         fsm.sendEvent(request);
196         return new ForwardingTSOFuture<>(request);
197     }
198 
199     /**
200      * @see TSOProtocol#commit(long, Set)
201      */
202     @Override
203     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
204         return commit(transactionId, cells, new HashSet<CellId>());
205     }
206 
207     /**
208      * @see TSOProtocol#commit(long, Set, Set)
209      */
210     @Override
211     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells, Set<? extends CellId> conflictFreeWriteSet) {
212         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
213         TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
214         commitbuilder.setStartTimestamp(transactionId);
215 
216         rowLevelWriteSet.clear();
217         for (CellId cell : cells) {
218             long id;
219 
220             switch (conflictDetectionLevel) {
221             case ROW:
222                 id = cell.getRowId();
223                 if (rowLevelWriteSet.contains(id)) {
224                     continue;
225                 } else {
226                     rowLevelWriteSet.add(id);
227                 }
228                 break;
229             case CELL:
230                 id = cell.getCellId();
231                 break;
232             default:
233                 id = 0;
234                 assert (false);
235             }
236 
237             commitbuilder.addCellId(id);
238             tableIDs.add(cell.getTableId());
239         }
240 
241         for (CellId cell : conflictFreeWriteSet) {
242             tableIDs.add(cell.getTableId());
243         }
244 
245         commitbuilder.addAllTableId(tableIDs);
246         tableIDs.clear();
247         builder.setCommitRequest(commitbuilder.build());
248         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
249         fsm.sendEvent(request);
250         return new ForwardingTSOFuture<>(request);
251     }
252 
253     /**
254      * @see TSOProtocol#getFence(long)
255      */
256     @Override
257     public TSOFuture<Long> getFence(long tableId) {
258         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
259         TSOProto.FenceRequest.Builder fenceReqBuilder = TSOProto.FenceRequest.newBuilder();
260         fenceReqBuilder.setTableId(tableId);
261         builder.setFenceRequest(fenceReqBuilder.build());
262         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
263         fsm.sendEvent(request);
264         return new ForwardingTSOFuture<>(request);
265     }
266 
267     /**
268      * @see TSOProtocol#close()
269      */
270     @Override
271     public TSOFuture<Void> close() {
272         final CloseEvent closeEvent = new CloseEvent();
273         fsm.sendEvent(closeEvent);
274         closeEvent.addListener(new Runnable() {
275             @Override
276             public void run() {
277                 try {
278                     closeEvent.get();
279                 } catch (InterruptedException e) {
280                     Thread.currentThread().interrupt();
281                     e.printStackTrace();
282                 } catch (ExecutionException e) {
283                     e.printStackTrace();
284                 } finally {
285                     fsmExecutor.shutdown();
286                     if (currentTSOZNode != null) {
287                         try {
288                             currentTSOZNode.close();
289                         } catch (IOException e) {
290                             e.printStackTrace();
291                         }
292                     }
293                     if (zkClient != null) {
294                         zkClient.close();
295                     }
296                 }
297 
298             }
299         }, fsmExecutor);
300         return new ForwardingTSOFuture<>(closeEvent);
301     }
302 
303     // ----------------------------------------------------------------------------------------------------------------
304     // High availability related interface
305     // ----------------------------------------------------------------------------------------------------------------
306 
307     /**
308      * @see TSOProtocol#getEpoch()
309      */
310     @Override
311     public long getEpoch() {
312         return epoch;
313     }
314 
315     /**
316      * Used for family deletion
317      * @return the conflict detection level.
318      */
319     public ConflictDetectionLevel getConflictDetectionLevel() {
320         return conflictDetectionLevel;
321     }
322 
    /**
     * Replaces the conflict detection level consulted when building commit requests.
     * Used for family deletion testing.
     *
     * @param conflictDetectionLevel the level to apply from now on
     */
    public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
        this.conflictDetectionLevel = conflictDetectionLevel;
    }
329 
330     // ----------------------------------------------------------------------------------------------------------------
331     // NodeCacheListener interface
332     // ----------------------------------------------------------------------------------------------------------------
333 
334     @Override
335     public void nodeChanged() throws Exception {
336 
337         String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
338         // TSO info includes the new TSO host:port address and epoch
339         String[] currentTSOAndEpochArray = tsoInfo.split("#");
340         HostAndPort hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
341         setTSOAddress(hp.getHostText(), hp.getPort());
342         epoch = Long.parseLong(currentTSOAndEpochArray[1]);
343         LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
344         if (currentChannel != null && currentChannel.isConnected()) {
345             LOG.info("\tClosing channel with previous TSO {}", currentChannel);
346             currentChannel.close();
347         }
348 
349     }
350 
351     @Override
352     public boolean isLowLatency() {
353         return lowLatency;
354     }
355 
356     // ****************************************** Finite State Machine ************************************************
357 
358     // ----------------------------------------------------------------------------------------------------------------
359     // FSM: Events
360     // ----------------------------------------------------------------------------------------------------------------
361 
362     private static class ParamEvent<T> implements StateMachine.Event {
363 
364         final T param;
365 
366         ParamEvent(T param) {
367             this.param = param;
368         }
369 
370         T getParam() {
371             return param;
372         }
373     }
374 
375     private static class ErrorEvent extends ParamEvent<Throwable> {
376 
377         ErrorEvent(Throwable t) {
378             super(t);
379         }
380     }
381 
382     private static class ConnectedEvent extends ParamEvent<Channel> {
383 
384         ConnectedEvent(Channel c) {
385             super(c);
386         }
387     }
388 
389     private static class UserEvent<T> extends AbstractFuture<T>
390             implements StateMachine.DeferrableEvent {
391 
392         void success(T value) {
393             set(value);
394         }
395 
396         @Override
397         public void error(Throwable t) {
398             setException(t);
399         }
400     }
401 
    /** User event created by {@code close()}; it carries no value and completes with {@code null}. */
    private static class CloseEvent extends UserEvent<Void> {

    }
405 
406     private static class ChannelClosedEvent extends ParamEvent<Throwable> {
407 
408         ChannelClosedEvent(Throwable t) {
409             super(t);
410         }
411     }
412 
    /** Marker event sent when the reconnection back-off timer of the connection-failed state fires. */
    private static class ReconnectEvent implements StateMachine.Event {

    }
416 
    /** Marker event sent when the handshake timer of the handshaking state fires. */
    private static class HandshakeTimeoutEvent implements StateMachine.Event {

    }
420 
    /** Marker event sent when the timer armed for a timestamp request fires. */
    private static class TimestampRequestTimeoutEvent implements StateMachine.Event {

    }
424 
425     private static class CommitRequestTimeoutEvent implements StateMachine.Event {
426 
427         final long startTimestamp;
428 
429         CommitRequestTimeoutEvent(long startTimestamp) {
430             this.startTimestamp = startTimestamp;
431         }
432 
433         public long getStartTimestamp() {
434             return startTimestamp;
435         }
436     }
437 
438     private static class FenceRequestTimeoutEvent implements StateMachine.Event {
439 
440         final long tableID;
441 
442         FenceRequestTimeoutEvent(long tableID) {
443             this.tableID = tableID;
444         }
445 
446         public long getTableID() {
447             return tableID;
448         }
449     }
450 
451     private static class RequestEvent extends UserEvent<Long> {
452 
453         TSOProto.Request req;
454         int retriesLeft;
455 
456         RequestEvent(TSOProto.Request req, int retriesLeft) {
457             this.req = req;
458             this.retriesLeft = retriesLeft;
459         }
460 
461         TSOProto.Request getRequest() {
462             return req;
463         }
464 
465         void setRequest(TSOProto.Request request) {
466             this.req = request;
467         }
468 
469         int getRetriesLeft() {
470             return retriesLeft;
471         }
472 
473         void decrementRetries() {
474             retriesLeft--;
475         }
476 
477     }
478 
479     private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
480 
481         ResponseEvent(TSOProto.Response r) {
482             super(r);
483         }
484     }
485 
486     // ----------------------------------------------------------------------------------------------------------------
487     // FSM: States
488     // ----------------------------------------------------------------------------------------------------------------
489 
490     class BaseState extends StateMachine.State {
491 
492         BaseState(StateMachine.Fsm fsm) {
493             super(fsm);
494         }
495 
496         public StateMachine.State handleEvent(StateMachine.Event e) {
497             LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
498             return this;
499         }
500     }
501 
502     class DisconnectedState extends BaseState {
503 
504         DisconnectedState(StateMachine.Fsm fsm) {
505             super(fsm);
506             LOG.debug("NEW STATE: DISCONNECTED");
507         }
508 
509         public StateMachine.State handleEvent(RequestEvent e) {
510             fsm.deferEvent(e);
511             return tryToConnectToTSOServer();
512         }
513 
514         public StateMachine.State handleEvent(CloseEvent e) {
515             factory.releaseExternalResources();
516             e.success(null);
517             return this;
518         }
519 
520         private StateMachine.State tryToConnectToTSOServer() {
521             final InetSocketAddress tsoAddress = getAddress();
522             LOG.info("Trying to connect to TSO [{}]", tsoAddress);
523             ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
524             channelFuture.addListener(new ChannelFutureListener() {
525                 @Override
526                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
527                     if (channelFuture.isSuccess()) {
528                         LOG.info("Connection to TSO [{}] established. Channel {}",
529                                  tsoAddress, channelFuture.getChannel());
530                     } else {
531                         LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
532                                   tsoAddress, channelFuture.getChannel());
533                     }
534                 }
535             });
536             return new ConnectingState(fsm);
537         }
538     }
539 
    /**
     * State held while a connection attempt is in progress. User events are deferred; a connected event
     * moves on to the handshake, while a closed channel or an error moves to the connection-failed state.
     */
    private class ConnectingState extends BaseState {

        ConnectingState(StateMachine.Fsm fsm) {
            super(fsm);
            LOG.debug("NEW STATE: CONNECTING");
        }

        // User events cannot be served yet, so they are deferred
        public StateMachine.State handleEvent(UserEvent e) {
            fsm.deferEvent(e);
            return this;
        }

        // The event's channel is handed to the handshaking state
        public StateMachine.State handleEvent(ConnectedEvent e) {
            return new HandshakingState(fsm, e.getParam());
        }

        // The event's throwable becomes the failure kept by the connection-failed state
        public StateMachine.State handleEvent(ChannelClosedEvent e) {
            return new ConnectionFailedState(fsm, e.getParam());
        }

        // Same transition as for a closed channel
        public StateMachine.State handleEvent(ErrorEvent e) {
            return new ConnectionFailedState(fsm, e.getParam());
        }

    }
565 
566     private static class RequestAndTimeout {
567 
568         final RequestEvent event;
569         final Timeout timeout;
570 
571         RequestAndTimeout(RequestEvent event, Timeout timeout) {
572             this.event = event;
573             this.timeout = timeout;
574         }
575 
576         RequestEvent getRequest() {
577             return event;
578         }
579 
580         Timeout getTimeout() {
581             return timeout;
582         }
583 
584         public String toString() {
585             String info = "Request type ";
586             if (event.getRequest().hasTimestampRequest()) {
587                 info += "[Timestamp]";
588             } else if (event.getRequest().hasCommitRequest()) {
589                 info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
590             } else {
591                 info += "NONE";
592             }
593             return info;
594         }
595     }
596 
597     private class HandshakingState extends BaseState {
598 
599         final Channel channel;
600 
601         final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
602                 new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
603         final Timeout timeout;
604 
605         HandshakingState(StateMachine.Fsm fsm, Channel channel) {
606             super(fsm);
607             LOG.debug("NEW STATE: HANDSHAKING");
608             this.channel = channel;
609             TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
610             // Add the required handshake capabilities when necessary
611             handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
612             channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
613             timeout = newTimeout();
614         }
615 
616         private Timeout newTimeout() {
617             if (requestTimeoutInMs > 0) {
618                 return timeoutExecutor.newTimeout(new TimerTask() {
619                     @Override
620                     public void run(Timeout timeout) {
621                         fsm.sendEvent(new HandshakeTimeoutEvent());
622                     }
623                 }, 30, TimeUnit.SECONDS);
624             } else {
625                 return null;
626             }
627         }
628 
629         public StateMachine.State handleEvent(UserEvent e) {
630             fsm.deferEvent(e);
631             return this;
632         }
633 
634         public StateMachine.State handleEvent(ResponseEvent e) {
635             lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
636             if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
637                 if (timeout != null) {
638                     timeout.cancel();
639                 }
640                 return new ConnectedState(fsm, channel, timeoutExecutor);
641             } else {
642                 cleanupState();
643                 LOG.error("Client incompatible with server");
644                 return new HandshakeFailedState(fsm, new HandshakeFailedException());
645             }
646         }
647 
648         public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
649             cleanupState();
650             return new ClosingState(fsm);
651         }
652 
653         public StateMachine.State handleEvent(ErrorEvent e) {
654             cleanupState();
655             Throwable exception = e.getParam();
656             LOG.error("Error during handshake", exception);
657             return new HandshakeFailedState(fsm, exception);
658         }
659 
660         private void cleanupState() {
661             timeoutExecutor.stop();
662             channel.close();
663             if (timeout != null) {
664                 timeout.cancel();
665             }
666         }
667 
668     }
669 
670     class ConnectionFailedState extends BaseState {
671 
672         final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
673                 new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
674 
675         Throwable exception;
676 
677         ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
678             super(fsm);
679             LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
680             this.exception = exception;
681             reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
682                 @Override
683                 public void run(Timeout timeout) {
684                     fsm.sendEvent(new ReconnectEvent());
685                 }
686             }, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
687         }
688 
689         public StateMachine.State handleEvent(UserEvent e) {
690             e.error(exception);
691             return this;
692         }
693 
694         public StateMachine.State handleEvent(ErrorEvent e) {
695             return this;
696         }
697 
698         public StateMachine.State handleEvent(ChannelClosedEvent e) {
699             return new DisconnectedState(fsm);
700         }
701 
702         public StateMachine.State handleEvent(ReconnectEvent e) {
703             return new DisconnectedState(fsm);
704         }
705 
706     }
707 
    /**
     * Connection-failed state used when the handshake fails. It inherits all event handling from
     * {@link ConnectionFailedState} and only adds a debug log entry on construction.
     */
    private class HandshakeFailedState extends ConnectionFailedState {

        HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
            super(fsm, exception);
            LOG.debug("STATE: HANDSHAKING FAILED");
        }

    }
716 
717     class ConnectedState extends BaseState {
718 
719         final Queue<RequestAndTimeout> timestampRequests;
720         final Map<Long, RequestAndTimeout> commitRequests;
721         final Map<Long, RequestAndTimeout> fenceRequests;
722         final Channel channel;
723 
724         final HashedWheelTimer timeoutExecutor;
725 
726         ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
727             super(fsm);
728             LOG.debug("NEW STATE: CONNECTED");
729             this.channel = channel;
730             this.timeoutExecutor = timeoutExecutor;
731             timestampRequests = new ArrayDeque<>();
732             commitRequests = new HashMap<>();
733             fenceRequests = new HashMap<>();
734         }
735 
736         private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
737             if (requestTimeoutInMs > 0) {
738                 return timeoutExecutor.newTimeout(new TimerTask() {
739                     @Override
740                     public void run(Timeout timeout) {
741                         fsm.sendEvent(timeoutEvent);
742                     }
743                 }, requestTimeoutInMs, TimeUnit.MILLISECONDS);
744             } else {
745                 return null;
746             }
747         }
748 
749         private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
750             TSOProto.Request req = request.getRequest();
751 
752             if (req.hasTimestampRequest()) {
753                 timestampRequests.add(new RequestAndTimeout(request, newTimeout(new TimestampRequestTimeoutEvent())));
754             } else if (req.hasCommitRequest()) {
755                 TSOProto.CommitRequest commitReq = req.getCommitRequest();
756                 commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
757                         request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
758             } else if (req.hasFenceRequest()) {
759                 TSOProto.FenceRequest fenceReq = req.getFenceRequest();
760                 fenceRequests.put(fenceReq.getTableId(), new RequestAndTimeout(
761                         request, newTimeout(new FenceRequestTimeoutEvent(fenceReq.getTableId()))));
762             } else {
763                 request.error(new IllegalArgumentException("Unknown request type"));
764                 return;
765             }
766             ChannelFuture f = channel.write(req);
767 
768             f.addListener(new ChannelFutureListener() {
769                 @Override
770                 public void operationComplete(ChannelFuture future) {
771                     if (!future.isSuccess()) {
772                         fsm.sendEvent(new ErrorEvent(future.getCause()));
773                     }
774                 }
775             });
776         }
777 
778         private void handleResponse(ResponseEvent response) {
779             TSOProto.Response resp = response.getParam();
780             if (resp.hasTimestampResponse()) {
781                 if (timestampRequests.size() == 0) {
782                     LOG.debug("Received timestamp response when no requests outstanding");
783                     return;
784                 }
785                 RequestAndTimeout e = timestampRequests.remove();
786                 e.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
787                 if (e.getTimeout() != null) {
788                     e.getTimeout().cancel();
789                 }
790             } else if (resp.hasCommitResponse()) {
791                 long startTimestamp = resp.getCommitResponse().getStartTimestamp();
792                 RequestAndTimeout e = commitRequests.remove(startTimestamp);
793                 if (e == null) {
794                     LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
795                     return;
796                 }
797                 if (e.getTimeout() != null) {
798                     e.getTimeout().cancel();
799                 }
800                 if (resp.getCommitResponse().getAborted()) {
801                     e.getRequest().error(new AbortException());
802                 } else {
803                     e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
804                 }
805             } else if (resp.hasFenceResponse()) {
806                 long tableID = resp.getFenceResponse().getTableId();
807                 RequestAndTimeout e = fenceRequests.remove(tableID);
808                 if (e == null) {
809                     LOG.debug("Received fence response for request that doesn't exist. Table ID: {}", tableID);
810                     return;
811                 }
812                 if (e.getTimeout() != null) {
813                     e.getTimeout().cancel();
814                 }
815 
816                 e.getRequest().success(resp.getFenceResponse().getFenceId());
817             }
818         }
819 
820         public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
821             if (!timestampRequests.isEmpty()) {
822                 RequestAndTimeout r = timestampRequests.remove();
823                 if (r.getTimeout() != null) {
824                     r.getTimeout().cancel();
825                 }
826                 queueRetryOrError(fsm, r.getRequest());
827             }
828             return this;
829         }
830 
831         public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
832             long startTimestamp = e.getStartTimestamp();
833             if (commitRequests.containsKey(startTimestamp)) {
834                 RequestAndTimeout r = commitRequests.remove(startTimestamp);
835                 if (r.getTimeout() != null) {
836                     r.getTimeout().cancel();
837                 }
838                 queueRetryOrError(fsm, r.getRequest());
839             }
840             return this;
841         }
842 
843         public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
844             long tableID = e.getTableID();
845             if (fenceRequests.containsKey(tableID)) {
846                 RequestAndTimeout r = fenceRequests.remove(tableID);
847                 if (r.getTimeout() != null) {
848                     r.getTimeout().cancel();
849                 }
850                 queueRetryOrError(fsm, r.getRequest());
851             }
852             return this;
853         }
854 
855         public StateMachine.State handleEvent(CloseEvent e) {
856             LOG.debug("CONNECTED STATE: CloseEvent");
857             timeoutExecutor.stop();
858             closeChannelAndErrorRequests();
859             fsm.deferEvent(e);
860             return new ClosingState(fsm);
861         }
862 
863         public StateMachine.State handleEvent(RequestEvent e) {
864             sendRequest(fsm, e);
865             return this;
866         }
867 
868         public StateMachine.State handleEvent(ResponseEvent e) {
869             handleResponse(e);
870             return this;
871         }
872 
873         public StateMachine.State handleEvent(ErrorEvent e) {
874             LOG.debug("CONNECTED STATE: ErrorEvent");
875             timeoutExecutor.stop();
876             handleError(fsm);
877             return new ClosingState(fsm);
878         }
879 
880         private void handleError(StateMachine.Fsm fsm) {
881             LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
882             while (timestampRequests.size() > 0) {
883                 RequestAndTimeout r = timestampRequests.remove();
884                 if (r.getTimeout() != null) {
885                     r.getTimeout().cancel();
886                 }
887                 queueRetryOrError(fsm, r.getRequest());
888             }
889             Iterator<Map.Entry<Long, RequestAndTimeout>> iter = commitRequests.entrySet().iterator();
890             while (iter.hasNext()) {
891                 RequestAndTimeout r = iter.next().getValue();
892                 if (r.getTimeout() != null) {
893                     r.getTimeout().cancel();
894                 }
895                 queueRetryOrError(fsm, r.getRequest());
896                 iter.remove();
897             }
898             iter = fenceRequests.entrySet().iterator();
899             while (iter.hasNext()) {
900                 RequestAndTimeout r = iter.next().getValue();
901                 if (r.getTimeout() != null) {
902                     r.getTimeout().cancel();
903                 }
904                 queueRetryOrError(fsm, r.getRequest());
905                 iter.remove();
906             }
907             channel.close();
908         }
909 
910         private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
911             if (e.getRetriesLeft() > 0) {
912                 e.decrementRetries();
913                 if (e.getRequest().hasCommitRequest()) {
914                     TSOProto.CommitRequest commitRequest = e.getRequest().getCommitRequest();
915                     if (!commitRequest.getIsRetry()) { // Create a new retry for the commit request
916                         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
917                         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
918                         commitBuilder.mergeFrom(commitRequest);
919                         commitBuilder.setIsRetry(true);
920                         builder.setCommitRequest(commitBuilder.build());
921                         e.setRequest(builder.build());
922                     }
923                 }
924                 fsm.sendEvent(e);
925             } else {
926                 e.error(
927                         new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
928             }
929         }
930 
931         private void closeChannelAndErrorRequests() {
932             channel.close();
933             for (RequestAndTimeout r : timestampRequests) {
934                 if (r.getTimeout() != null) {
935                     r.getTimeout().cancel();
936                 }
937                 r.getRequest().error(new ClosingException());
938             }
939             for (RequestAndTimeout r : commitRequests.values()) {
940                 if (r.getTimeout() != null) {
941                     r.getTimeout().cancel();
942                 }
943                 r.getRequest().error(new ClosingException());
944             }
945             for (RequestAndTimeout r : fenceRequests.values()) {
946                 if (r.getTimeout() != null) {
947                     r.getTimeout().cancel();
948                 }
949                 r.getRequest().error(new ClosingException());
950             }
951         }
952     }
953 
954     private class ClosingState extends BaseState {
955 
956         ClosingState(StateMachine.Fsm fsm) {
957             super(fsm);
958             LOG.debug("NEW STATE: CLOSING");
959         }
960 
961         public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
962             // Ignored. They will be retried or errored
963             return this;
964         }
965 
966         public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
967             // Ignored. They will be retried or errored
968             return this;
969         }
970 
971         public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
972             // Ignored. They will be retried or errored
973             return this;
974         }
975 
976         public StateMachine.State handleEvent(ErrorEvent e) {
977             // Ignored. They will be retried or errored
978             return this;
979         }
980 
981         public StateMachine.State handleEvent(ResponseEvent e) {
982             // Ignored. They will be retried or errored
983             return this;
984         }
985 
986         public StateMachine.State handleEvent(UserEvent e) {
987             fsm.deferEvent(e);
988             return this;
989         }
990 
991         public StateMachine.State handleEvent(ChannelClosedEvent e) {
992             return new DisconnectedState(fsm);
993         }
994 
995         public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
996             return this;
997         }
998 
999     }
1000 
1001     // ----------------------------------------------------------------------------------------------------------------
1002     // Helper classes & methods
1003     // ----------------------------------------------------------------------------------------------------------------
1004 
1005     private class Handler extends SimpleChannelHandler {
1006 
1007         private StateMachine.Fsm fsm;
1008 
1009         Handler(StateMachine.Fsm fsm) {
1010             this.fsm = fsm;
1011         }
1012 
1013         @Override
1014         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
1015             currentChannel = e.getChannel();
1016             LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
1017             fsm.sendEvent(new ConnectedEvent(e.getChannel()));
1018         }
1019 
1020         @Override
1021         public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1022             LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
1023             fsm.sendEvent(new ErrorEvent(new ConnectionException()));
1024         }
1025 
1026         @Override
1027         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1028             LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
1029             fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
1030         }
1031 
1032         @Override
1033         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
1034             if (e.getMessage() instanceof TSOProto.Response) {
1035                 fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
1036             } else {
1037                 LOG.warn("Received unknown message", e.getMessage());
1038             }
1039         }
1040 
1041         @Override
1042         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
1043             LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
1044             fsm.sendEvent(new ErrorEvent(e.getCause()));
1045         }
1046     }
1047 
1048     private synchronized void setTSOAddress(String host, int port) {
1049         tsoAddr = new InetSocketAddress(host, port);
1050     }
1051 
1052     private synchronized InetSocketAddress getAddress() {
1053         return tsoAddr;
1054     }
1055 
1056     private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
1057         try {
1058             currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
1059             currentTSOZNode.getListenable().addListener(this);
1060             currentTSOZNode.start(true);
1061         } catch (Exception e) {
1062             throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage());
1063         }
1064     }
1065 
1066     private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
1067         ChildData currentTSOData = currentTSOZNode.getCurrentData();
1068         if (currentTSOData == null) {
1069             throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
1070         }
1071         byte[] currentTSOAndEpochAsBytes = currentTSOData.getData();
1072         if (currentTSOAndEpochAsBytes == null) {
1073             throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
1074         }
1075         return new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
1076     }
1077 
1078 }