1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
21 import com.google.inject.Guice;
22 import com.google.inject.Injector;
23 import com.google.inject.Module;
24
25 import org.apache.omid.TestUtils;
26 import org.apache.omid.committable.CommitTable;
27 import org.apache.omid.proto.TSOProto;
28 import org.apache.omid.tso.PausableTimestampOracle;
29 import org.apache.omid.tso.TSOMockModule;
30 import org.apache.omid.tso.TSOServer;
31 import org.apache.omid.tso.TSOServerConfig;
32 import org.apache.omid.tso.TimestampOracle;
33 import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
34 import org.apache.omid.tso.util.DummyCellIdImpl;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.testng.annotations.AfterMethod;
38 import org.testng.annotations.BeforeMethod;
39 import org.testng.annotations.Test;
40
41 import java.util.ArrayList;
42 import java.util.List;
43 import java.util.Set;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.TimeUnit;
47
48 import static org.testng.Assert.assertEquals;
49 import static org.testng.Assert.assertFalse;
50 import static org.testng.Assert.assertTrue;
51 import static org.testng.Assert.fail;
52
53 public class TestTSOClientRequestAndResponseBehaviours {
54
55 private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
56
57 private static final String TSO_SERVER_HOST = "localhost";
58 private static final int TSO_SERVER_PORT = 1234;
59
60 private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
61 private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
62
63 private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
64
65 private OmidClientConfiguration tsoClientConf;
66
67
68 private TSOServer tsoServer;
69 private PausableTimestampOracle pausableTSOracle;
70 private CommitTable commitTable;
71
72 @BeforeMethod
73 public void beforeMethod() throws Exception {
74
75 TSOServerConfig tsoConfig = new TSOServerConfig();
76 tsoConfig.setConflictMapSize(1000);
77 tsoConfig.setPort(TSO_SERVER_PORT);
78 tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
79 tsoConfig.setNumConcurrentCTWriters(2);
80 Module tsoServerMockModule = new TSOMockModule(tsoConfig);
81 Injector injector = Guice.createInjector(tsoServerMockModule);
82
83 LOG.info("==================================================================================================");
84 LOG.info("======================================= Init TSO Server ==========================================");
85 LOG.info("==================================================================================================");
86
87 tsoServer = injector.getInstance(TSOServer.class);
88 tsoServer.startAsync();
89 tsoServer.awaitRunning();
90 TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
91
92 LOG.info("==================================================================================================");
93 LOG.info("===================================== TSO Server Initialized =====================================");
94 LOG.info("==================================================================================================");
95
96 pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
97 commitTable = injector.getInstance(CommitTable.class);
98
99 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
100 tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
101
102 this.tsoClientConf = tsoClientConf;
103
104 }
105
106 @AfterMethod
107 public void afterMethod() throws Exception {
108
109
110 tsoServer.stopAsync();
111 tsoServer.awaitTerminated();
112 tsoServer = null;
113 TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
114
115 pausableTSOracle.resume();
116
117 }
118
119
120
121
122
123
124
125
126
127 @Test(timeOut = 30_000)
128 public void testTimeoutsAreCancelled() throws Exception {
129
130 TSOClient client = TSOClient.newInstance(tsoClientConf);
131 int requestTimeoutInMs = 500;
132 int requestMaxRetries = 5;
133 LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
134 Future<Long> f = null;
135 for (int i = 0; i < (requestMaxRetries * 10); i++) {
136 f = client.getNewStartTimestamp();
137 }
138 if (f != null) {
139 f.get();
140 }
141 pausableTSOracle.pause();
142 long msToSleep = ((long) (requestTimeoutInMs * 0.75));
143 LOG.info("Sleeping for {} ms", msToSleep);
144 TimeUnit.MILLISECONDS.sleep(msToSleep);
145 f = client.getNewStartTimestamp();
146 msToSleep = ((long) (requestTimeoutInMs * 0.9));
147 LOG.info("Sleeping for {} ms", msToSleep);
148 TimeUnit.MILLISECONDS.sleep(msToSleep);
149 LOG.info("Resuming");
150 pausableTSOracle.resume();
151 f.get();
152
153 }
154
155 @Test(timeOut = 30_000)
156 public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
157
158 OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
159 testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
160 testTSOClientConf.setRequestMaxRetries(0);
161 TSOClient client = TSOClient.newInstance(testTSOClientConf);
162
163 List<Long> startTimestamps = new ArrayList<>();
164 for (int i = 0; i < 10; i++) {
165 startTimestamps.add(client.getNewStartTimestamp().get());
166 }
167
168 pausableTSOracle.pause();
169
170 List<Future<Long>> futures = new ArrayList<>();
171 for (long s : startTimestamps) {
172 futures.add(client.commit(s, Sets.<CellId>newHashSet()));
173 }
174 TSOClientAccessor.closeChannel(client);
175
176 for (Future<Long> f : futures) {
177 try {
178 f.get();
179 fail("Shouldn't be able to complete");
180 } catch (ExecutionException ee) {
181 assertTrue(ee.getCause() instanceof ServiceUnavailableException,
182 "Should be a service unavailable exception");
183 }
184 }
185 }
186
187
188
189
190 @Test(timeOut = 30_000)
191 public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
192
193 TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
194
195 TSOProto.Request request = TSOProto.Request.newBuilder()
196 .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
197 .build();
198 raw.write(request);
199 try {
200 raw.getResponse().get();
201 fail("Channel should be closed");
202 } catch (ExecutionException ee) {
203 assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
204 }
205 raw.close();
206
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 @Test(timeOut = 30_000)
224 public void testOutOfOrderMessages() throws Exception {
225
226 TSOClient client = TSOClient.newInstance(tsoClientConf);
227 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
228
229 long ts1 = client.getNewStartTimestamp().get();
230
231 TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
232 TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
233 assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
234 assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
235 }
236
237 @Test(timeOut = 30_000)
238 public void testDuplicateCommitAborting() throws Exception {
239
240 TSOClient client = TSOClient.newInstance(tsoClientConf);
241 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
242
243 long ts1 = client.getNewStartTimestamp().get();
244 long ts2 = client.getNewStartTimestamp().get();
245 client.commit(ts2, testWriteSet).get();
246
247 TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
248 TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
249 assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
250 assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
251 }
252
253 @Test(timeOut = 30_000)
254 public void testDuplicateCommit() throws Exception {
255
256 TSOClient client = TSOClient.newInstance(tsoClientConf);
257 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
258
259 long ts1 = client.getNewStartTimestamp().get();
260
261 TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
262 TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
263 if (client.isLowLatency()) {
264 assertTrue(response1.hasCommitResponse());
265 assertTrue(response2.getCommitResponse().getAborted());
266 } else
267 assertEquals(response2.getCommitResponse().getCommitTimestamp(),
268 response1.getCommitResponse().getCommitTimestamp(),
269 "Commit timestamp should be the same");
270 }
271
272
273
274
275
276 @Test(timeOut = 30_000)
277 public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
278
279 TSOClient client = TSOClient.newInstance(tsoClientConf);
280 long ts1 = client.getNewStartTimestamp().get();
281 if(client.isLowLatency())
282 return;
283 pausableTSOracle.pause();
284 TSOFuture<Long> future = client.commit(ts1, testWriteSet);
285 TSOClientAccessor.closeChannel(client);
286 pausableTSOracle.resume();
287 future.get();
288
289 }
290
291 @Test(timeOut = 30_000)
292 public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
293
294 OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
295 testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
296 testTSOClientConf.setRequestTimeoutInMs(100);
297 testTSOClientConf.setRequestMaxRetries(10000);
298 TSOClient client = TSOClient.newInstance(testTSOClientConf);
299
300 long ts1 = client.getNewStartTimestamp().get();
301 pausableTSOracle.pause();
302 TSOFuture<Long> future = client.commit(ts1, testWriteSet);
303 TimeUnit.SECONDS.sleep(1);
304 pausableTSOracle.resume();
305 future.get();
306 }
307
308 @Test(timeOut = 30_000)
309 public void testCommitFailWhenTSOIsDown() throws Exception {
310
311 OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
312 testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
313 testTSOClientConf.setRequestTimeoutInMs(100);
314 testTSOClientConf.setRequestMaxRetries(10);
315 TSOClient client = TSOClient.newInstance(testTSOClientConf);
316
317 long ts1 = client.getNewStartTimestamp().get();
318 pausableTSOracle.pause();
319 TSOFuture<Long> future = client.commit(ts1, testWriteSet);
320 try {
321 future.get();
322 } catch (ExecutionException e) {
323 assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
324 "Should be a ServiceUnavailableExeption");
325 }
326
327 }
328
329 @Test(timeOut = 30_000)
330 public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
331
332 OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
333 testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
334 testTSOClientConf.setRequestTimeoutInMs(100);
335 testTSOClientConf.setRequestMaxRetries(10000);
336 TSOClient client = TSOClient.newInstance(testTSOClientConf);
337
338 pausableTSOracle.pause();
339 Future<Long> future = client.getNewStartTimestamp();
340 TimeUnit.SECONDS.sleep(1);
341 pausableTSOracle.resume();
342 future.get();
343
344 }
345
346
347
348
349
350 @Test(timeOut = 30_000)
351 public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
352
353 TSOClient client = TSOClient.newInstance(tsoClientConf);
354 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
355
356 long tx1ST = client.getNewStartTimestamp().get();
357
358 clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
359 TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
360 if (client.isLowLatency())
361 assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
362 else {
363 assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
364 assertEquals(response.getCommitResponse().getCommitTimestamp(),
365 tx1ST + CommitTable.MAX_CHECKPOINTS_PER_TXN);
366 }
367 }
368
369 @Test(timeOut = 30_000)
370 public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
371
372 TSOClient client = TSOClient.newInstance(tsoClientConf);
373 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
374
375 long tx1ST = client.getNewStartTimestamp().get();
376
377 commitTable.getClient().tryInvalidateTransaction(tx1ST);
378
379 clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
380 TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
381 assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
382 assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
383 }
384
385 @Test(timeOut = 30_000)
386 public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
387
388 TSOClient client = TSOClient.newInstance(tsoClientConf);
389 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
390
391 long tx1ST = client.getNewStartTimestamp().get();
392
393 clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
394
395
396 commitTable.getClient().deleteCommitEntry(tx1ST);
397
398 TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
399 assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
400 assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
401 }
402
403
404
405
406
407
408
409
410
411 private TSOProto.Request createRetryCommitRequest(long ts) {
412 return createCommitRequest(ts, true, testWriteSet);
413 }
414
415 private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
416 TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
417 TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
418 commitBuilder.setStartTimestamp(ts);
419 commitBuilder.setIsRetry(retry);
420 for (CellId cell : writeSet) {
421 commitBuilder.addCellId(cell.getCellId());
422 }
423 return builder.setCommitRequest(commitBuilder.build()).build();
424 }
425
426 }