View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.collect.Sets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import com.google.inject.Module;
24  import org.apache.omid.TestUtils;
25  import org.apache.omid.committable.CommitTable;
26  import org.apache.omid.proto.TSOProto;
27  import org.apache.omid.tso.PausableTimestampOracle;
28  import org.apache.omid.tso.TSOMockModule;
29  import org.apache.omid.tso.TSOServer;
30  import org.apache.omid.tso.TSOServerConfig;
31  import org.apache.omid.tso.TimestampOracle;
32  import org.apache.omid.tso.util.DummyCellIdImpl;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.testng.annotations.AfterClass;
36  import org.testng.annotations.AfterMethod;
37  import org.testng.annotations.BeforeClass;
38  import org.testng.annotations.BeforeMethod;
39  import org.testng.annotations.Test;
40  
41  import java.util.ArrayList;
42  import java.util.List;
43  import java.util.Set;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.TimeUnit;
47  
48  import static org.testng.Assert.assertEquals;
49  import static org.testng.Assert.assertFalse;
50  import static org.testng.Assert.assertTrue;
51  import static org.testng.Assert.fail;
52  
53  public class TestTSOClientRequestAndResponseBehaviours {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
56  
57      private static final String TSO_SERVER_HOST = "localhost";
58      private static final int TSO_SERVER_PORT = 1234;
59  
60      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
61      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
62  
63      private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
64  
65      private OmidClientConfiguration tsoClientConf;
66  
67      // Required infrastructure for TSOClient test
68      private TSOServer tsoServer;
69      private PausableTimestampOracle pausableTSOracle;
70      private CommitTable commitTable;
71  
72      @BeforeMethod
73      public void beforeMethod() throws Exception {
74  
75          TSOServerConfig tsoConfig = new TSOServerConfig();
76          tsoConfig.setMaxItems(1000);
77          tsoConfig.setPort(TSO_SERVER_PORT);
78          tsoConfig.setNumConcurrentCTWriters(2);
79          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
80          Injector injector = Guice.createInjector(tsoServerMockModule);
81  
82          LOG.info("==================================================================================================");
83          LOG.info("======================================= Init TSO Server ==========================================");
84          LOG.info("==================================================================================================");
85  
86          tsoServer = injector.getInstance(TSOServer.class);
87          tsoServer.startAndWait();
88          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
89  
90          LOG.info("==================================================================================================");
91          LOG.info("===================================== TSO Server Initialized =====================================");
92          LOG.info("==================================================================================================");
93  
94          pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
95          commitTable = injector.getInstance(CommitTable.class);
96  
97          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
98          tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
99  
100         this.tsoClientConf = tsoClientConf;
101 
102     }
103 
104     @AfterMethod
105     public void afterMethod() throws Exception {
106 
107 
108         tsoServer.stopAndWait();
109         tsoServer = null;
110         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
111 
112         pausableTSOracle.resume();
113 
114     }
115 
116     /**
117      * Test to ensure TSOClient timeouts are cancelled.
118      * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
119      * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
120      * We send a lot of timestamp requests, and wait for them to complete.
121      * Ensure that the next request doesn't get hit by the timeouts of the previous
122      * requests. (i.e. make sure we cancel timeouts)
123      */
124     @Test(timeOut = 30_000)
125     public void testTimeoutsAreCancelled() throws Exception {
126 
127         TSOClient client = TSOClient.newInstance(tsoClientConf);
128         int requestTimeoutInMs = 500;
129         int requestMaxRetries = 5;
130         LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
131         Future<Long> f = null;
132         for (int i = 0; i < (requestMaxRetries * 10); i++) {
133             f = client.getNewStartTimestamp();
134         }
135         if (f != null) {
136             f.get();
137         }
138         pausableTSOracle.pause();
139         long msToSleep = ((long) (requestTimeoutInMs * 0.75));
140         LOG.info("Sleeping for {} ms", msToSleep);
141         TimeUnit.MILLISECONDS.sleep(msToSleep);
142         f = client.getNewStartTimestamp();
143         msToSleep = ((long) (requestTimeoutInMs * 0.9));
144         LOG.info("Sleeping for {} ms", msToSleep);
145         TimeUnit.MILLISECONDS.sleep(msToSleep);
146         LOG.info("Resuming");
147         pausableTSOracle.resume();
148         f.get();
149 
150     }
151 
152     @Test(timeOut = 30_000)
153     public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
154 
155         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
156         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
157         testTSOClientConf.setRequestMaxRetries(0);
158         TSOClient client = TSOClient.newInstance(testTSOClientConf);
159 
160         List<Long> startTimestamps = new ArrayList<>();
161         for (int i = 0; i < 10; i++) {
162             startTimestamps.add(client.getNewStartTimestamp().get());
163         }
164 
165         pausableTSOracle.pause();
166 
167         List<Future<Long>> futures = new ArrayList<>();
168         for (long s : startTimestamps) {
169             futures.add(client.commit(s, Sets.<CellId>newHashSet()));
170         }
171         TSOClientAccessor.closeChannel(client);
172 
173         for (Future<Long> f : futures) {
174             try {
175                 f.get();
176                 fail("Shouldn't be able to complete");
177             } catch (ExecutionException ee) {
178                 assertTrue(ee.getCause() instanceof ServiceUnavailableException,
179                            "Should be a service unavailable exception");
180             }
181         }
182     }
183 
184     /**
185      * Test that if a client tries to make a request without handshaking, it will be disconnected.
186      */
187     @Test(timeOut = 30_000)
188     public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
189 
190         TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
191 
192         TSOProto.Request request = TSOProto.Request.newBuilder()
193                 .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
194                 .build();
195         raw.write(request);
196         try {
197             raw.getResponse().get();
198             fail("Channel should be closed");
199         } catch (ExecutionException ee) {
200             assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
201         }
202         raw.close();
203 
204     }
205 
206     // ----------------------------------------------------------------------------------------------------------------
207     // Test duplicate commits
208     // ----------------------------------------------------------------------------------------------------------------
209 
210     /**
211      * This tests the case where messages arrive at the TSO out of order. This can happen in the case
212      * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with
213      * aborted to the original message because the retry was already committed and it would be prohibitively
214      * expensive to check all non-retry requests to see if they are already committed. For this reason
215      * a client must ensure that if it is sending a retry due to a socket error, the previous channel
216      * must be entirely closed so that it will not actually receive the abort response. TCP guarantees
217      * that this doesn't happen in non-socket error cases.
218      *
219      */
220     @Test(timeOut = 30_000)
221     public void testOutOfOrderMessages() throws Exception {
222 
223         TSOClient client = TSOClient.newInstance(tsoClientConf);
224         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
225 
226         long ts1 = client.getNewStartTimestamp().get();
227 
228         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
229         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
230         assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
231         assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
232     }
233 
234     @Test(timeOut = 30_000)
235     public void testDuplicateCommitAborting() throws Exception {
236 
237         TSOClient client = TSOClient.newInstance(tsoClientConf);
238         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
239 
240         long ts1 = client.getNewStartTimestamp().get();
241         long ts2 = client.getNewStartTimestamp().get();
242         client.commit(ts2, testWriteSet).get();
243 
244         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
245         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
246         assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
247         assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
248     }
249 
250     @Test(timeOut = 30_000)
251     public void testDuplicateCommit() throws Exception {
252 
253         TSOClient client = TSOClient.newInstance(tsoClientConf);
254         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
255 
256         long ts1 = client.getNewStartTimestamp().get();
257 
258         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
259         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
260         assertEquals(response2.getCommitResponse().getCommitTimestamp(),
261                      response1.getCommitResponse().getCommitTimestamp(),
262                      "Commit timestamp should be the same");
263     }
264 
265     // ----------------------------------------------------------------------------------------------------------------
266     // Test TSOClient retry behaviour
267     // ----------------------------------------------------------------------------------------------------------------
268 
269     @Test(timeOut = 30_000)
270     public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
271 
272         TSOClient client = TSOClient.newInstance(tsoClientConf);
273 
274         long ts1 = client.getNewStartTimestamp().get();
275         pausableTSOracle.pause();
276         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
277         TSOClientAccessor.closeChannel(client);
278         pausableTSOracle.resume();
279         future.get();
280 
281     }
282 
283     @Test(timeOut = 30_000)
284     public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
285 
286         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
287         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
288         testTSOClientConf.setRequestTimeoutInMs(100);
289         testTSOClientConf.setRequestMaxRetries(10000);
290         TSOClient client = TSOClient.newInstance(testTSOClientConf);
291 
292         long ts1 = client.getNewStartTimestamp().get();
293         pausableTSOracle.pause();
294         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
295         TimeUnit.SECONDS.sleep(1);
296         pausableTSOracle.resume();
297         future.get();
298     }
299 
300     @Test(timeOut = 30_000)
301     public void testCommitFailWhenTSOIsDown() throws Exception {
302 
303         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
304         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
305         testTSOClientConf.setRequestTimeoutInMs(100);
306         testTSOClientConf.setRequestMaxRetries(10);
307         TSOClient client = TSOClient.newInstance(testTSOClientConf);
308 
309         long ts1 = client.getNewStartTimestamp().get();
310         pausableTSOracle.pause();
311         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
312         try {
313             future.get();
314         } catch (ExecutionException e) {
315             assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
316                          "Should be a ServiceUnavailableExeption");
317         }
318 
319     }
320 
321     @Test(timeOut = 30_000)
322     public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
323 
324         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
325         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
326         testTSOClientConf.setRequestTimeoutInMs(100);
327         testTSOClientConf.setRequestMaxRetries(10000);
328         TSOClient client = TSOClient.newInstance(testTSOClientConf);
329 
330         pausableTSOracle.pause();
331         Future<Long> future = client.getNewStartTimestamp();
332         TimeUnit.SECONDS.sleep(1);
333         pausableTSOracle.resume();
334         future.get();
335 
336     }
337 
338     // ----------------------------------------------------------------------------------------------------------------
339     // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
340     // (They exercise the communication protocol) TODO Remove???
341     // ----------------------------------------------------------------------------------------------------------------
342     @Test
343     public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
344 
345         TSOClient client = TSOClient.newInstance(tsoClientConf);
346         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
347 
348         long tx1ST = client.getNewStartTimestamp().get();
349 
350         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
351         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
352         assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
353         assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1);
354     }
355 
356     @Test
357     public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
358 
359         TSOClient client = TSOClient.newInstance(tsoClientConf);
360         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
361 
362         long tx1ST = client.getNewStartTimestamp().get();
363         // Invalidate the transaction
364         commitTable.getClient().tryInvalidateTransaction(tx1ST);
365 
366         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
367         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
368         assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
369         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
370     }
371 
372     @Test
373     public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
374 
375         TSOClient client = TSOClient.newInstance(tsoClientConf);
376         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
377 
378         long tx1ST = client.getNewStartTimestamp().get();
379 
380         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
381 
382         // Simulate remove entry from the commit table before exercise retry
383         commitTable.getClient().completeTransaction(tx1ST);
384 
385         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
386         assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
387         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
388     }
389     // ----------------------------------------------------------------------------------------------------------------
390     // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
391     // (They exercise the communication protocol) TODO Remove???
392     // ----------------------------------------------------------------------------------------------------------------
393 
394     // ----------------------------------------------------------------------------------------------------------------
395     // Helper methods
396     // ----------------------------------------------------------------------------------------------------------------
397 
398     private TSOProto.Request createRetryCommitRequest(long ts) {
399         return createCommitRequest(ts, true, testWriteSet);
400     }
401 
402     private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
403         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
404         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
405         commitBuilder.setStartTimestamp(ts);
406         commitBuilder.setIsRetry(retry);
407         for (CellId cell : writeSet) {
408             commitBuilder.addCellId(cell.getCellId());
409         }
410         return builder.setCommitRequest(commitBuilder.build()).build();
411     }
412 
413 }