View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.collect.Sets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import com.google.inject.Module;
24  import org.apache.omid.TestUtils;
25  import org.apache.omid.committable.CommitTable;
26  import org.apache.omid.proto.TSOProto;
27  import org.apache.omid.tso.PausableTimestampOracle;
28  import org.apache.omid.tso.TSOMockModule;
29  import org.apache.omid.tso.TSOServer;
30  import org.apache.omid.tso.TSOServerConfig;
31  import org.apache.omid.tso.TimestampOracle;
32  import org.apache.omid.tso.util.DummyCellIdImpl;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.testng.annotations.AfterMethod;
36  import org.testng.annotations.BeforeMethod;
37  import org.testng.annotations.Test;
38  
39  import java.util.ArrayList;
40  import java.util.List;
41  import java.util.Set;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.Future;
44  import java.util.concurrent.TimeUnit;
45  
46  import static org.testng.Assert.assertEquals;
47  import static org.testng.Assert.assertFalse;
48  import static org.testng.Assert.assertTrue;
49  import static org.testng.Assert.fail;
50  
51  public class TestTSOClientRequestAndResponseBehaviours {
52  
53      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
54  
55      private static final String TSO_SERVER_HOST = "localhost";
56      private static final int TSO_SERVER_PORT = 1234;
57  
58      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
59      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
60  
61      private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
62  
63      private OmidClientConfiguration tsoClientConf;
64  
65      // Required infrastructure for TSOClient test
66      private TSOServer tsoServer;
67      private PausableTimestampOracle pausableTSOracle;
68      private CommitTable commitTable;
69  
70      @BeforeMethod
71      public void beforeMethod() throws Exception {
72  
73          TSOServerConfig tsoConfig = new TSOServerConfig();
74          tsoConfig.setConflictMapSize(1000);
75          tsoConfig.setPort(TSO_SERVER_PORT);
76          tsoConfig.setNumConcurrentCTWriters(2);
77          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
78          Injector injector = Guice.createInjector(tsoServerMockModule);
79  
80          LOG.info("==================================================================================================");
81          LOG.info("======================================= Init TSO Server ==========================================");
82          LOG.info("==================================================================================================");
83  
84          tsoServer = injector.getInstance(TSOServer.class);
85          tsoServer.startAndWait();
86          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
87  
88          LOG.info("==================================================================================================");
89          LOG.info("===================================== TSO Server Initialized =====================================");
90          LOG.info("==================================================================================================");
91  
92          pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
93          commitTable = injector.getInstance(CommitTable.class);
94  
95          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
96          tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
97  
98          this.tsoClientConf = tsoClientConf;
99  
100     }
101 
102     @AfterMethod
103     public void afterMethod() throws Exception {
104 
105 
106         tsoServer.stopAndWait();
107         tsoServer = null;
108         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
109 
110         pausableTSOracle.resume();
111 
112     }
113 
114     /**
115      * Test to ensure TSOClient timeouts are cancelled.
116      * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
117      * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
118      * We send a lot of timestamp requests, and wait for them to complete.
119      * Ensure that the next request doesn't get hit by the timeouts of the previous
120      * requests. (i.e. make sure we cancel timeouts)
121      */
122     @Test(timeOut = 30_000)
123     public void testTimeoutsAreCancelled() throws Exception {
124 
125         TSOClient client = TSOClient.newInstance(tsoClientConf);
126         int requestTimeoutInMs = 500;
127         int requestMaxRetries = 5;
128         LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
129         Future<Long> f = null;
130         for (int i = 0; i < (requestMaxRetries * 10); i++) {
131             f = client.getNewStartTimestamp();
132         }
133         if (f != null) {
134             f.get();
135         }
136         pausableTSOracle.pause();
137         long msToSleep = ((long) (requestTimeoutInMs * 0.75));
138         LOG.info("Sleeping for {} ms", msToSleep);
139         TimeUnit.MILLISECONDS.sleep(msToSleep);
140         f = client.getNewStartTimestamp();
141         msToSleep = ((long) (requestTimeoutInMs * 0.9));
142         LOG.info("Sleeping for {} ms", msToSleep);
143         TimeUnit.MILLISECONDS.sleep(msToSleep);
144         LOG.info("Resuming");
145         pausableTSOracle.resume();
146         f.get();
147 
148     }
149 
150     @Test(timeOut = 30_000)
151     public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
152 
153         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
154         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
155         testTSOClientConf.setRequestMaxRetries(0);
156         TSOClient client = TSOClient.newInstance(testTSOClientConf);
157 
158         List<Long> startTimestamps = new ArrayList<>();
159         for (int i = 0; i < 10; i++) {
160             startTimestamps.add(client.getNewStartTimestamp().get());
161         }
162 
163         pausableTSOracle.pause();
164 
165         List<Future<Long>> futures = new ArrayList<>();
166         for (long s : startTimestamps) {
167             futures.add(client.commit(s, Sets.<CellId>newHashSet()));
168         }
169         TSOClientAccessor.closeChannel(client);
170 
171         for (Future<Long> f : futures) {
172             try {
173                 f.get();
174                 fail("Shouldn't be able to complete");
175             } catch (ExecutionException ee) {
176                 assertTrue(ee.getCause() instanceof ServiceUnavailableException,
177                            "Should be a service unavailable exception");
178             }
179         }
180     }
181 
182     /**
183      * Test that if a client tries to make a request without handshaking, it will be disconnected.
184      */
185     @Test(timeOut = 30_000)
186     public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
187 
188         TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
189 
190         TSOProto.Request request = TSOProto.Request.newBuilder()
191                 .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
192                 .build();
193         raw.write(request);
194         try {
195             raw.getResponse().get();
196             fail("Channel should be closed");
197         } catch (ExecutionException ee) {
198             assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
199         }
200         raw.close();
201 
202     }
203 
204     // ----------------------------------------------------------------------------------------------------------------
205     // Test duplicate commits
206     // ----------------------------------------------------------------------------------------------------------------
207 
208     /**
209      * This tests the case where messages arrive at the TSO out of order. This can happen in the case
210      * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with
211      * aborted to the original message because the retry was already committed and it would be prohibitively
212      * expensive to check all non-retry requests to see if they are already committed. For this reason
213      * a client must ensure that if it is sending a retry due to a socket error, the previous channel
214      * must be entirely closed so that it will not actually receive the abort response. TCP guarantees
215      * that this doesn't happen in non-socket error cases.
216      *
217      */
218     @Test(timeOut = 30_000)
219     public void testOutOfOrderMessages() throws Exception {
220 
221         TSOClient client = TSOClient.newInstance(tsoClientConf);
222         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
223 
224         long ts1 = client.getNewStartTimestamp().get();
225 
226         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
227         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
228         assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
229         assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
230     }
231 
232     @Test(timeOut = 30_000)
233     public void testDuplicateCommitAborting() throws Exception {
234 
235         TSOClient client = TSOClient.newInstance(tsoClientConf);
236         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
237 
238         long ts1 = client.getNewStartTimestamp().get();
239         long ts2 = client.getNewStartTimestamp().get();
240         client.commit(ts2, testWriteSet).get();
241 
242         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
243         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
244         assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
245         assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
246     }
247 
248     @Test(timeOut = 30_000)
249     public void testDuplicateCommit() throws Exception {
250 
251         TSOClient client = TSOClient.newInstance(tsoClientConf);
252         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
253 
254         long ts1 = client.getNewStartTimestamp().get();
255 
256         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
257         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
258         assertEquals(response2.getCommitResponse().getCommitTimestamp(),
259                      response1.getCommitResponse().getCommitTimestamp(),
260                      "Commit timestamp should be the same");
261     }
262 
263     // ----------------------------------------------------------------------------------------------------------------
264     // Test TSOClient retry behaviour
265     // ----------------------------------------------------------------------------------------------------------------
266 
267     @Test(timeOut = 30_000)
268     public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
269 
270         TSOClient client = TSOClient.newInstance(tsoClientConf);
271 
272         long ts1 = client.getNewStartTimestamp().get();
273         pausableTSOracle.pause();
274         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
275         TSOClientAccessor.closeChannel(client);
276         pausableTSOracle.resume();
277         future.get();
278 
279     }
280 
281     @Test(timeOut = 30_000)
282     public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
283 
284         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
285         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
286         testTSOClientConf.setRequestTimeoutInMs(100);
287         testTSOClientConf.setRequestMaxRetries(10000);
288         TSOClient client = TSOClient.newInstance(testTSOClientConf);
289 
290         long ts1 = client.getNewStartTimestamp().get();
291         pausableTSOracle.pause();
292         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
293         TimeUnit.SECONDS.sleep(1);
294         pausableTSOracle.resume();
295         future.get();
296     }
297 
298     @Test(timeOut = 30_000)
299     public void testCommitFailWhenTSOIsDown() throws Exception {
300 
301         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
302         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
303         testTSOClientConf.setRequestTimeoutInMs(100);
304         testTSOClientConf.setRequestMaxRetries(10);
305         TSOClient client = TSOClient.newInstance(testTSOClientConf);
306 
307         long ts1 = client.getNewStartTimestamp().get();
308         pausableTSOracle.pause();
309         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
310         try {
311             future.get();
312         } catch (ExecutionException e) {
313             assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
314                          "Should be a ServiceUnavailableExeption");
315         }
316 
317     }
318 
319     @Test(timeOut = 30_000)
320     public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
321 
322         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
323         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
324         testTSOClientConf.setRequestTimeoutInMs(100);
325         testTSOClientConf.setRequestMaxRetries(10000);
326         TSOClient client = TSOClient.newInstance(testTSOClientConf);
327 
328         pausableTSOracle.pause();
329         Future<Long> future = client.getNewStartTimestamp();
330         TimeUnit.SECONDS.sleep(1);
331         pausableTSOracle.resume();
332         future.get();
333 
334     }
335 
336     // ----------------------------------------------------------------------------------------------------------------
337     // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
338     // (They exercise the communication protocol) TODO Remove???
339     // ----------------------------------------------------------------------------------------------------------------
340     @Test(timeOut = 30_000)
341     public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
342 
343         TSOClient client = TSOClient.newInstance(tsoClientConf);
344         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
345 
346         long tx1ST = client.getNewStartTimestamp().get();
347 
348         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
349         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
350         assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
351         assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1);
352     }
353 
354     @Test(timeOut = 30_000)
355     public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
356 
357         TSOClient client = TSOClient.newInstance(tsoClientConf);
358         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
359 
360         long tx1ST = client.getNewStartTimestamp().get();
361         // Invalidate the transaction
362         commitTable.getClient().tryInvalidateTransaction(tx1ST);
363 
364         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
365         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
366         assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
367         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
368     }
369 
370     @Test(timeOut = 30_000)
371     public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
372 
373         TSOClient client = TSOClient.newInstance(tsoClientConf);
374         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
375 
376         long tx1ST = client.getNewStartTimestamp().get();
377 
378         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
379 
380         // Simulate remove entry from the commit table before exercise retry
381         commitTable.getClient().completeTransaction(tx1ST);
382 
383         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
384         assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
385         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
386     }
387     // ----------------------------------------------------------------------------------------------------------------
388     // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
389     // (They exercise the communication protocol) TODO Remove???
390     // ----------------------------------------------------------------------------------------------------------------
391 
392     // ----------------------------------------------------------------------------------------------------------------
393     // Helper methods
394     // ----------------------------------------------------------------------------------------------------------------
395 
396     private TSOProto.Request createRetryCommitRequest(long ts) {
397         return createCommitRequest(ts, true, testWriteSet);
398     }
399 
400     private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
401         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
402         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
403         commitBuilder.setStartTimestamp(ts);
404         commitBuilder.setIsRetry(retry);
405         for (CellId cell : writeSet) {
406             commitBuilder.addCellId(cell.getCellId());
407         }
408         return builder.setCommitRequest(commitBuilder.build()).build();
409     }
410 
411 }