View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.omid.tso.client;
19  
20  import com.google.inject.Guice;
21  import com.google.inject.Injector;
22  
23  import org.apache.curator.framework.CuratorFramework;
24  import org.apache.curator.test.TestingServer;
25  import org.apache.curator.utils.CloseableUtils;
26  import org.apache.omid.TestUtils;
27  import org.apache.omid.transaction.AbstractTransactionManager;
28  import org.apache.omid.tso.HALeaseManagementModule;
29  import org.apache.omid.tso.TSOMockModule;
30  import org.apache.omid.tso.TSOServer;
31  import org.apache.omid.tso.TSOServerConfig;
32  import org.apache.omid.tso.VoidLeaseManagementModule;
33  import org.apache.statemachine.StateMachine.FsmImpl;
34  import org.apache.zookeeper.KeeperException.NoNodeException;
35  import org.apache.zookeeper.data.Stat;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  import org.testng.annotations.AfterMethod;
39  import org.testng.annotations.BeforeMethod;
40  import org.testng.annotations.Test;
41  
42  import java.util.concurrent.ExecutionException;
43  
44  import static org.testng.Assert.assertEquals;
45  import static org.testng.Assert.assertNotNull;
46  import static org.testng.Assert.assertNull;
47  import static org.testng.Assert.assertTrue;
48  import static org.testng.Assert.fail;
49  
50  public class TestTSOClientConnectionToTSO {
51  
52      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
53  
54      // Constants and variables for component connectivity
55      private static final String TSO_HOST = "localhost";
56      private static final String CURRENT_TSO_PATH = "/current_tso_path";
57      private static final String TSO_LEASE_PATH = "/tso_lease_path";
58  
59      private int tsoPortForTest;
60      private String zkClusterForTest;
61  
62      private Injector injector = null;
63  
64      private TestingServer zkServer;
65  
66      private CuratorFramework zkClient;
67      private TSOServer tsoServer;
68  
69      @BeforeMethod
70      public void beforeMethod() throws Exception {
71  
72          tsoPortForTest = TestUtils.getFreeLocalPort();
73  
74          int zkPortForTest = TestUtils.getFreeLocalPort();
75          zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
76          LOG.info("Starting ZK Server in port {}", zkPortForTest);
77          zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
78          LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
79  
80          zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
81  
82          Stat stat;
83          try {
84              zkClient.delete().forPath(CURRENT_TSO_PATH);
85              stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
86              assertNull(stat, CURRENT_TSO_PATH + " should not exist");
87          } catch (NoNodeException e) {
88              LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
89          }
90  
91      }
92  
93      @AfterMethod
94      public void afterMethod() {
95  
96          zkClient.close();
97  
98          CloseableUtils.closeQuietly(zkServer);
99          zkServer = null;
100         LOG.info("ZK Server Stopped");
101 
102     }
103 
104     @Test(timeOut = 30_000)
105     public void testUnsuccessfulConnectionToTSO() throws Exception {
106 
107         // When no HA node for TSOServer is found & no host:port config exists
108         // we should get an exception when getting the client
109         try {
110             TSOClient.newInstance(new OmidClientConfiguration());
111         } catch (IllegalArgumentException e) {
112             // Expected
113         }
114 
115     }
116 
117     @Test(timeOut = 30_000)
118     public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
119 
120         // Launch a TSO WITHOUT publishing the address in HA...
121         TSOServerConfig tsoConfig = new TSOServerConfig();
122         tsoConfig.setConflictMapSize(1000);
123         tsoConfig.setPort(tsoPortForTest);
124         tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
125         injector = Guice.createInjector(new TSOMockModule(tsoConfig));
126         LOG.info("Starting TSO");
127         tsoServer = injector.getInstance(TSOServer.class);
128         tsoServer.startAndWait();
129         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
130         LOG.info("Finished loading TSO");
131 
132         // When no HA node for TSOServer is found we should get a connection
133         // to the TSO through the host:port configured...
134         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
135         tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
136         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
137         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
138 
139         // ... so we should get responses from the methods
140         Long startTS = tsoClient.getNewStartTimestamp().get();
141         LOG.info("Start TS {} ", startTS);
142         assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
143 
144         // Close the tsoClient connection and stop the TSO Server
145         tsoClient.close().get();
146         tsoServer.stopAndWait();
147         tsoServer = null;
148         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
149         LOG.info("TSO Server Stopped");
150 
151     }
152 
153     @Test(timeOut = 30_000)
154     public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
155 
156         // Launch a TSO publishing the address in HA...
157         TSOServerConfig config = new TSOServerConfig();
158         config.setConflictMapSize(1000);
159         config.setPort(tsoPortForTest);
160         config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
161         injector = Guice.createInjector(new TSOMockModule(config));
162         LOG.info("Starting TSO");
163         tsoServer = injector.getInstance(TSOServer.class);
164         tsoServer.startAndWait();
165         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
166         LOG.info("Finished loading TSO");
167 
168         waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
169 
170         // When a HA node for TSOServer is found we should get a connection
171         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
172         tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
173         tsoClientConf.setConnectionString(zkClusterForTest);
174         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
175         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
176 
177         // ... so we should get responses from the methods
178         Long startTS = tsoClient.getNewStartTimestamp().get();
179         LOG.info("Start TS {} ", startTS);
180         assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
181 
182         // Close the tsoClient connection and stop the TSO Server
183         tsoClient.close().get();
184         tsoServer.stopAndWait();
185         tsoServer = null;
186         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
187         LOG.info("TSO Server Stopped");
188 
189     }
190 
191     @Test(timeOut = 30_000)
192     public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
193 
194         // Start a TSO with HA...
195         TSOServerConfig config = new TSOServerConfig();
196         config.setConflictMapSize(1000);
197         config.setPort(tsoPortForTest);
198         config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
199         injector = Guice.createInjector(new TSOMockModule(config));
200         LOG.info("Starting Initial TSO");
201         tsoServer = injector.getInstance(TSOServer.class);
202         tsoServer.startAndWait();
203         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
204         LOG.info("Finished loading TSO");
205 
206         waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
207 
208         // Then create the TSO Client under test...
209         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
210         tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
211         tsoClientConf.setConnectionString(zkClusterForTest);
212         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
213         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
214 
215         // ... and check that initially we get responses from the methods
216         Long startTS = tsoClient.getNewStartTimestamp().get();
217         LOG.info("Start TS {} ", startTS);
218         assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
219 
220         // Then stop the server...
221         tsoServer.stopAndWait();
222         tsoServer = null;
223         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
224         LOG.info("Initial TSO Server Stopped");
225 
226         Thread.sleep(1500); // ...allow the client to receive disconnection event...
227         // ... and check that we get a conn exception when trying to access the client
228         try {
229             startTS = tsoClient.getNewStartTimestamp().get();
230             fail();
231         } catch (ExecutionException e) {
232             LOG.info("Exception expected");
233             // Internal accessor to fsm to do the required checkings
234             FsmImpl fsm = (FsmImpl) tsoClient.fsm;
235             assertEquals(e.getCause().getClass(), ConnectionException.class);
236             assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
237                                ||
238                                fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
239         }
240 
241         // After that, simulate that a new TSO has been launched...
242         Injector newInjector = Guice.createInjector(new TSOMockModule(config));
243         LOG.info("Re-Starting again the TSO");
244         tsoServer = newInjector.getInstance(TSOServer.class);
245         tsoServer.startAndWait();
246         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
247         LOG.info("Finished loading restarted TSO");
248 
249         // Finally re-check that, eventually, we can get a new value from the new TSO...
250         boolean reconnectionActive = false;
251         while (!reconnectionActive) {
252             try {
253                 startTS = tsoClient.getNewStartTimestamp().get();
254                 reconnectionActive = true;
255             } catch (ExecutionException e) {
256                 // Expected
257             }
258         }
259         assertNotNull(startTS);
260 
261         // ...and stop the server
262         tsoServer.stopAndWait();
263         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
264         LOG.info("Restarted TSO Server Stopped");
265     }
266 
267     private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
268         while (true) {
269             try {
270                 Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
271                 if (stat == null) {
272                     continue;
273                 }
274                 LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
275                 if (stat.toString().length() == 0) {
276                     continue;
277                 }
278                 return;
279             } catch (Exception e) {
280                 LOG.debug("TSO still has not registered yet, sleeping...", e);
281                 Thread.sleep(500);
282             }
283         }
284     }
285 
286 }