View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.collect.Sets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import com.google.inject.Module;
24  
25  import org.apache.omid.TestUtils;
26  import org.apache.omid.committable.CommitTable;
27  import org.apache.omid.proto.TSOProto;
28  import org.apache.omid.transaction.AbstractTransactionManager;
29  import org.apache.omid.tso.PausableTimestampOracle;
30  import org.apache.omid.tso.TSOMockModule;
31  import org.apache.omid.tso.TSOServer;
32  import org.apache.omid.tso.TSOServerConfig;
33  import org.apache.omid.tso.TimestampOracle;
34  import org.apache.omid.tso.util.DummyCellIdImpl;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  import org.testng.annotations.AfterMethod;
38  import org.testng.annotations.BeforeMethod;
39  import org.testng.annotations.Test;
40  
41  import java.util.ArrayList;
42  import java.util.List;
43  import java.util.Set;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.TimeUnit;
47  
48  import static org.testng.Assert.assertEquals;
49  import static org.testng.Assert.assertFalse;
50  import static org.testng.Assert.assertTrue;
51  import static org.testng.Assert.fail;
52  
53  public class TestTSOClientRequestAndResponseBehaviours {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
56  
57      private static final String TSO_SERVER_HOST = "localhost";
58      private static final int TSO_SERVER_PORT = 1234;
59  
60      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
61      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
62  
63      private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
64  
65      private OmidClientConfiguration tsoClientConf;
66  
67      // Required infrastructure for TSOClient test
68      private TSOServer tsoServer;
69      private PausableTimestampOracle pausableTSOracle;
70      private CommitTable commitTable;
71  
72      @BeforeMethod
73      public void beforeMethod() throws Exception {
74  
75          TSOServerConfig tsoConfig = new TSOServerConfig();
76          tsoConfig.setConflictMapSize(1000);
77          tsoConfig.setPort(TSO_SERVER_PORT);
78          tsoConfig.setNumConcurrentCTWriters(2);
79          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
80          Injector injector = Guice.createInjector(tsoServerMockModule);
81  
82          LOG.info("==================================================================================================");
83          LOG.info("======================================= Init TSO Server ==========================================");
84          LOG.info("==================================================================================================");
85  
86          tsoServer = injector.getInstance(TSOServer.class);
87          tsoServer.startAndWait();
88          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
89  
90          LOG.info("==================================================================================================");
91          LOG.info("===================================== TSO Server Initialized =====================================");
92          LOG.info("==================================================================================================");
93  
94          pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
95          commitTable = injector.getInstance(CommitTable.class);
96  
97          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
98          tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
99  
100         this.tsoClientConf = tsoClientConf;
101 
102     }
103 
104     @AfterMethod
105     public void afterMethod() throws Exception {
106 
107 
108         tsoServer.stopAndWait();
109         tsoServer = null;
110         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
111 
112         pausableTSOracle.resume();
113 
114     }
115 
116     /**
117      * Test to ensure TSOClient timeouts are cancelled.
118      * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
119      * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
120      * We send a lot of timestamp requests, and wait for them to complete.
121      * Ensure that the next request doesn't get hit by the timeouts of the previous
122      * requests. (i.e. make sure we cancel timeouts)
123      */
124     @Test(timeOut = 30_000)
125     public void testTimeoutsAreCancelled() throws Exception {
126 
127         TSOClient client = TSOClient.newInstance(tsoClientConf);
128         int requestTimeoutInMs = 500;
129         int requestMaxRetries = 5;
130         LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
131         Future<Long> f = null;
132         for (int i = 0; i < (requestMaxRetries * 10); i++) {
133             f = client.getNewStartTimestamp();
134         }
135         if (f != null) {
136             f.get();
137         }
138         pausableTSOracle.pause();
139         long msToSleep = ((long) (requestTimeoutInMs * 0.75));
140         LOG.info("Sleeping for {} ms", msToSleep);
141         TimeUnit.MILLISECONDS.sleep(msToSleep);
142         f = client.getNewStartTimestamp();
143         msToSleep = ((long) (requestTimeoutInMs * 0.9));
144         LOG.info("Sleeping for {} ms", msToSleep);
145         TimeUnit.MILLISECONDS.sleep(msToSleep);
146         LOG.info("Resuming");
147         pausableTSOracle.resume();
148         f.get();
149 
150     }
151 
152     @Test(timeOut = 30_000)
153     public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
154 
155         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
156         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
157         testTSOClientConf.setRequestMaxRetries(0);
158         TSOClient client = TSOClient.newInstance(testTSOClientConf);
159 
160         List<Long> startTimestamps = new ArrayList<>();
161         for (int i = 0; i < 10; i++) {
162             startTimestamps.add(client.getNewStartTimestamp().get());
163         }
164 
165         pausableTSOracle.pause();
166 
167         List<Future<Long>> futures = new ArrayList<>();
168         for (long s : startTimestamps) {
169             futures.add(client.commit(s, Sets.<CellId>newHashSet()));
170         }
171         TSOClientAccessor.closeChannel(client);
172 
173         for (Future<Long> f : futures) {
174             try {
175                 f.get();
176                 fail("Shouldn't be able to complete");
177             } catch (ExecutionException ee) {
178                 assertTrue(ee.getCause() instanceof ServiceUnavailableException,
179                            "Should be a service unavailable exception");
180             }
181         }
182     }
183 
184     /**
185      * Test that if a client tries to make a request without handshaking, it will be disconnected.
186      */
187     @Test(timeOut = 30_000)
188     public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
189 
190         TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
191 
192         TSOProto.Request request = TSOProto.Request.newBuilder()
193                 .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
194                 .build();
195         raw.write(request);
196         try {
197             raw.getResponse().get();
198             fail("Channel should be closed");
199         } catch (ExecutionException ee) {
200             assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
201         }
202         raw.close();
203 
204     }
205 
206     // ----------------------------------------------------------------------------------------------------------------
207     // Test duplicate commits
208     // ----------------------------------------------------------------------------------------------------------------
209 
210     /**
211      * This tests the case where messages arrive at the TSO out of order. This can happen in the case
212      * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with
213      * aborted to the original message because the retry was already committed and it would be prohibitively
214      * expensive to check all non-retry requests to see if they are already committed. For this reason
215      * a client must ensure that if it is sending a retry due to a socket error, the previous channel
216      * must be entirely closed so that it will not actually receive the abort response. TCP guarantees
217      * that this doesn't happen in non-socket error cases.
218      *
219      */
220     @Test(timeOut = 30_000)
221     public void testOutOfOrderMessages() throws Exception {
222 
223         TSOClient client = TSOClient.newInstance(tsoClientConf);
224         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
225 
226         long ts1 = client.getNewStartTimestamp().get();
227 
228         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
229         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
230         assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
231         assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
232     }
233 
234     @Test(timeOut = 30_000)
235     public void testDuplicateCommitAborting() throws Exception {
236 
237         TSOClient client = TSOClient.newInstance(tsoClientConf);
238         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
239 
240         long ts1 = client.getNewStartTimestamp().get();
241         long ts2 = client.getNewStartTimestamp().get();
242         client.commit(ts2, testWriteSet).get();
243 
244         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
245         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
246         assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
247         assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
248     }
249 
250     @Test(timeOut = 30_000)
251     public void testDuplicateCommit() throws Exception {
252 
253         TSOClient client = TSOClient.newInstance(tsoClientConf);
254         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
255 
256         long ts1 = client.getNewStartTimestamp().get();
257 
258         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
259         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
260         if (client.isLowLatency()) {
261             assertTrue(response1.hasCommitResponse());
262             assertTrue(response2.getCommitResponse().getAborted());
263         } else
264             assertEquals(response2.getCommitResponse().getCommitTimestamp(),
265                     response1.getCommitResponse().getCommitTimestamp(),
266                     "Commit timestamp should be the same");
267     }
268 
269     // ----------------------------------------------------------------------------------------------------------------
270     // Test TSOClient retry behaviour
271     // ----------------------------------------------------------------------------------------------------------------
272 
273     @Test(timeOut = 30_000)
274     public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
275 
276         TSOClient client = TSOClient.newInstance(tsoClientConf);
277         long ts1 = client.getNewStartTimestamp().get();
278         if(client.isLowLatency())
279             return;
280         pausableTSOracle.pause();
281         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
282         TSOClientAccessor.closeChannel(client);
283         pausableTSOracle.resume();
284         future.get();
285 
286     }
287 
288     @Test(timeOut = 30_000)
289     public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
290 
291         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
292         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
293         testTSOClientConf.setRequestTimeoutInMs(100);
294         testTSOClientConf.setRequestMaxRetries(10000);
295         TSOClient client = TSOClient.newInstance(testTSOClientConf);
296 
297         long ts1 = client.getNewStartTimestamp().get();
298         pausableTSOracle.pause();
299         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
300         TimeUnit.SECONDS.sleep(1);
301         pausableTSOracle.resume();
302         future.get();
303     }
304 
305     @Test(timeOut = 30_000)
306     public void testCommitFailWhenTSOIsDown() throws Exception {
307 
308         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
309         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
310         testTSOClientConf.setRequestTimeoutInMs(100);
311         testTSOClientConf.setRequestMaxRetries(10);
312         TSOClient client = TSOClient.newInstance(testTSOClientConf);
313 
314         long ts1 = client.getNewStartTimestamp().get();
315         pausableTSOracle.pause();
316         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
317         try {
318             future.get();
319         } catch (ExecutionException e) {
320             assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
321                          "Should be a ServiceUnavailableExeption");
322         }
323 
324     }
325 
326     @Test(timeOut = 30_000)
327     public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
328 
329         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
330         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
331         testTSOClientConf.setRequestTimeoutInMs(100);
332         testTSOClientConf.setRequestMaxRetries(10000);
333         TSOClient client = TSOClient.newInstance(testTSOClientConf);
334 
335         pausableTSOracle.pause();
336         Future<Long> future = client.getNewStartTimestamp();
337         TimeUnit.SECONDS.sleep(1);
338         pausableTSOracle.resume();
339         future.get();
340 
341     }
342 
343     // ----------------------------------------------------------------------------------------------------------------
344     // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
345     // (They exercise the communication protocol) TODO Remove???
346     // ----------------------------------------------------------------------------------------------------------------
347     @Test(timeOut = 30_000)
348     public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
349 
350         TSOClient client = TSOClient.newInstance(tsoClientConf);
351         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
352 
353         long tx1ST = client.getNewStartTimestamp().get();
354 
355         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
356         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
357         if (client.isLowLatency())
358             assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
359         else {
360             assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
361             assertEquals(response.getCommitResponse().getCommitTimestamp(),
362                     tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
363         }
364     }
365 
366     @Test(timeOut = 30_000)
367     public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
368 
369         TSOClient client = TSOClient.newInstance(tsoClientConf);
370         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
371 
372         long tx1ST = client.getNewStartTimestamp().get();
373         // Invalidate the transaction
374         commitTable.getClient().tryInvalidateTransaction(tx1ST);
375 
376         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
377         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
378         assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
379         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
380     }
381 
382     @Test(timeOut = 30_000)
383     public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
384 
385         TSOClient client = TSOClient.newInstance(tsoClientConf);
386         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
387 
388         long tx1ST = client.getNewStartTimestamp().get();
389 
390         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
391 
392         // Simulate remove entry from the commit table before exercise retry
393         commitTable.getClient().completeTransaction(tx1ST);
394 
395         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
396         assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
397         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
398     }
399     // ----------------------------------------------------------------------------------------------------------------
400     // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
401     // (They exercise the communication protocol) TODO Remove???
402     // ----------------------------------------------------------------------------------------------------------------
403 
404     // ----------------------------------------------------------------------------------------------------------------
405     // Helper methods
406     // ----------------------------------------------------------------------------------------------------------------
407 
408     private TSOProto.Request createRetryCommitRequest(long ts) {
409         return createCommitRequest(ts, true, testWriteSet);
410     }
411 
412     private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
413         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
414         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
415         commitBuilder.setStartTimestamp(ts);
416         commitBuilder.setIsRetry(retry);
417         for (CellId cell : writeSet) {
418             commitBuilder.addCellId(cell.getCellId());
419         }
420         return builder.setCommitRequest(commitBuilder.build()).build();
421     }
422 
423 }