1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractIdleService;
21 import com.google.inject.Binder;
22 import com.google.inject.Guice;
23 import com.google.inject.Inject;
24 import com.google.inject.Injector;
25 import com.google.inject.Module;
26 import com.google.inject.Singleton;
27 import org.apache.omid.committable.hbase.HBaseCommitTableStorageModule;
28 import org.apache.omid.metrics.MetricsRegistry;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.List;
35
36 @Singleton
37 public class TSOServer extends AbstractIdleService {
38
39 private static final Logger LOG = LoggerFactory.getLogger(TSOServer.class);
40
41 public static final String DASH_SEPARATOR_80_CHARS =
42 "--------------------------------------------------------------------------------";
43
44 public static final String TSO_HOST_AND_PORT_KEY = "tso.hostandport";
45
46 @Inject
47 private TSOStateManager tsoStateManager;
48 @Inject
49 private RequestProcessor requestProcessor;
50 @Inject
51 private PersistenceProcessor persistenceProcessor;
52 @Inject
53 private RetryProcessor retryProcessor;
54 @Inject
55 public ReplyProcessor replyProcessor;
56 @Inject
57 private LowWatermarkWriter lowWatermarkWriter;
58
59
60
61
62 @Inject
63 private LeaseManagement leaseManagement;
64
65
66
67 public static TSOServer getInitializedTsoServer(TSOServerConfig config) throws IOException {
68 LOG.info("Configuring TSO Server...");
69 Injector injector = Guice.createInjector(buildModuleList(config));
70 LOG.info("TSO Server configured. Creating instance...");
71 return injector.getInstance(TSOServer.class);
72 }
73
74 private static List<Module> buildModuleList(final TSOServerConfig config) throws IOException {
75
76 List<Module> guiceModules = new ArrayList<>();
77 guiceModules.add(config.getTimestampStoreModule());
78 guiceModules.add(config.getCommitTableStoreModule());
79 guiceModules.add(config.getLeaseModule());
80 guiceModules.add(new TSOModule(config));
81
82 guiceModules.add(new Module() {
83 @Override
84 public void configure(Binder binder) {
85 LOG.info("\t* Metrics provider module set to {}", config.getMetrics().getClass());
86 binder.bind(MetricsRegistry.class).toInstance(config.getMetrics());
87 }
88 });
89 return guiceModules;
90 }
91
92
93
94
95
96 @Override
97 protected void startUp() throws Exception {
98 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
99 LOG.info("Starting TSO Server");
100 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
101 tsoStateManager.register(requestProcessor);
102 leaseManagement.startService();
103 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
104 if (leaseManagement instanceof VoidLeaseManager) {
105 LOG.info("TSO Server running and accepting connections");
106 } else if (leaseManagement instanceof LeaseManager) {
107 LOG.info("TSO Server running on HA mode. Waiting to be signaled as the Master replica...");
108 } else {
109 throw new RuntimeException("Wrong TSO mode");
110 }
111 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
112 }
113
114 @Override
115 protected void shutDown() throws Exception {
116 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
117 LOG.info("Shutting Down TSO Server");
118 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
119 leaseManagement.stopService();
120 tsoStateManager.unregister(requestProcessor);
121 requestProcessor.close();
122 persistenceProcessor.close();
123 retryProcessor.close();
124 replyProcessor.close();
125 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
126 LOG.info("TSO Server stopped");
127 LOG.info("{}", DASH_SEPARATOR_80_CHARS);
128 }
129
130
131
132 private void attachShutDownHook() {
133 Runtime.getRuntime().addShutdownHook(new Thread() {
134 @Override
135 public void run() {
136 stopAsync();
137 awaitTerminated();
138 }
139 });
140 LOG.info("Shutdown Hook Attached");
141 }
142
143
144
145
146 public static void main(String[] args) {
147
148 TSOServerConfig config = new TSOServerConfig();
149
150 try {
151 TSOServer tsoServer = getInitializedTsoServer(config);
152 tsoServer.attachShutDownHook();
153 tsoServer.startAsync();
154 tsoServer.awaitRunning();
155 if (config.getLowLatency() &&
156 !(config.getCommitTableStoreModule() instanceof HBaseCommitTableStorageModule)) {
157 LOG.error("Running low latency mode with memory commit table. Use only with testing!");
158 }
159 } catch (Exception e) {
160 System.out.println(e.getMessage());
161 System.exit(-1);
162 }
163
164 }
165
166 }