View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import com.google.inject.Module;
24  
25  import org.apache.omid.TestUtils;
26  import org.apache.omid.committable.CommitTable;
27  import org.apache.omid.proto.TSOProto;
28  import org.apache.omid.tso.PausableTimestampOracle;
29  import org.apache.omid.tso.TSOMockModule;
30  import org.apache.omid.tso.TSOServer;
31  import org.apache.omid.tso.TSOServerConfig;
32  import org.apache.omid.tso.TimestampOracle;
33  import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
34  import org.apache.omid.tso.util.DummyCellIdImpl;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  import org.testng.annotations.AfterMethod;
38  import org.testng.annotations.BeforeMethod;
39  import org.testng.annotations.Test;
40  
41  import java.util.ArrayList;
42  import java.util.List;
43  import java.util.Set;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.TimeUnit;
47  
48  import static org.testng.Assert.assertEquals;
49  import static org.testng.Assert.assertFalse;
50  import static org.testng.Assert.assertTrue;
51  import static org.testng.Assert.fail;
52  
53  public class TestTSOClientRequestAndResponseBehaviours {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
56  
57      private static final String TSO_SERVER_HOST = "localhost";
58      private static final int TSO_SERVER_PORT = 1234;
59  
60      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
61      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
62  
63      private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
64  
65      private OmidClientConfiguration tsoClientConf;
66  
67      // Required infrastructure for TSOClient test
68      private TSOServer tsoServer;
69      private PausableTimestampOracle pausableTSOracle;
70      private CommitTable commitTable;
71  
72      @BeforeMethod
73      public void beforeMethod() throws Exception {
74  
75          TSOServerConfig tsoConfig = new TSOServerConfig();
76          tsoConfig.setConflictMapSize(1000);
77          tsoConfig.setPort(TSO_SERVER_PORT);
78          tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
79          tsoConfig.setNumConcurrentCTWriters(2);
80          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
81          Injector injector = Guice.createInjector(tsoServerMockModule);
82  
83          LOG.info("==================================================================================================");
84          LOG.info("======================================= Init TSO Server ==========================================");
85          LOG.info("==================================================================================================");
86  
87          tsoServer = injector.getInstance(TSOServer.class);
88          tsoServer.startAsync();
89          tsoServer.awaitRunning();
90          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
91  
92          LOG.info("==================================================================================================");
93          LOG.info("===================================== TSO Server Initialized =====================================");
94          LOG.info("==================================================================================================");
95  
96          pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
97          commitTable = injector.getInstance(CommitTable.class);
98  
99          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
100         tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
101 
102         this.tsoClientConf = tsoClientConf;
103 
104     }
105 
106     @AfterMethod
107     public void afterMethod() throws Exception {
108 
109 
110         tsoServer.stopAsync();
111         tsoServer.awaitTerminated();
112         tsoServer = null;
113         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
114 
115         pausableTSOracle.resume();
116 
117     }
118 
119     /**
120      * Test to ensure TSOClient timeouts are cancelled.
121      * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
122      * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
123      * We send a lot of timestamp requests, and wait for them to complete.
124      * Ensure that the next request doesn't get hit by the timeouts of the previous
125      * requests. (i.e. make sure we cancel timeouts)
126      */
127     @Test(timeOut = 30_000)
128     public void testTimeoutsAreCancelled() throws Exception {
129 
130         TSOClient client = TSOClient.newInstance(tsoClientConf);
131         int requestTimeoutInMs = 500;
132         int requestMaxRetries = 5;
133         LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
134         Future<Long> f = null;
135         for (int i = 0; i < (requestMaxRetries * 10); i++) {
136             f = client.getNewStartTimestamp();
137         }
138         if (f != null) {
139             f.get();
140         }
141         pausableTSOracle.pause();
142         long msToSleep = ((long) (requestTimeoutInMs * 0.75));
143         LOG.info("Sleeping for {} ms", msToSleep);
144         TimeUnit.MILLISECONDS.sleep(msToSleep);
145         f = client.getNewStartTimestamp();
146         msToSleep = ((long) (requestTimeoutInMs * 0.9));
147         LOG.info("Sleeping for {} ms", msToSleep);
148         TimeUnit.MILLISECONDS.sleep(msToSleep);
149         LOG.info("Resuming");
150         pausableTSOracle.resume();
151         f.get();
152 
153     }
154 
155     @Test(timeOut = 30_000)
156     public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
157 
158         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
159         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
160         testTSOClientConf.setRequestMaxRetries(0);
161         TSOClient client = TSOClient.newInstance(testTSOClientConf);
162 
163         List<Long> startTimestamps = new ArrayList<>();
164         for (int i = 0; i < 10; i++) {
165             startTimestamps.add(client.getNewStartTimestamp().get());
166         }
167 
168         pausableTSOracle.pause();
169 
170         List<Future<Long>> futures = new ArrayList<>();
171         for (long s : startTimestamps) {
172             futures.add(client.commit(s, Sets.<CellId>newHashSet()));
173         }
174         TSOClientAccessor.closeChannel(client);
175 
176         for (Future<Long> f : futures) {
177             try {
178                 f.get();
179                 fail("Shouldn't be able to complete");
180             } catch (ExecutionException ee) {
181                 assertTrue(ee.getCause() instanceof ServiceUnavailableException,
182                            "Should be a service unavailable exception");
183             }
184         }
185     }
186 
187     /**
188      * Test that if a client tries to make a request without handshaking, it will be disconnected.
189      */
190     @Test(timeOut = 30_000)
191     public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
192 
193         TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
194 
195         TSOProto.Request request = TSOProto.Request.newBuilder()
196                 .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
197                 .build();
198         raw.write(request);
199         try {
200             raw.getResponse().get();
201             fail("Channel should be closed");
202         } catch (ExecutionException ee) {
203             assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
204         }
205         raw.close();
206 
207     }
208 
209     // ----------------------------------------------------------------------------------------------------------------
210     // Test duplicate commits
211     // ----------------------------------------------------------------------------------------------------------------
212 
213     /**
214      * This tests the case where messages arrive at the TSO out of order. This can happen in the case
215      * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with
216      * aborted to the original message because the retry was already committed and it would be prohibitively
217      * expensive to check all non-retry requests to see if they are already committed. For this reason
218      * a client must ensure that if it is sending a retry due to a socket error, the previous channel
219      * must be entirely closed so that it will not actually receive the abort response. TCP guarantees
220      * that this doesn't happen in non-socket error cases.
221      *
222      */
223     @Test(timeOut = 30_000)
224     public void testOutOfOrderMessages() throws Exception {
225 
226         TSOClient client = TSOClient.newInstance(tsoClientConf);
227         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
228 
229         long ts1 = client.getNewStartTimestamp().get();
230 
231         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
232         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
233         assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
234         assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
235     }
236 
237     @Test(timeOut = 30_000)
238     public void testDuplicateCommitAborting() throws Exception {
239 
240         TSOClient client = TSOClient.newInstance(tsoClientConf);
241         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
242 
243         long ts1 = client.getNewStartTimestamp().get();
244         long ts2 = client.getNewStartTimestamp().get();
245         client.commit(ts2, testWriteSet).get();
246 
247         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
248         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
249         assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
250         assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
251     }
252 
253     @Test(timeOut = 30_000)
254     public void testDuplicateCommit() throws Exception {
255 
256         TSOClient client = TSOClient.newInstance(tsoClientConf);
257         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
258 
259         long ts1 = client.getNewStartTimestamp().get();
260 
261         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
262         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
263         if (client.isLowLatency()) {
264             assertTrue(response1.hasCommitResponse());
265             assertTrue(response2.getCommitResponse().getAborted());
266         } else
267             assertEquals(response2.getCommitResponse().getCommitTimestamp(),
268                     response1.getCommitResponse().getCommitTimestamp(),
269                     "Commit timestamp should be the same");
270     }
271 
272     // ----------------------------------------------------------------------------------------------------------------
273     // Test TSOClient retry behaviour
274     // ----------------------------------------------------------------------------------------------------------------
275 
276     @Test(timeOut = 30_000)
277     public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
278 
279         TSOClient client = TSOClient.newInstance(tsoClientConf);
280         long ts1 = client.getNewStartTimestamp().get();
281         if(client.isLowLatency())
282             return;
283         pausableTSOracle.pause();
284         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
285         TSOClientAccessor.closeChannel(client);
286         pausableTSOracle.resume();
287         future.get();
288 
289     }
290 
291     @Test(timeOut = 30_000)
292     public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
293 
294         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
295         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
296         testTSOClientConf.setRequestTimeoutInMs(100);
297         testTSOClientConf.setRequestMaxRetries(10000);
298         TSOClient client = TSOClient.newInstance(testTSOClientConf);
299 
300         long ts1 = client.getNewStartTimestamp().get();
301         pausableTSOracle.pause();
302         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
303         TimeUnit.SECONDS.sleep(1);
304         pausableTSOracle.resume();
305         future.get();
306     }
307 
308     @Test(timeOut = 30_000)
309     public void testCommitFailWhenTSOIsDown() throws Exception {
310 
311         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
312         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
313         testTSOClientConf.setRequestTimeoutInMs(100);
314         testTSOClientConf.setRequestMaxRetries(10);
315         TSOClient client = TSOClient.newInstance(testTSOClientConf);
316 
317         long ts1 = client.getNewStartTimestamp().get();
318         pausableTSOracle.pause();
319         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
320         try {
321             future.get();
322         } catch (ExecutionException e) {
323             assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
324                          "Should be a ServiceUnavailableExeption");
325         }
326 
327     }
328 
329     @Test(timeOut = 30_000)
330     public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
331 
332         OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
333         testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
334         testTSOClientConf.setRequestTimeoutInMs(100);
335         testTSOClientConf.setRequestMaxRetries(10000);
336         TSOClient client = TSOClient.newInstance(testTSOClientConf);
337 
338         pausableTSOracle.pause();
339         Future<Long> future = client.getNewStartTimestamp();
340         TimeUnit.SECONDS.sleep(1);
341         pausableTSOracle.resume();
342         future.get();
343 
344     }
345 
346     // ----------------------------------------------------------------------------------------------------------------
347     // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
348     // (They exercise the communication protocol) TODO Remove???
349     // ----------------------------------------------------------------------------------------------------------------
350     @Test(timeOut = 30_000)
351     public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
352 
353         TSOClient client = TSOClient.newInstance(tsoClientConf);
354         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
355 
356         long tx1ST = client.getNewStartTimestamp().get();
357 
358         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
359         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
360         if (client.isLowLatency())
361             assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
362         else {
363             assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
364             assertEquals(response.getCommitResponse().getCommitTimestamp(),
365                     tx1ST + CommitTable.MAX_CHECKPOINTS_PER_TXN);
366         }
367     }
368 
369     @Test(timeOut = 30_000)
370     public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
371 
372         TSOClient client = TSOClient.newInstance(tsoClientConf);
373         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
374 
375         long tx1ST = client.getNewStartTimestamp().get();
376         // Invalidate the transaction
377         commitTable.getClient().tryInvalidateTransaction(tx1ST);
378 
379         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
380         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
381         assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
382         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
383     }
384 
385     @Test(timeOut = 30_000)
386     public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
387 
388         TSOClient client = TSOClient.newInstance(tsoClientConf);
389         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
390 
391         long tx1ST = client.getNewStartTimestamp().get();
392 
393         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
394 
395         // Simulate remove entry from the commit table before exercise retry
396         commitTable.getClient().deleteCommitEntry(tx1ST);
397 
398         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
399         assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
400         assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
401     }
402     // ----------------------------------------------------------------------------------------------------------------
403     // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
404     // (They exercise the communication protocol) TODO Remove???
405     // ----------------------------------------------------------------------------------------------------------------
406 
407     // ----------------------------------------------------------------------------------------------------------------
408     // Helper methods
409     // ----------------------------------------------------------------------------------------------------------------
410 
411     private TSOProto.Request createRetryCommitRequest(long ts) {
412         return createCommitRequest(ts, true, testWriteSet);
413     }
414 
415     private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
416         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
417         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
418         commitBuilder.setStartTimestamp(ts);
419         commitBuilder.setIsRetry(retry);
420         for (CellId cell : writeSet) {
421             commitBuilder.addCellId(cell.getCellId());
422         }
423         return builder.setCommitRequest(commitBuilder.build()).build();
424     }
425 
426 }