View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.inject.Guice;
21  import com.google.inject.Injector;
22  
23  import org.apache.curator.framework.CuratorFramework;
24  import org.apache.curator.test.TestingServer;
25  import org.apache.curator.utils.CloseableUtils;
26  import org.apache.omid.TestUtils;
27  import org.apache.omid.committable.CommitTable;
28  import org.apache.omid.tso.HALeaseManagementModule;
29  import org.apache.omid.tso.TSOMockModule;
30  import org.apache.omid.tso.TSOServer;
31  import org.apache.omid.tso.TSOServerConfig;
32  import org.apache.omid.tso.VoidLeaseManagementModule;
33  import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
34  import org.apache.statemachine.StateMachine.FsmImpl;
35  import org.apache.zookeeper.KeeperException.NoNodeException;
36  import org.apache.zookeeper.data.Stat;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  import org.testng.annotations.AfterMethod;
40  import org.testng.annotations.BeforeMethod;
41  import org.testng.annotations.Test;
42  
43  import java.util.concurrent.ExecutionException;
44  
45  import static org.testng.Assert.assertEquals;
46  import static org.testng.Assert.assertNotNull;
47  import static org.testng.Assert.assertNull;
48  import static org.testng.Assert.assertTrue;
49  import static org.testng.Assert.fail;
50  
51  public class TestTSOClientConnectionToTSO {
52  
53      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
54  
55      // Constants and variables for component connectivity
56      private static final String TSO_HOST = "localhost";
57      private static final String CURRENT_TSO_PATH = "/current_tso_path";
58      private static final String TSO_LEASE_PATH = "/tso_lease_path";
59  
60      private int tsoPortForTest;
61      private String zkClusterForTest;
62  
63      private Injector injector = null;
64  
65      private TestingServer zkServer;
66  
67      private CuratorFramework zkClient;
68      private TSOServer tsoServer;
69  
70      @BeforeMethod
71      public void beforeMethod() throws Exception {
72  
73          tsoPortForTest = TestUtils.getFreeLocalPort();
74  
75          int zkPortForTest = TestUtils.getFreeLocalPort();
76          zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
77          LOG.info("Starting ZK Server in port {}", zkPortForTest);
78          zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
79          LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
80  
81          zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
82  
83          Stat stat;
84          try {
85              zkClient.delete().forPath(CURRENT_TSO_PATH);
86              stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
87              assertNull(stat, CURRENT_TSO_PATH + " should not exist");
88          } catch (NoNodeException e) {
89              LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
90          }
91  
92      }
93  
94      @AfterMethod
95      public void afterMethod() {
96  
97          zkClient.close();
98  
99          CloseableUtils.closeQuietly(zkServer);
100         zkServer = null;
101         LOG.info("ZK Server Stopped");
102 
103     }
104 
105     @Test(timeOut = 30_000)
106     public void testUnsuccessfulConnectionToTSO() throws Exception {
107 
108         // When no HA node for TSOServer is found & no host:port config exists
109         // we should get an exception when getting the client
110         try {
111             TSOClient.newInstance(new OmidClientConfiguration());
112         } catch (IllegalArgumentException e) {
113             // Expected
114         }
115 
116     }
117 
118     @Test(timeOut = 30_000)
119     public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
120 
121         // Launch a TSO WITHOUT publishing the address in HA...
122         TSOServerConfig tsoConfig = new TSOServerConfig();
123         tsoConfig.setConflictMapSize(1000);
124         tsoConfig.setPort(tsoPortForTest);
125         tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
126         tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
127         injector = Guice.createInjector(new TSOMockModule(tsoConfig));
128         LOG.info("Starting TSO");
129         tsoServer = injector.getInstance(TSOServer.class);
130         tsoServer.startAsync();
131         tsoServer.awaitRunning();
132         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
133         LOG.info("Finished loading TSO");
134 
135         // When no HA node for TSOServer is found we should get a connection
136         // to the TSO through the host:port configured...
137         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
138         tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
139         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
140         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
141 
142         // ... so we should get responses from the methods
143         Long startTS = tsoClient.getNewStartTimestamp().get();
144         LOG.info("Start TS {} ", startTS);
145         assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
146 
147         // Close the tsoClient connection and stop the TSO Server
148         tsoClient.close().get();
149         tsoServer.stopAsync();
150         tsoServer.awaitTerminated();
151         tsoServer = null;
152         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
153         LOG.info("TSO Server Stopped");
154 
155     }
156 
157     @Test(timeOut = 30_000)
158     public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
159 
160         // Launch a TSO publishing the address in HA...
161         TSOServerConfig config = new TSOServerConfig();
162         config.setConflictMapSize(1000);
163         config.setPort(tsoPortForTest);
164         config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
165         injector = Guice.createInjector(new TSOMockModule(config));
166         LOG.info("Starting TSO");
167         tsoServer = injector.getInstance(TSOServer.class);
168         tsoServer.startAsync();
169         tsoServer.awaitRunning();
170         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
171         LOG.info("Finished loading TSO");
172 
173         waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
174 
175         // When a HA node for TSOServer is found we should get a connection
176         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
177         tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
178         tsoClientConf.setConnectionString(zkClusterForTest);
179         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
180         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
181 
182         // ... so we should get responses from the methods
183         Long startTS = tsoClient.getNewStartTimestamp().get();
184         LOG.info("Start TS {} ", startTS);
185         assertTrue(startTS.longValue() >= CommitTable.MAX_CHECKPOINTS_PER_TXN);
186 
187         // Close the tsoClient connection and stop the TSO Server
188         tsoClient.close().get();
189         tsoServer.stopAsync();
190         tsoServer.awaitTerminated();
191         tsoServer = null;
192         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
193         LOG.info("TSO Server Stopped");
194 
195     }
196 
197     @Test(timeOut = 30_000)
198     public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
199 
200         // Start a TSO with HA...
201         TSOServerConfig config = new TSOServerConfig();
202         config.setConflictMapSize(1000);
203         config.setPort(tsoPortForTest);
204         config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
205         injector = Guice.createInjector(new TSOMockModule(config));
206         LOG.info("Starting Initial TSO");
207         tsoServer = injector.getInstance(TSOServer.class);
208         tsoServer.startAsync();
209         tsoServer.awaitRunning();
210         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
211         LOG.info("Finished loading TSO");
212 
213         waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
214 
215         // Then create the TSO Client under test...
216         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
217         tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
218         tsoClientConf.setConnectionString(zkClusterForTest);
219         tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
220         TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
221 
222         // ... and check that initially we get responses from the methods
223         Long startTS = tsoClient.getNewStartTimestamp().get();
224         LOG.info("Start TS {} ", startTS);
225         assertTrue(startTS.longValue() >= CommitTable.MAX_CHECKPOINTS_PER_TXN);
226 
227         // Then stop the server...
228         tsoServer.stopAsync();
229         tsoServer.awaitTerminated();
230         tsoServer = null;
231         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
232         LOG.info("Initial TSO Server Stopped");
233 
234         Thread.sleep(1500); // ...allow the client to receive disconnection event...
235         // ... and check that we get a conn exception when trying to access the client
236         try {
237             startTS = tsoClient.getNewStartTimestamp().get();
238             fail();
239         } catch (ExecutionException e) {
240             LOG.info("Exception expected");
241             // Internal accessor to fsm to do the required checkings
242             FsmImpl fsm = (FsmImpl) tsoClient.fsm;
243             assertEquals(e.getCause().getClass(), ConnectionException.class);
244             assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
245                                ||
246                                fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
247         }
248 
249         // After that, simulate that a new TSO has been launched...
250         Injector newInjector = Guice.createInjector(new TSOMockModule(config));
251         LOG.info("Re-Starting again the TSO");
252         tsoServer = newInjector.getInstance(TSOServer.class);
253         tsoServer.startAsync();
254         tsoServer.awaitRunning();
255         TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
256         LOG.info("Finished loading restarted TSO");
257 
258         // Finally re-check that, eventually, we can get a new value from the new TSO...
259         boolean reconnectionActive = false;
260         while (!reconnectionActive) {
261             try {
262                 startTS = tsoClient.getNewStartTimestamp().get();
263                 reconnectionActive = true;
264             } catch (ExecutionException e) {
265                 // Expected
266             }
267         }
268         assertNotNull(startTS);
269 
270         // ...and stop the server
271         tsoServer.stopAsync();
272         tsoServer.awaitTerminated();
273         TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
274         LOG.info("Restarted TSO Server Stopped");
275     }
276 
277     private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
278         while (true) {
279             try {
280                 Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
281                 if (stat == null) {
282                     continue;
283                 }
284                 LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
285                 if (stat.toString().length() == 0) {
286                     continue;
287                 }
288                 return;
289             } catch (Exception e) {
290                 LOG.debug("TSO still has not registered yet, sleeping...", e);
291                 Thread.sleep(500);
292             }
293         }
294     }
295 
296 }