View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.inject.Guice;
21  import com.google.inject.Injector;
22  import org.apache.curator.framework.CuratorFramework;
23  import org.apache.curator.test.TestingServer;
24  import org.apache.curator.utils.CloseableUtils;
25  import org.apache.omid.TestUtils;
26  import org.apache.omid.tso.HALeaseManagementModule;
27  import org.apache.omid.tso.TSOMockModule;
28  import org.apache.omid.tso.TSOServer;
29  import org.apache.omid.tso.TSOServerConfig;
30  import org.apache.omid.tso.VoidLeaseManagementModule;
31  import org.apache.statemachine.StateMachine.FsmImpl;
32  import org.apache.zookeeper.KeeperException.NoNodeException;
33  import org.apache.zookeeper.data.Stat;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.testng.annotations.AfterMethod;
37  import org.testng.annotations.BeforeMethod;
38  import org.testng.annotations.Test;
39  
40  import java.util.concurrent.ExecutionException;
41  
42  import static org.testng.Assert.assertEquals;
43  import static org.testng.Assert.assertNotNull;
44  import static org.testng.Assert.assertNull;
45  import static org.testng.Assert.assertTrue;
46  import static org.testng.Assert.fail;
47  
48  public class TestTSOClientConnectionToTSO {
49  
50      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
51  
52      // Constants and variables for component connectivity
53      private static final String TSO_HOST = "localhost";
54      private static final String CURRENT_TSO_PATH = "/current_tso_path";
55      private static final String TSO_LEASE_PATH = "/tso_lease_path";
56  
57      private int tsoPortForTest;
58      private String zkClusterForTest;
59  
60      private Injector injector = null;
61  
62      private TestingServer zkServer;
63  
64      private CuratorFramework zkClient;
65      private TSOServer tsoServer;
66  
67      @BeforeMethod
68      public void beforeMethod() throws Exception {
69  
70          tsoPortForTest = TestUtils.getFreeLocalPort();
71  
72          int zkPortForTest = TestUtils.getFreeLocalPort();
73          zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
74          LOG.info("Starting ZK Server in port {}", zkPortForTest);
75          zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
76          LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
77  
78          zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
79  
80          Stat stat;
81          try {
82              zkClient.delete().forPath(CURRENT_TSO_PATH);
83              stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
84              assertNull(stat, CURRENT_TSO_PATH + " should not exist");
85          } catch (NoNodeException e) {
86              LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
87          }
88  
89      }
90  
91      @AfterMethod
92      public void afterMethod() {
93  
94          zkClient.close();
95  
96          CloseableUtils.closeQuietly(zkServer);
97          zkServer = null;
98          LOG.info("ZK Server Stopped");
99  
100     }
101 
102     @Test(timeOut = 30_000)
103     public void testUnsuccessfulConnectionToTSO() throws Exception {
104 
105         // When no HA node for TSOServer is found & no host:port config exists
106         // we should get an exception when getting the client
107         try {
108             TSOClient.newInstance(new OmidClientConfiguration());
109         } catch (IllegalArgumentException e) {
110             // Expected
111         }
112 
113     }
114 
115     @Test(timeOut = 30_000)
116     public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
117 
118         // Launch a TSO WITHOUT publishing the address in HA...
119         TSOServerConfig tsoConfig = new TSOServerConfig();
120         tsoConfig.setMaxItems(1000);
121         tsoConfig.setPort(tsoPortForTest);
122         tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
123         injector = Guice.createInjector(new TSOMockModule(tsoConfig));
124         LOG.info("Starting TSO");
125         tsoServer = injector.getInstance(TSOServer.class);
126         tsoServer.startAndWait();
127         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
128         LOG.info("Finished loading TSO");
129 
130         // When no HA node for TSOServer is found we should get a connection
131         // to the TSO through the host:port configured...
132         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
133         tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
134         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
135         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
136 
137         // ... so we should get responses from the methods
138         Long startTS = tsoClient.getNewStartTimestamp().get();
139         LOG.info("Start TS {} ", startTS);
140         assertEquals(startTS.longValue(), 1);
141 
142         // Close the tsoClient connection and stop the TSO Server
143         tsoClient.close().get();
144         tsoServer.stopAndWait();
145         tsoServer = null;
146         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
147         LOG.info("TSO Server Stopped");
148 
149     }
150 
151     @Test(timeOut = 30_000)
152     public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
153 
154         // Launch a TSO publishing the address in HA...
155         TSOServerConfig config = new TSOServerConfig();
156         config.setMaxItems(1000);
157         config.setPort(tsoPortForTest);
158         config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
159         injector = Guice.createInjector(new TSOMockModule(config));
160         LOG.info("Starting TSO");
161         tsoServer = injector.getInstance(TSOServer.class);
162         tsoServer.startAndWait();
163         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
164         LOG.info("Finished loading TSO");
165 
166         waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
167 
168         // When a HA node for TSOServer is found we should get a connection
169         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
170         tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
171         tsoClientConf.setConnectionString(zkClusterForTest);
172         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
173         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
174 
175         // ... so we should get responses from the methods
176         Long startTS = tsoClient.getNewStartTimestamp().get();
177         LOG.info("Start TS {} ", startTS);
178         assertEquals(startTS.longValue(), 1);
179 
180         // Close the tsoClient connection and stop the TSO Server
181         tsoClient.close().get();
182         tsoServer.stopAndWait();
183         tsoServer = null;
184         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
185         LOG.info("TSO Server Stopped");
186 
187     }
188 
189     @Test(timeOut = 30_000)
190     public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
191 
192         // Start a TSO with HA...
193         TSOServerConfig config = new TSOServerConfig();
194         config.setMaxItems(1000);
195         config.setPort(tsoPortForTest);
196         config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
197         injector = Guice.createInjector(new TSOMockModule(config));
198         LOG.info("Starting Initial TSO");
199         tsoServer = injector.getInstance(TSOServer.class);
200         tsoServer.startAndWait();
201         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
202         LOG.info("Finished loading TSO");
203 
204         waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
205 
206         // Then create the TSO Client under test...
207         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
208         tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
209         tsoClientConf.setConnectionString(zkClusterForTest);
210         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
211         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
212 
213         // ... and check that initially we get responses from the methods
214         Long startTS = tsoClient.getNewStartTimestamp().get();
215         LOG.info("Start TS {} ", startTS);
216         assertEquals(startTS.longValue(), 1);
217 
218         // Then stop the server...
219         tsoServer.stopAndWait();
220         tsoServer = null;
221         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
222         LOG.info("Initial TSO Server Stopped");
223 
224         Thread.sleep(1500); // ...allow the client to receive disconnection event...
225         // ... and check that we get a conn exception when trying to access the client
226         try {
227             startTS = tsoClient.getNewStartTimestamp().get();
228             fail();
229         } catch (ExecutionException e) {
230             LOG.info("Exception expected");
231             // Internal accessor to fsm to do the required checkings
232             FsmImpl fsm = (FsmImpl) tsoClient.fsm;
233             assertEquals(e.getCause().getClass(), ConnectionException.class);
234             assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
235                                ||
236                                fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
237         }
238 
239         // After that, simulate that a new TSO has been launched...
240         Injector newInjector = Guice.createInjector(new TSOMockModule(config));
241         LOG.info("Re-Starting again the TSO");
242         tsoServer = newInjector.getInstance(TSOServer.class);
243         tsoServer.startAndWait();
244         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
245         LOG.info("Finished loading restarted TSO");
246 
247         // Finally re-check that, eventually, we can get a new value from the new TSO...
248         boolean reconnectionActive = false;
249         while (!reconnectionActive) {
250             try {
251                 startTS = tsoClient.getNewStartTimestamp().get();
252                 reconnectionActive = true;
253             } catch (ExecutionException e) {
254                 // Expected
255             }
256         }
257         assertNotNull(startTS);
258 
259         // ...and stop the server
260         tsoServer.stopAndWait();
261         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
262         LOG.info("Restarted TSO Server Stopped");
263     }
264 
265     private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
266         while (true) {
267             try {
268                 Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
269                 if (stat == null) {
270                     continue;
271                 }
272                 LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
273                 if (stat.toString().length() == 0) {
274                     continue;
275                 }
276                 return;
277             } catch (Exception e) {
278                 LOG.debug("TSO still has not registered yet, sleeping...", e);
279                 Thread.sleep(500);
280             }
281         }
282     }
283 
284 }