View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.base.Charsets;
21  import org.apache.curator.framework.CuratorFramework;
22  import org.apache.curator.test.TestingServer;
23  import org.apache.curator.utils.CloseableUtils;
24  import org.apache.omid.TestUtils;
25  import org.apache.omid.tso.TSOStateManager.TSOState;
26  import org.mockito.ArgumentCaptor;
27  import org.mockito.Mock;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.testng.annotations.AfterClass;
31  import org.testng.annotations.BeforeClass;
32  import org.testng.annotations.Test;
33  
34  import java.io.IOException;
35  
36  import static org.apache.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
37  import static org.mockito.Mockito.any;
38  import static org.mockito.Mockito.anyString;
39  import static org.mockito.Mockito.mock;
40  import static org.mockito.Mockito.reset;
41  import static org.mockito.Mockito.spy;
42  import static org.mockito.Mockito.timeout;
43  import static org.mockito.Mockito.times;
44  import static org.mockito.Mockito.verify;
45  import static org.mockito.Mockito.when;
46  import static org.testng.Assert.assertEquals;
47  import static org.testng.Assert.assertFalse;
48  import static org.testng.Assert.assertTrue;
49  
50  public class TestLeaseManager {
51  
52      private static final long DUMMY_EPOCH_1 = 1L;
53      private static final long DUMMY_EPOCH_2 = 2L;
54      private static final long DUMMY_EPOCH_3 = 3L;
55      private static final long DUMMY_LOW_WATERMARK_1 = DUMMY_EPOCH_1;
56      private static final long DUMMY_LOW_WATERMARK_2 = DUMMY_EPOCH_2;
57      private static final long DUMMY_LOW_WATERMARK_3 = DUMMY_EPOCH_3;
58  
59      private static final String LEASE_MGR_ID_1 = "LM1";
60      private static final String LEASE_MGR_ID_2 = "LM2";
61      private static final String INSTANCE_ID_1 = "LM1" + "#";
62      private static final String INSTANCE_ID_2 = "LM2" + "#";
63  
64      private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
65  
66      private static final long TEST_LEASE_PERIOD_IN_MS = 5000; // 5 seconds
67  
68      private CuratorFramework zkClient;
69      private TestingServer zkServer;
70  
71      @Mock
72      private Panicker panicker;
73  
74      private PausableLeaseManager leaseManager1;
75      private PausableLeaseManager leaseManager2;
76  
77      @BeforeClass
78      public void beforeClass() throws Exception {
79  
80          LOG.info("Starting ZK Server");
81          zkServer = TestUtils.provideTestingZKServer();
82          LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
83  
84          zkClient = TestUtils.provideConnectedZKClient(DEFAULT_ZK_CLUSTER);
85  
86      }
87  
88      @AfterClass
89      public void afterClass() throws Exception {
90  
91          zkClient.close();
92  
93          CloseableUtils.closeQuietly(zkServer);
94          zkServer = null;
95          LOG.info("ZK Server Stopped");
96  
97      }
98  
99      @Test(timeOut = 80_000)
100     public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
101 
102         final String TEST_TSO_LEASE_PATH = "/test0_tsolease";
103         final String TEST_CURRENT_TSO_PATH = "/test0_currenttso";
104 
105         Panicker panicker = spy(new MockPanicker());
106 
107         TSOChannelHandler tsoChannelHandler = mock(TSOChannelHandler.class);
108         TSOStateManager stateManager = mock(TSOStateManager.class);
109         when(stateManager.initialize()).thenThrow(new IOException());
110         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
111                                                  tsoChannelHandler,
112                                                  stateManager,
113                                                  TEST_LEASE_PERIOD_IN_MS,
114                                                  TEST_TSO_LEASE_PATH,
115                                                  TEST_CURRENT_TSO_PATH,
116                                                  zkClient,
117                                                  panicker);
118         leaseManager1.startService();
119 
120         // ... let the test run for some time...
121         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
122 
123         verify(panicker, timeout(2000).atLeastOnce()).panic(anyString(), any(IOException.class));
124 
125         leaseManager1.stopService();
126 
127     }
128 
129     @Test(timeOut = 80_000)
130     public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance() throws Exception {
131 
132         final String TEST_TSO_LEASE_PATH = "/test1_tsolease";
133         final String TEST_CURRENT_TSO_PATH = "/test1_currenttso";
134 
135         // Launch the instance under test...
136         TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
137         TSOStateManager stateManager1 = mock(TSOStateManager.class);
138         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
139         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
140                                                  tsoChannelHandler1,
141                                                  stateManager1,
142                                                  TEST_LEASE_PERIOD_IN_MS,
143                                                  TEST_TSO_LEASE_PATH,
144                                                  TEST_CURRENT_TSO_PATH,
145                                                  zkClient,
146                                                  panicker);
147         leaseManager1.startService();
148 
149         // ... let the test run for some time...
150         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
151 
152         // ... check is the lease holder
153         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
154         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
155         assertTrue(leaseManager1.stillInLeasePeriod());
156 
157         // Then, pause instance when trying to renew lease...
158         leaseManager1.pausedInTryToRenewLeasePeriod();
159 
160         // ...let the test run for some time...
161         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
162 
163         // ...check that nothing changed...
164         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
165         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
166 
167         // Finally, resume the instance...
168         leaseManager1.resume();
169 
170         // ... let the test run for some time...
171         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
172 
173         // ... and check again that nothing changed
174         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
175         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
176         assertFalse(leaseManager1.stillInLeasePeriod()); // Must not be master as it should have triggered and exception
177 
178     }
179 
180     @Test(timeOut = 80_000)
181     public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
182 
183         final String TEST_TSO_LEASE_PATH = "/test2_tsolease";
184         final String TEST_CURRENT_TSO_PATH = "/test2_currenttso";
185 
186         // Launch the master instance...
187         TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
188         TSOStateManager stateManager1 = mock(TSOStateManager.class);
189         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
190         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
191                                                  tsoChannelHandler1,
192                                                  stateManager1,
193                                                  TEST_LEASE_PERIOD_IN_MS,
194                                                  TEST_TSO_LEASE_PATH,
195                                                  TEST_CURRENT_TSO_PATH,
196                                                  zkClient,
197                                                  panicker);
198 
199         leaseManager1.startService();
200 
201         // ...let the test run for some time...
202         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
203 
204         // ...so it should be the current holder of the lease
205         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
206         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
207         assertTrue(leaseManager1.stillInLeasePeriod());
208 
209         // Then launch another instance...
210         TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
211         TSOStateManager stateManager2 = mock(TSOStateManager.class);
212         when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
213         leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
214                                                  tsoChannelHandler2,
215                                                  stateManager2,
216                                                  TEST_LEASE_PERIOD_IN_MS,
217                                                  TEST_TSO_LEASE_PATH,
218                                                  TEST_CURRENT_TSO_PATH,
219                                                  zkClient,
220                                                  panicker);
221         leaseManager2.startService();
222 
223         // ... let the test run for some time...
224         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
225 
226         // ... and after the period, the first instance should be still the holder
227         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
228         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
229         assertTrue(leaseManager1.stillInLeasePeriod());
230         assertFalse(leaseManager2.stillInLeasePeriod());
231     }
232 
233     @Test(timeOut = 80_000)
234     public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
235 
236         final String TEST_TSO_LEASE_PATH = "/test3_tsolease";
237         final String TEST_CURRENT_TSO_PATH = "/test3_currenttso";
238 
239         // Launch the master instance...
240         TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
241         TSOStateManager stateManager1 = mock(TSOStateManager.class);
242         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
243         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
244                                                  tsoChannelHandler1,
245                                                  stateManager1,
246                                                  TEST_LEASE_PERIOD_IN_MS,
247                                                  TEST_TSO_LEASE_PATH,
248                                                  TEST_CURRENT_TSO_PATH,
249                                                  zkClient,
250                                                  panicker);
251 
252         leaseManager1.startService();
253 
254         // ... let the test run for some time...
255         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
256 
257         // ... so it should be the current holder of the lease
258         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
259         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
260         assertTrue(leaseManager1.stillInLeasePeriod());
261 
262         // Then launch another instance...
263         TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
264         TSOStateManager stateManager2 = mock(TSOStateManager.class);
265         when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
266         leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
267                                                  tsoChannelHandler2,
268                                                  stateManager2,
269                                                  TEST_LEASE_PERIOD_IN_MS,
270                                                  TEST_TSO_LEASE_PATH,
271                                                  TEST_CURRENT_TSO_PATH,
272                                                  zkClient,
273                                                  panicker);
274         leaseManager2.startService();
275 
276         // ... and pause active lease manager...
277         leaseManager1.pausedInStillInLeasePeriod();
278 
279         // ... and let the test run for some time...
280         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
281 
282         // ... and check that lease owner should have changed to the second instance
283         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
284         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
285         assertTrue(leaseManager2.stillInLeasePeriod());
286 
287         // Now, lets resume the first instance...
288         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_3, DUMMY_EPOCH_3));
289         leaseManager1.resume();
290 
291         // ... let the test run for some time...
292         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
293 
294         // and check the lease owner is still the second instance (preserves the lease)
295         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
296         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
297         assertFalse(leaseManager1.stillInLeasePeriod());
298         assertTrue(leaseManager2.stillInLeasePeriod());
299 
300         // Finally, pause active lease manager when trying to renew lease...
301         leaseManager2.pausedInTryToRenewLeasePeriod();
302 
303         // ... let the test run for some time...
304         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
305 
306         // ... and check lease owner is has changed again to the first instance
307         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
308         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
309         assertFalse(leaseManager2.stillInLeasePeriod());
310         assertTrue(leaseManager1.stillInLeasePeriod());
311 
312         // Resume the second instance...
313         leaseManager2.resume();
314 
315         // ... let the test run for some time...
316         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
317 
318         // ... but the lease owner should still be the first instance
319         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
320         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
321         assertFalse(leaseManager2.stillInLeasePeriod());
322         assertTrue(leaseManager1.stillInLeasePeriod());
323 
324     }
325 
326 
327     @Test(timeOut = 80_000)
328     public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
329 
330         final String TEST_TSO_LEASE_PATH = "/test_wronginfo_tsolease";
331         final String TEST_CURRENT_TSO_PATH = "/test_wronginfo_currenttso";
332 
333         Panicker panicker = spy(new MockPanicker());
334 
335         // Launch the master instance...
336         TSOStateManager stateManager1 = mock(TSOStateManager.class);
337         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
338         PausableLeaseManager leaseManager = new PausableLeaseManager(LEASE_MGR_ID_1,
339                                                                      mock(TSOChannelHandler.class),
340                                                                      stateManager1,
341                                                                      TEST_LEASE_PERIOD_IN_MS,
342                                                                      TEST_TSO_LEASE_PATH,
343                                                                      TEST_CURRENT_TSO_PATH,
344                                                                      zkClient,
345                                                                      panicker);
346 
347         leaseManager.startService();
348         // ...and let the test run for some time...
349         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
350 
351         leaseManager.pausedInTryToRenewLeasePeriod();
352 
353         // 1st Panic test) Inject corrupted data in the ZNode, force reelection and test the panicker is exercised
354         zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "CorruptedData!!!".getBytes());
355 
356         // ...and let the test run for some time...
357         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
358         leaseManager.resume();
359         // ...and let the test run for some time...
360         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
361 
362         ArgumentCaptor<IllegalArgumentException> trowableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
363         verify(panicker, times(2)).panic(anyString(), trowableIAE.capture());
364         assertTrue(trowableIAE.getValue() instanceof IllegalArgumentException);
365         assertTrue(trowableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
366 
367         // 2nd Panic test) Simulate that a new master appeared in the meantime, force reelection
368         // and test the panicker is exercised
369         reset(panicker);
370         zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "newTSO:12345#10000".getBytes());
371 
372         leaseManager.pausedInTryToRenewLeasePeriod();
373 
374         // ...and let the test run for some time...
375         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
376         leaseManager.resume();
377         // ...and let the test run for some time...
378         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
379 
380         ArgumentCaptor<LeaseManagement.LeaseManagementException> trowableLME =
381                 ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
382         verify(panicker, times(2)).panic(anyString(), trowableLME.capture());
383         assertTrue(trowableLME.getValue() instanceof LeaseManagement.LeaseManagementException);
384         assertTrue(trowableLME.getValue().getMessage().contains("Another TSO replica was found"));
385     }
386 
387     @Test(timeOut = 1000)
388     public void testNonHALeaseManager() throws Exception {
389 
390         // Launch the instance...
391         VoidLeaseManager leaseManager = new VoidLeaseManager(mock(TSOChannelHandler.class),
392                                                              mock(TSOStateManager.class));
393 
394         leaseManager.startService();
395         assertTrue(leaseManager.stillInLeasePeriod());
396         leaseManager.stopService();
397 
398     }
399 
400     // ----------------------------------------------------------------------------------------------------------------
401     // Checkers
402     // ----------------------------------------------------------------------------------------------------------------
403 
404     private void checkLeaseHolder(String tsoLeasePath, String expectedLeaseHolder) throws Exception {
405         byte[] leaseHolderInBytes = zkClient.getData().forPath(tsoLeasePath);
406         String leaseHolder = new String(leaseHolderInBytes, Charsets.UTF_8);
407 
408         assertEquals(leaseHolder, expectedLeaseHolder);
409     }
410 
411     private void checkInstanceId(String currentTSOPath, String expectedInstanceId) throws Exception {
412         byte[] expectedInstanceIdInBytes = zkClient.getData().forPath(currentTSOPath);
413         String instanceId = new String(expectedInstanceIdInBytes, Charsets.UTF_8);
414 
415         assertEquals(instanceId, expectedInstanceId);
416     }
417 
418 }