1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import com.google.inject.Guice;
21 import com.google.inject.Injector;
22
23 import org.apache.curator.framework.CuratorFramework;
24 import org.apache.curator.test.TestingServer;
25 import org.apache.curator.utils.CloseableUtils;
26 import org.apache.omid.TestUtils;
27 import org.apache.omid.transaction.AbstractTransactionManager;
28 import org.apache.omid.tso.HALeaseManagementModule;
29 import org.apache.omid.tso.TSOMockModule;
30 import org.apache.omid.tso.TSOServer;
31 import org.apache.omid.tso.TSOServerConfig;
32 import org.apache.omid.tso.VoidLeaseManagementModule;
33 import org.apache.statemachine.StateMachine.FsmImpl;
34 import org.apache.zookeeper.KeeperException.NoNodeException;
35 import org.apache.zookeeper.data.Stat;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.testng.annotations.AfterMethod;
39 import org.testng.annotations.BeforeMethod;
40 import org.testng.annotations.Test;
41
42 import java.util.concurrent.ExecutionException;
43
44 import static org.testng.Assert.assertEquals;
45 import static org.testng.Assert.assertNotNull;
46 import static org.testng.Assert.assertNull;
47 import static org.testng.Assert.assertTrue;
48 import static org.testng.Assert.fail;
49
50 public class TestTSOClientConnectionToTSO {
51
52 private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
53
54
55 private static final String TSO_HOST = "localhost";
56 private static final String CURRENT_TSO_PATH = "/current_tso_path";
57 private static final String TSO_LEASE_PATH = "/tso_lease_path";
58
59 private int tsoPortForTest;
60 private String zkClusterForTest;
61
62 private Injector injector = null;
63
64 private TestingServer zkServer;
65
66 private CuratorFramework zkClient;
67 private TSOServer tsoServer;
68
69 @BeforeMethod
70 public void beforeMethod() throws Exception {
71
72 tsoPortForTest = TestUtils.getFreeLocalPort();
73
74 int zkPortForTest = TestUtils.getFreeLocalPort();
75 zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
76 LOG.info("Starting ZK Server in port {}", zkPortForTest);
77 zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
78 LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
79
80 zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
81
82 Stat stat;
83 try {
84 zkClient.delete().forPath(CURRENT_TSO_PATH);
85 stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
86 assertNull(stat, CURRENT_TSO_PATH + " should not exist");
87 } catch (NoNodeException e) {
88 LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
89 }
90
91 }
92
93 @AfterMethod
94 public void afterMethod() {
95
96 zkClient.close();
97
98 CloseableUtils.closeQuietly(zkServer);
99 zkServer = null;
100 LOG.info("ZK Server Stopped");
101
102 }
103
104 @Test(timeOut = 30_000)
105 public void testUnsuccessfulConnectionToTSO() throws Exception {
106
107
108
109 try {
110 TSOClient.newInstance(new OmidClientConfiguration());
111 } catch (IllegalArgumentException e) {
112
113 }
114
115 }
116
117 @Test(timeOut = 30_000)
118 public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
119
120
121 TSOServerConfig tsoConfig = new TSOServerConfig();
122 tsoConfig.setConflictMapSize(1000);
123 tsoConfig.setPort(tsoPortForTest);
124 tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
125 injector = Guice.createInjector(new TSOMockModule(tsoConfig));
126 LOG.info("Starting TSO");
127 tsoServer = injector.getInstance(TSOServer.class);
128 tsoServer.startAndWait();
129 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
130 LOG.info("Finished loading TSO");
131
132
133
134 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
135 tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
136 tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
137 TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
138
139
140 Long startTS = tsoClient.getNewStartTimestamp().get();
141 LOG.info("Start TS {} ", startTS);
142 assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
143
144
145 tsoClient.close().get();
146 tsoServer.stopAndWait();
147 tsoServer = null;
148 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
149 LOG.info("TSO Server Stopped");
150
151 }
152
153 @Test(timeOut = 30_000)
154 public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
155
156
157 TSOServerConfig config = new TSOServerConfig();
158 config.setConflictMapSize(1000);
159 config.setPort(tsoPortForTest);
160 config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
161 injector = Guice.createInjector(new TSOMockModule(config));
162 LOG.info("Starting TSO");
163 tsoServer = injector.getInstance(TSOServer.class);
164 tsoServer.startAndWait();
165 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
166 LOG.info("Finished loading TSO");
167
168 waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
169
170
171 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
172 tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
173 tsoClientConf.setConnectionString(zkClusterForTest);
174 tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
175 TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
176
177
178 Long startTS = tsoClient.getNewStartTimestamp().get();
179 LOG.info("Start TS {} ", startTS);
180 assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
181
182
183 tsoClient.close().get();
184 tsoServer.stopAndWait();
185 tsoServer = null;
186 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
187 LOG.info("TSO Server Stopped");
188
189 }
190
191 @Test(timeOut = 30_000)
192 public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
193
194
195 TSOServerConfig config = new TSOServerConfig();
196 config.setConflictMapSize(1000);
197 config.setPort(tsoPortForTest);
198 config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
199 injector = Guice.createInjector(new TSOMockModule(config));
200 LOG.info("Starting Initial TSO");
201 tsoServer = injector.getInstance(TSOServer.class);
202 tsoServer.startAndWait();
203 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
204 LOG.info("Finished loading TSO");
205
206 waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
207
208
209 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
210 tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
211 tsoClientConf.setConnectionString(zkClusterForTest);
212 tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
213 TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
214
215
216 Long startTS = tsoClient.getNewStartTimestamp().get();
217 LOG.info("Start TS {} ", startTS);
218 assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
219
220
221 tsoServer.stopAndWait();
222 tsoServer = null;
223 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
224 LOG.info("Initial TSO Server Stopped");
225
226 Thread.sleep(1500);
227
228 try {
229 startTS = tsoClient.getNewStartTimestamp().get();
230 fail();
231 } catch (ExecutionException e) {
232 LOG.info("Exception expected");
233
234 FsmImpl fsm = (FsmImpl) tsoClient.fsm;
235 assertEquals(e.getCause().getClass(), ConnectionException.class);
236 assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
237 ||
238 fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
239 }
240
241
242 Injector newInjector = Guice.createInjector(new TSOMockModule(config));
243 LOG.info("Re-Starting again the TSO");
244 tsoServer = newInjector.getInstance(TSOServer.class);
245 tsoServer.startAndWait();
246 TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
247 LOG.info("Finished loading restarted TSO");
248
249
250 boolean reconnectionActive = false;
251 while (!reconnectionActive) {
252 try {
253 startTS = tsoClient.getNewStartTimestamp().get();
254 reconnectionActive = true;
255 } catch (ExecutionException e) {
256
257 }
258 }
259 assertNotNull(startTS);
260
261
262 tsoServer.stopAndWait();
263 TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
264 LOG.info("Restarted TSO Server Stopped");
265 }
266
267 private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
268 while (true) {
269 try {
270 Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
271 if (stat == null) {
272 continue;
273 }
274 LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
275 if (stat.toString().length() == 0) {
276 continue;
277 }
278 return;
279 } catch (Exception e) {
280 LOG.debug("TSO still has not registered yet, sleeping...", e);
281 Thread.sleep(500);
282 }
283 }
284 }
285
286 }