1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
21 import org.apache.phoenix.thirdparty.com.google.common.net.HostAndPort;
22 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture;
23 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25 import org.apache.omid.proto.TSOProto;
26 import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
27 import org.apache.omid.zk.ZKUtils;
28 import org.apache.statemachine.StateMachine;
29 import org.apache.curator.framework.CuratorFramework;
30 import org.apache.curator.framework.recipes.cache.ChildData;
31 import org.apache.curator.framework.recipes.cache.NodeCache;
32 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
33 import org.jboss.netty.bootstrap.ClientBootstrap;
34 import org.jboss.netty.channel.Channel;
35 import org.jboss.netty.channel.ChannelFactory;
36 import org.jboss.netty.channel.ChannelFuture;
37 import org.jboss.netty.channel.ChannelFutureListener;
38 import org.jboss.netty.channel.ChannelHandlerContext;
39 import org.jboss.netty.channel.ChannelPipeline;
40 import org.jboss.netty.channel.ChannelStateEvent;
41 import org.jboss.netty.channel.ExceptionEvent;
42 import org.jboss.netty.channel.MessageEvent;
43 import org.jboss.netty.channel.SimpleChannelHandler;
44 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
45 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
46 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
47 import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
48 import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
49 import org.jboss.netty.util.HashedWheelTimer;
50 import org.jboss.netty.util.Timeout;
51 import org.jboss.netty.util.TimerTask;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import java.io.IOException;
56 import java.net.InetSocketAddress;
57 import java.util.ArrayDeque;
58 import java.util.HashMap;
59 import java.util.HashSet;
60 import java.util.Iterator;
61 import java.util.Map;
62 import java.util.Queue;
63 import java.util.Set;
64 import java.util.concurrent.ExecutionException;
65 import java.util.concurrent.Executors;
66 import java.util.concurrent.ScheduledExecutorService;
67 import java.util.concurrent.TimeUnit;
68
69
70
71
72
73 public class TSOClient implements TSOProtocol, NodeCacheListener {
74
75 private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);
76
77
78 public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";
79
80 private static final long DEFAULT_EPOCH = -1L;
81 private volatile long epoch = DEFAULT_EPOCH;
82
83
84 private CuratorFramework zkClient;
85 private NodeCache currentTSOZNode;
86
87 private ChannelFactory factory;
88 private ClientBootstrap bootstrap;
89 private Channel currentChannel;
90 private final ScheduledExecutorService fsmExecutor;
91 StateMachine.Fsm fsm;
92
93 private final int requestTimeoutInMs;
94 private final int requestMaxRetries;
95 private final int tsoReconnectionDelayInSecs;
96 private InetSocketAddress tsoAddr;
97 private String zkCurrentTsoPath;
98
99 private boolean lowLatency;
100
101
102
103
104 private ConflictDetectionLevel conflictDetectionLevel;
105
106
107
108
109
110
111 public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
112 return new TSOClient(tsoClientConf);
113 }
114
115
116 private TSOClient(OmidClientConfiguration omidConf) throws IOException {
117
118
119 int tsoExecutorThreads = omidConf.getExecutorThreads();
120
121 factory = new NioClientSocketChannelFactory(
122 Executors.newCachedThreadPool(
123 new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
124 Executors.newCachedThreadPool(
125 new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
126
127 bootstrap = new ClientBootstrap(factory);
128
129 requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
130 requestMaxRetries = omidConf.getRequestMaxRetries();
131 tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
132
133 LOG.info("Connecting to TSO...");
134 HostAndPort hp;
135 switch (omidConf.getConnectionType()) {
136 case HA:
137 zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
138 omidConf.getZkNamespace(),
139 omidConf.getZkConnectionTimeoutInSecs());
140 zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
141 configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
142 String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
143
144 String[] currentTSOAndEpochArray = tsoInfo.split("#");
145 hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
146 setTSOAddress(hp.getHost(), hp.getPort());
147 epoch = Long.parseLong(currentTSOAndEpochArray[1]);
148 LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
149 break;
150 case DIRECT:
151 default:
152 hp = HostAndPort.fromString(omidConf.getConnectionString());
153 setTSOAddress(hp.getHost(), hp.getPort());
154 LOG.info("\t* TSO host:port {} will be connected directly", hp);
155 break;
156 }
157
158 fsmExecutor = Executors.newSingleThreadScheduledExecutor(
159 new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
160 fsm = new StateMachine.FsmImpl(fsmExecutor);
161 fsm.setInitState(new DisconnectedState(fsm));
162
163 ChannelPipeline pipeline = bootstrap.getPipeline();
164 pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
165 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
166 pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
167 pipeline.addLast("protobufencoder", new ProtobufEncoder());
168 pipeline.addLast("handler", new Handler(fsm));
169
170 bootstrap.setOption("tcpNoDelay", true);
171 bootstrap.setOption("keepAlive", true);
172 bootstrap.setOption("reuseAddress", true);
173 bootstrap.setOption("connectTimeoutMillis", 100);
174 lowLatency = false;
175
176
177
178 conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
179
180 }
181
182
183
184
185
186
187
188
189 @Override
190 public TSOFuture<Long> getNewStartTimestamp() {
191 TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
192 TSOProto.TimestampRequest.Builder tsreqBuilder = TSOProto.TimestampRequest.newBuilder();
193 builder.setTimestampRequest(tsreqBuilder.build());
194 RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
195 fsm.sendEvent(request);
196 return new ForwardingTSOFuture<>(request);
197 }
198
199
200
201
202 @Override
203 public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
204 return commit(transactionId, cells, new HashSet<CellId>());
205 }
206
207
208
209
210 @Override
211 public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells, Set<? extends CellId> conflictFreeWriteSet) {
212 TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
213 TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
214 commitbuilder.setStartTimestamp(transactionId);
215 HashSet<Long> rowLevelWriteSet = new HashSet<Long>();
216 HashSet<Long> tableIDs = new HashSet<Long>();
217 rowLevelWriteSet.clear();
218 for (CellId cell : cells) {
219 long id;
220
221 switch (conflictDetectionLevel) {
222 case ROW:
223 id = cell.getRowId();
224 if (rowLevelWriteSet.contains(id)) {
225 continue;
226 } else {
227 rowLevelWriteSet.add(id);
228 }
229 break;
230 case CELL:
231 id = cell.getCellId();
232 break;
233 default:
234 id = 0;
235 assert (false);
236 }
237
238 commitbuilder.addCellId(id);
239 tableIDs.add(cell.getTableId());
240 }
241
242 for (CellId cell : conflictFreeWriteSet) {
243 tableIDs.add(cell.getTableId());
244 }
245
246 commitbuilder.addAllTableId(tableIDs);
247 tableIDs.clear();
248 builder.setCommitRequest(commitbuilder.build());
249 RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
250 fsm.sendEvent(request);
251 return new ForwardingTSOFuture<>(request);
252 }
253
254
255
256
257 @Override
258 public TSOFuture<Long> getFence(long tableId) {
259 TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
260 TSOProto.FenceRequest.Builder fenceReqBuilder = TSOProto.FenceRequest.newBuilder();
261 fenceReqBuilder.setTableId(tableId);
262 builder.setFenceRequest(fenceReqBuilder.build());
263 RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
264 fsm.sendEvent(request);
265 return new ForwardingTSOFuture<>(request);
266 }
267
268
269
270
271 @Override
272 public TSOFuture<Void> close() {
273 final CloseEvent closeEvent = new CloseEvent();
274 fsm.sendEvent(closeEvent);
275 closeEvent.addListener(new Runnable() {
276 @Override
277 public void run() {
278 try {
279 closeEvent.get();
280 } catch (InterruptedException e) {
281 Thread.currentThread().interrupt();
282 e.printStackTrace();
283 } catch (ExecutionException e) {
284 e.printStackTrace();
285 } finally {
286 fsmExecutor.shutdown();
287 if (currentTSOZNode != null) {
288 try {
289 currentTSOZNode.close();
290 } catch (IOException e) {
291 e.printStackTrace();
292 }
293 }
294 if (zkClient != null) {
295 zkClient.close();
296 }
297 }
298
299 }
300 }, fsmExecutor);
301 return new ForwardingTSOFuture<>(closeEvent);
302 }
303
304
305
306
307
308
309
310
311 @Override
312 public long getEpoch() {
313 return epoch;
314 }
315
316
317
318
319
320 public ConflictDetectionLevel getConflictDetectionLevel() {
321 return conflictDetectionLevel;
322 }
323
324
325
326
327 public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
328 this.conflictDetectionLevel = conflictDetectionLevel;
329 }
330
331
332
333
334
335 @Override
336 public void nodeChanged() throws Exception {
337
338 String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
339
340 String[] currentTSOAndEpochArray = tsoInfo.split("#");
341 HostAndPort hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
342 setTSOAddress(hp.getHost(), hp.getPort());
343 epoch = Long.parseLong(currentTSOAndEpochArray[1]);
344 LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
345 if (currentChannel != null && currentChannel.isConnected()) {
346 LOG.info("\tClosing channel with previous TSO {}", currentChannel);
347 currentChannel.close();
348 }
349
350 }
351
352 @Override
353 public boolean isLowLatency() {
354 return lowLatency;
355 }
356
357
358
359
360
361
362
363 private static class ParamEvent<T> implements StateMachine.Event {
364
365 final T param;
366
367 ParamEvent(T param) {
368 this.param = param;
369 }
370
371 T getParam() {
372 return param;
373 }
374 }
375
376 private static class ErrorEvent extends ParamEvent<Throwable> {
377
378 ErrorEvent(Throwable t) {
379 super(t);
380 }
381 }
382
383 private static class ConnectedEvent extends ParamEvent<Channel> {
384
385 ConnectedEvent(Channel c) {
386 super(c);
387 }
388 }
389
390 private static class UserEvent<T> extends AbstractFuture<T>
391 implements StateMachine.DeferrableEvent {
392
393 void success(T value) {
394 set(value);
395 }
396
397 @Override
398 public void error(Throwable t) {
399 setException(t);
400 }
401 }
402
403 private static class CloseEvent extends UserEvent<Void> {
404
405 }
406
407 private static class ChannelClosedEvent extends ParamEvent<Throwable> {
408
409 ChannelClosedEvent(Throwable t) {
410 super(t);
411 }
412 }
413
414 private static class ReconnectEvent implements StateMachine.Event {
415
416 }
417
418 private static class HandshakeTimeoutEvent implements StateMachine.Event {
419
420 }
421
422 private static class TimestampRequestTimeoutEvent implements StateMachine.Event {
423
424 }
425
426 private static class CommitRequestTimeoutEvent implements StateMachine.Event {
427
428 final long startTimestamp;
429
430 CommitRequestTimeoutEvent(long startTimestamp) {
431 this.startTimestamp = startTimestamp;
432 }
433
434 public long getStartTimestamp() {
435 return startTimestamp;
436 }
437 }
438
439 private static class FenceRequestTimeoutEvent implements StateMachine.Event {
440
441 final long tableID;
442
443 FenceRequestTimeoutEvent(long tableID) {
444 this.tableID = tableID;
445 }
446
447 public long getTableID() {
448 return tableID;
449 }
450 }
451
452 private static class RequestEvent extends UserEvent<Long> {
453
454 TSOProto.Request req;
455 int retriesLeft;
456
457 RequestEvent(TSOProto.Request req, int retriesLeft) {
458 this.req = req;
459 this.retriesLeft = retriesLeft;
460 }
461
462 TSOProto.Request getRequest() {
463 return req;
464 }
465
466 void setRequest(TSOProto.Request request) {
467 this.req = request;
468 }
469
470 int getRetriesLeft() {
471 return retriesLeft;
472 }
473
474 void decrementRetries() {
475 retriesLeft--;
476 }
477
478 }
479
480 private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
481
482 ResponseEvent(TSOProto.Response r) {
483 super(r);
484 }
485 }
486
487
488
489
490
491 class BaseState extends StateMachine.State {
492
493 BaseState(StateMachine.Fsm fsm) {
494 super(fsm);
495 }
496
497 public StateMachine.State handleEvent(StateMachine.Event e) {
498 LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
499 return this;
500 }
501 }
502
503 class DisconnectedState extends BaseState {
504
505 DisconnectedState(StateMachine.Fsm fsm) {
506 super(fsm);
507 LOG.debug("NEW STATE: DISCONNECTED");
508 }
509
510 public StateMachine.State handleEvent(RequestEvent e) {
511 fsm.deferEvent(e);
512 return tryToConnectToTSOServer();
513 }
514
515 public StateMachine.State handleEvent(CloseEvent e) {
516 factory.releaseExternalResources();
517 e.success(null);
518 return this;
519 }
520
521 private StateMachine.State tryToConnectToTSOServer() {
522 final InetSocketAddress tsoAddress = getAddress();
523 LOG.info("Trying to connect to TSO [{}]", tsoAddress);
524 ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
525 channelFuture.addListener(new ChannelFutureListener() {
526 @Override
527 public void operationComplete(ChannelFuture channelFuture) throws Exception {
528 if (channelFuture.isSuccess()) {
529 LOG.info("Connection to TSO [{}] established. Channel {}",
530 tsoAddress, channelFuture.getChannel());
531 } else {
532 LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
533 tsoAddress, channelFuture.getChannel());
534 }
535 }
536 });
537 return new ConnectingState(fsm);
538 }
539 }
540
541 private class ConnectingState extends BaseState {
542
543 ConnectingState(StateMachine.Fsm fsm) {
544 super(fsm);
545 LOG.debug("NEW STATE: CONNECTING");
546 }
547
548 public StateMachine.State handleEvent(UserEvent e) {
549 fsm.deferEvent(e);
550 return this;
551 }
552
553 public StateMachine.State handleEvent(ConnectedEvent e) {
554 return new HandshakingState(fsm, e.getParam());
555 }
556
557 public StateMachine.State handleEvent(ChannelClosedEvent e) {
558 return new ConnectionFailedState(fsm, e.getParam());
559 }
560
561 public StateMachine.State handleEvent(ErrorEvent e) {
562 return new ConnectionFailedState(fsm, e.getParam());
563 }
564
565 }
566
567 private static class RequestAndTimeout {
568
569 final RequestEvent event;
570 final Timeout timeout;
571
572 RequestAndTimeout(RequestEvent event, Timeout timeout) {
573 this.event = event;
574 this.timeout = timeout;
575 }
576
577 RequestEvent getRequest() {
578 return event;
579 }
580
581 Timeout getTimeout() {
582 return timeout;
583 }
584
585 public String toString() {
586 String info = "Request type ";
587 if (event.getRequest().hasTimestampRequest()) {
588 info += "[Timestamp]";
589 } else if (event.getRequest().hasCommitRequest()) {
590 info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
591 } else {
592 info += "NONE";
593 }
594 return info;
595 }
596 }
597
598 private class HandshakingState extends BaseState {
599
600 final Channel channel;
601
602 final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
603 new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
604 final Timeout timeout;
605
606 HandshakingState(StateMachine.Fsm fsm, Channel channel) {
607 super(fsm);
608 LOG.debug("NEW STATE: HANDSHAKING");
609 this.channel = channel;
610 TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
611
612 handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
613 channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
614 timeout = newTimeout();
615 }
616
617 private Timeout newTimeout() {
618 if (requestTimeoutInMs > 0) {
619 return timeoutExecutor.newTimeout(new TimerTask() {
620 @Override
621 public void run(Timeout timeout) {
622 fsm.sendEvent(new HandshakeTimeoutEvent());
623 }
624 }, 30, TimeUnit.SECONDS);
625 } else {
626 return null;
627 }
628 }
629
630 public StateMachine.State handleEvent(UserEvent e) {
631 fsm.deferEvent(e);
632 return this;
633 }
634
635 public StateMachine.State handleEvent(ResponseEvent e) {
636 lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
637 if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
638 if (timeout != null) {
639 timeout.cancel();
640 }
641 return new ConnectedState(fsm, channel, timeoutExecutor);
642 } else {
643 cleanupState();
644 LOG.error("Client incompatible with server");
645 return new HandshakeFailedState(fsm, new HandshakeFailedException());
646 }
647 }
648
649 public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
650 cleanupState();
651 return new ClosingState(fsm);
652 }
653
654 public StateMachine.State handleEvent(ErrorEvent e) {
655 cleanupState();
656 Throwable exception = e.getParam();
657 LOG.error("Error during handshake", exception);
658 return new HandshakeFailedState(fsm, exception);
659 }
660
661 private void cleanupState() {
662 timeoutExecutor.stop();
663 channel.close();
664 if (timeout != null) {
665 timeout.cancel();
666 }
667 }
668
669 }
670
671 class ConnectionFailedState extends BaseState {
672
673 final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
674 new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
675
676 Throwable exception;
677
678 ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
679 super(fsm);
680 LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
681 this.exception = exception;
682 reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
683 @Override
684 public void run(Timeout timeout) {
685 fsm.sendEvent(new ReconnectEvent());
686 }
687 }, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
688 }
689
690 public StateMachine.State handleEvent(UserEvent e) {
691 e.error(exception);
692 return this;
693 }
694
695 public StateMachine.State handleEvent(ErrorEvent e) {
696 return this;
697 }
698
699 public StateMachine.State handleEvent(ChannelClosedEvent e) {
700 return new DisconnectedState(fsm);
701 }
702
703 public StateMachine.State handleEvent(ReconnectEvent e) {
704 return new DisconnectedState(fsm);
705 }
706
707 }
708
709 private class HandshakeFailedState extends ConnectionFailedState {
710
711 HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
712 super(fsm, exception);
713 LOG.debug("STATE: HANDSHAKING FAILED");
714 }
715
716 }
717
718 class ConnectedState extends BaseState {
719
720 final Queue<RequestAndTimeout> timestampRequests;
721 final Map<Long, RequestAndTimeout> commitRequests;
722 final Map<Long, RequestAndTimeout> fenceRequests;
723 final Channel channel;
724
725 final HashedWheelTimer timeoutExecutor;
726
727 ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
728 super(fsm);
729 LOG.debug("NEW STATE: CONNECTED");
730 this.channel = channel;
731 this.timeoutExecutor = timeoutExecutor;
732 timestampRequests = new ArrayDeque<>();
733 commitRequests = new HashMap<>();
734 fenceRequests = new HashMap<>();
735 }
736
737 private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
738 if (requestTimeoutInMs > 0) {
739 return timeoutExecutor.newTimeout(new TimerTask() {
740 @Override
741 public void run(Timeout timeout) {
742 fsm.sendEvent(timeoutEvent);
743 }
744 }, requestTimeoutInMs, TimeUnit.MILLISECONDS);
745 } else {
746 return null;
747 }
748 }
749
750 private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
751 TSOProto.Request req = request.getRequest();
752
753 if (req.hasTimestampRequest()) {
754 timestampRequests.add(new RequestAndTimeout(request, newTimeout(new TimestampRequestTimeoutEvent())));
755 } else if (req.hasCommitRequest()) {
756 TSOProto.CommitRequest commitReq = req.getCommitRequest();
757 commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
758 request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
759 } else if (req.hasFenceRequest()) {
760 TSOProto.FenceRequest fenceReq = req.getFenceRequest();
761 fenceRequests.put(fenceReq.getTableId(), new RequestAndTimeout(
762 request, newTimeout(new FenceRequestTimeoutEvent(fenceReq.getTableId()))));
763 } else {
764 request.error(new IllegalArgumentException("Unknown request type"));
765 return;
766 }
767 ChannelFuture f = channel.write(req);
768
769 f.addListener(new ChannelFutureListener() {
770 @Override
771 public void operationComplete(ChannelFuture future) {
772 if (!future.isSuccess()) {
773 fsm.sendEvent(new ErrorEvent(future.getCause()));
774 }
775 }
776 });
777 }
778
779 private void handleResponse(ResponseEvent response) {
780 TSOProto.Response resp = response.getParam();
781 if (resp.hasTimestampResponse()) {
782 if (timestampRequests.size() == 0) {
783 LOG.debug("Received timestamp response when no requests outstanding");
784 return;
785 }
786 RequestAndTimeout e = timestampRequests.remove();
787 e.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
788 if (e.getTimeout() != null) {
789 e.getTimeout().cancel();
790 }
791 } else if (resp.hasCommitResponse()) {
792 long startTimestamp = resp.getCommitResponse().getStartTimestamp();
793 RequestAndTimeout e = commitRequests.remove(startTimestamp);
794 if (e == null) {
795 LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
796 return;
797 }
798 if (e.getTimeout() != null) {
799 e.getTimeout().cancel();
800 }
801 if (resp.getCommitResponse().getAborted()) {
802 e.getRequest().error(new AbortException());
803 } else {
804 e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
805 }
806 } else if (resp.hasFenceResponse()) {
807 long tableID = resp.getFenceResponse().getTableId();
808 RequestAndTimeout e = fenceRequests.remove(tableID);
809 if (e == null) {
810 LOG.debug("Received fence response for request that doesn't exist. Table ID: {}", tableID);
811 return;
812 }
813 if (e.getTimeout() != null) {
814 e.getTimeout().cancel();
815 }
816
817 e.getRequest().success(resp.getFenceResponse().getFenceId());
818 }
819 }
820
821 public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
822 if (!timestampRequests.isEmpty()) {
823 RequestAndTimeout r = timestampRequests.remove();
824 if (r.getTimeout() != null) {
825 r.getTimeout().cancel();
826 }
827 queueRetryOrError(fsm, r.getRequest());
828 }
829 return this;
830 }
831
832 public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
833 long startTimestamp = e.getStartTimestamp();
834 if (commitRequests.containsKey(startTimestamp)) {
835 RequestAndTimeout r = commitRequests.remove(startTimestamp);
836 if (r.getTimeout() != null) {
837 r.getTimeout().cancel();
838 }
839 queueRetryOrError(fsm, r.getRequest());
840 }
841 return this;
842 }
843
844 public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
845 long tableID = e.getTableID();
846 if (fenceRequests.containsKey(tableID)) {
847 RequestAndTimeout r = fenceRequests.remove(tableID);
848 if (r.getTimeout() != null) {
849 r.getTimeout().cancel();
850 }
851 queueRetryOrError(fsm, r.getRequest());
852 }
853 return this;
854 }
855
856 public StateMachine.State handleEvent(CloseEvent e) {
857 LOG.debug("CONNECTED STATE: CloseEvent");
858 timeoutExecutor.stop();
859 closeChannelAndErrorRequests();
860 fsm.deferEvent(e);
861 return new ClosingState(fsm);
862 }
863
864 public StateMachine.State handleEvent(RequestEvent e) {
865 sendRequest(fsm, e);
866 return this;
867 }
868
869 public StateMachine.State handleEvent(ResponseEvent e) {
870 handleResponse(e);
871 return this;
872 }
873
874 public StateMachine.State handleEvent(ErrorEvent e) {
875 LOG.debug("CONNECTED STATE: ErrorEvent");
876 timeoutExecutor.stop();
877 handleError(fsm);
878 return new ClosingState(fsm);
879 }
880
881 private void handleError(StateMachine.Fsm fsm) {
882 LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
883 while (timestampRequests.size() > 0) {
884 RequestAndTimeout r = timestampRequests.remove();
885 if (r.getTimeout() != null) {
886 r.getTimeout().cancel();
887 }
888 queueRetryOrError(fsm, r.getRequest());
889 }
890 Iterator<Map.Entry<Long, RequestAndTimeout>> iter = commitRequests.entrySet().iterator();
891 while (iter.hasNext()) {
892 RequestAndTimeout r = iter.next().getValue();
893 if (r.getTimeout() != null) {
894 r.getTimeout().cancel();
895 }
896 queueRetryOrError(fsm, r.getRequest());
897 iter.remove();
898 }
899 iter = fenceRequests.entrySet().iterator();
900 while (iter.hasNext()) {
901 RequestAndTimeout r = iter.next().getValue();
902 if (r.getTimeout() != null) {
903 r.getTimeout().cancel();
904 }
905 queueRetryOrError(fsm, r.getRequest());
906 iter.remove();
907 }
908 channel.close();
909 }
910
911 private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
912 if (e.getRetriesLeft() > 0) {
913 e.decrementRetries();
914 if (e.getRequest().hasCommitRequest()) {
915 TSOProto.CommitRequest commitRequest = e.getRequest().getCommitRequest();
916 if (!commitRequest.getIsRetry()) {
917 TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
918 TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
919 commitBuilder.mergeFrom(commitRequest);
920 commitBuilder.setIsRetry(true);
921 builder.setCommitRequest(commitBuilder.build());
922 e.setRequest(builder.build());
923 }
924 }
925 fsm.sendEvent(e);
926 } else {
927 e.error(
928 new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
929 }
930 }
931
932 private void closeChannelAndErrorRequests() {
933 channel.close();
934 for (RequestAndTimeout r : timestampRequests) {
935 if (r.getTimeout() != null) {
936 r.getTimeout().cancel();
937 }
938 r.getRequest().error(new ClosingException());
939 }
940 for (RequestAndTimeout r : commitRequests.values()) {
941 if (r.getTimeout() != null) {
942 r.getTimeout().cancel();
943 }
944 r.getRequest().error(new ClosingException());
945 }
946 for (RequestAndTimeout r : fenceRequests.values()) {
947 if (r.getTimeout() != null) {
948 r.getTimeout().cancel();
949 }
950 r.getRequest().error(new ClosingException());
951 }
952 }
953 }
954
955 private class ClosingState extends BaseState {
956
957 ClosingState(StateMachine.Fsm fsm) {
958 super(fsm);
959 LOG.debug("NEW STATE: CLOSING");
960 }
961
962 public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
963
964 return this;
965 }
966
967 public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
968
969 return this;
970 }
971
972 public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
973
974 return this;
975 }
976
977 public StateMachine.State handleEvent(ErrorEvent e) {
978
979 return this;
980 }
981
982 public StateMachine.State handleEvent(ResponseEvent e) {
983
984 return this;
985 }
986
987 public StateMachine.State handleEvent(UserEvent e) {
988 fsm.deferEvent(e);
989 return this;
990 }
991
992 public StateMachine.State handleEvent(ChannelClosedEvent e) {
993 return new DisconnectedState(fsm);
994 }
995
996 public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
997 return this;
998 }
999
1000 }
1001
1002
1003
1004
1005
1006 private class Handler extends SimpleChannelHandler {
1007
1008 private StateMachine.Fsm fsm;
1009
1010 Handler(StateMachine.Fsm fsm) {
1011 this.fsm = fsm;
1012 }
1013
1014 @Override
1015 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
1016 currentChannel = e.getChannel();
1017 LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
1018 fsm.sendEvent(new ConnectedEvent(e.getChannel()));
1019 }
1020
1021 @Override
1022 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1023 LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
1024 fsm.sendEvent(new ErrorEvent(new ConnectionException()));
1025 }
1026
1027 @Override
1028 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1029 LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
1030 fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
1031 }
1032
1033 @Override
1034 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
1035 if (e.getMessage() instanceof TSOProto.Response) {
1036 fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
1037 } else {
1038 LOG.warn("Received unknown message", e.getMessage());
1039 }
1040 }
1041
1042 @Override
1043 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
1044 LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
1045 fsm.sendEvent(new ErrorEvent(e.getCause()));
1046 }
1047 }
1048
1049 private synchronized void setTSOAddress(String host, int port) {
1050 tsoAddr = new InetSocketAddress(host, port);
1051 }
1052
1053 private synchronized InetSocketAddress getAddress() {
1054 return tsoAddr;
1055 }
1056
1057 private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
1058 try {
1059 currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
1060 currentTSOZNode.getListenable().addListener(this);
1061 currentTSOZNode.start(true);
1062 } catch (Exception e) {
1063 throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage());
1064 }
1065 }
1066
1067 private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
1068 ChildData currentTSOData = currentTSOZNode.getCurrentData();
1069 if (currentTSOData == null) {
1070 throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
1071 }
1072 byte[] currentTSOAndEpochAsBytes = currentTSOData.getData();
1073 if (currentTSOAndEpochAsBytes == null) {
1074 throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
1075 }
1076 return new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
1077 }
1078
1079 }