1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import com.google.inject.Guice;
21 import com.google.inject.Injector;
22
23 import org.apache.curator.framework.CuratorFramework;
24 import org.apache.curator.test.TestingServer;
25 import org.apache.curator.utils.CloseableUtils;
26 import org.apache.omid.TestUtils;
27 import org.apache.omid.committable.CommitTable;
28 import org.apache.omid.tso.HALeaseManagementModule;
29 import org.apache.omid.tso.TSOMockModule;
30 import org.apache.omid.tso.TSOServer;
31 import org.apache.omid.tso.TSOServerConfig;
32 import org.apache.omid.tso.VoidLeaseManagementModule;
33 import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
34 import org.apache.statemachine.StateMachine.FsmImpl;
35 import org.apache.zookeeper.KeeperException.NoNodeException;
36 import org.apache.zookeeper.data.Stat;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.testng.annotations.AfterMethod;
40 import org.testng.annotations.BeforeMethod;
41 import org.testng.annotations.Test;
42
43 import java.util.concurrent.ExecutionException;
44
45 import static org.testng.Assert.assertEquals;
46 import static org.testng.Assert.assertNotNull;
47 import static org.testng.Assert.assertNull;
48 import static org.testng.Assert.assertTrue;
49 import static org.testng.Assert.fail;
50
51 public class TestTSOClientConnectionToTSO {
52
53 private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
54
55
56 private static final String TSO_HOST = "localhost";
57 private static final String CURRENT_TSO_PATH = "/current_tso_path";
58 private static final String TSO_LEASE_PATH = "/tso_lease_path";
59
60 private int tsoPortForTest;
61 private String zkClusterForTest;
62
63 private Injector injector = null;
64
65 private TestingServer zkServer;
66
67 private CuratorFramework zkClient;
68 private TSOServer tsoServer;
69
70 @BeforeMethod
71 public void beforeMethod() throws Exception {
72
73 tsoPortForTest = TestUtils.getFreeLocalPort();
74
75 int zkPortForTest = TestUtils.getFreeLocalPort();
76 zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
77 LOG.info("Starting ZK Server in port {}", zkPortForTest);
78 zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
79 LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
80
81 zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
82
83 Stat stat;
84 try {
85 zkClient.delete().forPath(CURRENT_TSO_PATH);
86 stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
87 assertNull(stat, CURRENT_TSO_PATH + " should not exist");
88 } catch (NoNodeException e) {
89 LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
90 }
91
92 }
93
94 @AfterMethod
95 public void afterMethod() {
96
97 zkClient.close();
98
99 CloseableUtils.closeQuietly(zkServer);
100 zkServer = null;
101 LOG.info("ZK Server Stopped");
102
103 }
104
105 @Test(timeOut = 30_000)
106 public void testUnsuccessfulConnectionToTSO() throws Exception {
107
108
109
110 try {
111 TSOClient.newInstance(new OmidClientConfiguration());
112 } catch (IllegalArgumentException e) {
113
114 }
115
116 }
117
118 @Test(timeOut = 30_000)
119 public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
120
121
122 TSOServerConfig tsoConfig = new TSOServerConfig();
123 tsoConfig.setConflictMapSize(1000);
124 tsoConfig.setPort(tsoPortForTest);
125 tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
126 tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
127 injector = Guice.createInjector(new TSOMockModule(tsoConfig));
128 LOG.info("Starting TSO");
129 tsoServer = injector.getInstance(TSOServer.class);
130 tsoServer.startAsync();
131 tsoServer.awaitRunning();
132 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
133 LOG.info("Finished loading TSO");
134
135
136
137 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
138 tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
139 tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
140 TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
141
142
143 Long startTS = tsoClient.getNewStartTimestamp().get();
144 LOG.info("Start TS {} ", startTS);
145 assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
146
147
148 tsoClient.close().get();
149 tsoServer.stopAsync();
150 tsoServer.awaitTerminated();
151 tsoServer = null;
152 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
153 LOG.info("TSO Server Stopped");
154
155 }
156
157 @Test(timeOut = 30_000)
158 public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
159
160
161 TSOServerConfig config = new TSOServerConfig();
162 config.setConflictMapSize(1000);
163 config.setPort(tsoPortForTest);
164 config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
165 injector = Guice.createInjector(new TSOMockModule(config));
166 LOG.info("Starting TSO");
167 tsoServer = injector.getInstance(TSOServer.class);
168 tsoServer.startAsync();
169 tsoServer.awaitRunning();
170 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
171 LOG.info("Finished loading TSO");
172
173 waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
174
175
176 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
177 tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
178 tsoClientConf.setConnectionString(zkClusterForTest);
179 tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
180 TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
181
182
183 Long startTS = tsoClient.getNewStartTimestamp().get();
184 LOG.info("Start TS {} ", startTS);
185 assertTrue(startTS.longValue() >= CommitTable.MAX_CHECKPOINTS_PER_TXN);
186
187
188 tsoClient.close().get();
189 tsoServer.stopAsync();
190 tsoServer.awaitTerminated();
191 tsoServer = null;
192 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
193 LOG.info("TSO Server Stopped");
194
195 }
196
197 @Test(timeOut = 30_000)
198 public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
199
200
201 TSOServerConfig config = new TSOServerConfig();
202 config.setConflictMapSize(1000);
203 config.setPort(tsoPortForTest);
204 config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
205 injector = Guice.createInjector(new TSOMockModule(config));
206 LOG.info("Starting Initial TSO");
207 tsoServer = injector.getInstance(TSOServer.class);
208 tsoServer.startAsync();
209 tsoServer.awaitRunning();
210 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
211 LOG.info("Finished loading TSO");
212
213 waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
214
215
216 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
217 tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
218 tsoClientConf.setConnectionString(zkClusterForTest);
219 tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
220 TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
221
222
223 Long startTS = tsoClient.getNewStartTimestamp().get();
224 LOG.info("Start TS {} ", startTS);
225 assertTrue(startTS.longValue() >= CommitTable.MAX_CHECKPOINTS_PER_TXN);
226
227
228 tsoServer.stopAsync();
229 tsoServer.awaitTerminated();
230 tsoServer = null;
231 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
232 LOG.info("Initial TSO Server Stopped");
233
234 Thread.sleep(1500);
235
236 try {
237 startTS = tsoClient.getNewStartTimestamp().get();
238 fail();
239 } catch (ExecutionException e) {
240 LOG.info("Exception expected");
241
242 FsmImpl fsm = (FsmImpl) tsoClient.fsm;
243 assertEquals(e.getCause().getClass(), ConnectionException.class);
244 assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
245 ||
246 fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
247 }
248
249
250 Injector newInjector = Guice.createInjector(new TSOMockModule(config));
251 LOG.info("Re-Starting again the TSO");
252 tsoServer = newInjector.getInstance(TSOServer.class);
253 tsoServer.startAsync();
254 tsoServer.awaitRunning();
255 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
256 LOG.info("Finished loading restarted TSO");
257
258
259 boolean reconnectionActive = false;
260 while (!reconnectionActive) {
261 try {
262 startTS = tsoClient.getNewStartTimestamp().get();
263 reconnectionActive = true;
264 } catch (ExecutionException e) {
265
266 }
267 }
268 assertNotNull(startTS);
269
270
271 tsoServer.stopAsync();
272 tsoServer.awaitTerminated();
273 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
274 LOG.info("Restarted TSO Server Stopped");
275 }
276
277 private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
278 while (true) {
279 try {
280 Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
281 if (stat == null) {
282 continue;
283 }
284 LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
285 if (stat.toString().length() == 0) {
286 continue;
287 }
288 return;
289 } catch (Exception e) {
290 LOG.debug("TSO still has not registered yet, sleeping...", e);
291 Thread.sleep(500);
292 }
293 }
294 }
295
296 }