View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.collect.Sets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import com.google.inject.Module;
24  
25  import org.apache.omid.TestUtils;
26  import org.apache.omid.committable.CommitTable;
27  import org.apache.omid.transaction.AbstractTransactionManager;
28  import org.apache.omid.tso.TSOMockModule;
29  import org.apache.omid.tso.TSOServer;
30  import org.apache.omid.tso.TSOServerConfig;
31  import org.apache.omid.tso.util.DummyCellIdImpl;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  import org.testng.Assert;
35  import org.testng.annotations.AfterClass;
36  import org.testng.annotations.BeforeClass;
37  import org.testng.annotations.Test;
38  
39  import java.util.HashSet;
40  import java.util.Set;
41  import java.util.concurrent.ExecutionException;
42  
43  import static org.testng.Assert.assertEquals;
44  import static org.testng.Assert.assertFalse;
45  import static org.testng.Assert.assertNotNull;
46  import static org.testng.Assert.assertTrue;
47  
48  public class TestIntegrationOfTSOClientServerBasicFunctionality {
49  
50      private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
51  
52      private static final String TSO_SERVER_HOST = "localhost";
53      private int tsoServerPortForTest;
54  
55      // Cells for tests
56      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
57      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
58  
59      // Required infrastructure for TSO tsoClient-server integration testing
60      private TSOServer tsoServer;
61      private TSOClient tsoClient;
62      private TSOClient justAnotherTSOClient;
63      private CommitTable.Client commitTableClient;
64  
65      @BeforeClass
66      public void setup() throws Exception {
67  
68          tsoServerPortForTest = TestUtils.getFreeLocalPort();
69  
70          TSOServerConfig tsoConfig = new TSOServerConfig();
71          tsoConfig.setConflictMapSize(1000);
72          tsoConfig.setPort(tsoServerPortForTest);
73          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
74          Injector injector = Guice.createInjector(tsoServerMockModule);
75  
76          CommitTable commitTable = injector.getInstance(CommitTable.class);
77          commitTableClient = commitTable.getClient();
78  
79          LOG.info("==================================================================================================");
80          LOG.info("======================================= Init TSO Server ==========================================");
81          LOG.info("==================================================================================================");
82  
83          tsoServer = injector.getInstance(TSOServer.class);
84          tsoServer.startAndWait();
85          TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
86  
87          LOG.info("==================================================================================================");
88          LOG.info("===================================== TSO Server Initialized =====================================");
89          LOG.info("==================================================================================================");
90  
91          LOG.info("==================================================================================================");
92          LOG.info("======================================= Setup TSO Clients ========================================");
93          LOG.info("==================================================================================================");
94  
95          // Configure direct connection to the server
96          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
97          tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
98  
99          tsoClient = TSOClient.newInstance(tsoClientConf);
100         justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
101 
102         LOG.info("==================================================================================================");
103         LOG.info("===================================== TSO Clients Initialized ====================================");
104         LOG.info("==================================================================================================");
105 
106         Thread.currentThread().setName("Test Thread");
107 
108     }
109 
110     @AfterClass
111     public void tearDown() throws Exception {
112 
113         tsoClient.close().get();
114 
115         tsoServer.stopAndWait();
116         tsoServer = null;
117         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
118 
119     }
120 
121     @Test(timeOut = 30_000)
122     public void testTimestampsOrderingGrowMonotonically() throws Exception {
123         long referenceTimestamp;
124         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
125         referenceTimestamp = startTsTx1;
126 
127         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
128         referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
129         assertEquals(startTsTx2, referenceTimestamp, "Should grow monotonically");
130         assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
131 
132         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
133         referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
134         assertEquals(commitTsTx2, referenceTimestamp, "Should grow monotonically");
135 
136         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
137         referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
138         assertEquals(commitTsTx1, referenceTimestamp, "Should grow monotonically");
139 
140         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
141         referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
142         assertEquals(startTsTx3, referenceTimestamp, "Should grow monotonically");
143     }
144 
145     @Test(timeOut = 30_000)
146     public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
147         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
148         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
149         assertTrue(commitTsTx1 > startTsTx1);
150     }
151 
152     @Test(timeOut = 30_000)
153     public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
154         long startTs = tsoClient.getNewStartTimestamp().get();
155 
156         Set<CellId> cells = new HashSet<>();
157         for (int i = 0; i < 1_000_000; i++) {
158             cells.add(new DummyCellIdImpl(i));
159         }
160 
161         long commitTs = tsoClient.commit(startTs, cells).get();
162         assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
163     }
164 
165     @Test(timeOut = 30_000)
166     public void testMultipleSerialCommitsDoNotConflict() throws Exception {
167         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
168         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
169         assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
170 
171         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
172         assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
173 
174         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
175         assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
176 
177         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
178         long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
179         assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
180     }
181 
182     @Test(timeOut = 30_000)
183     public void testCommitWritesToCommitTable() throws Exception {
184 
185         long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
186         long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
187         assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
188 
189         if (!tsoClient.isLowLatency())
190             assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
191                     "Commit TS for Tx1 shouldn't appear in Commit Table");
192 
193         long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
194         assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
195 
196         if (!tsoClient.isLowLatency()) {
197             Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
198             assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
199             assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
200                     "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
201             assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
202         } else {
203             assertTrue(commitTsForTx1 > startTsForTx2, "Commit TS should be higher than tx's Start TS");
204         }
205 
206 
207     }
208 
209     @Test(timeOut = 30_000)
210     public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
211         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
212         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
213         assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
214 
215         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
216         assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
217 
218         try {
219             tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
220             Assert.fail("Second TX should fail on commit");
221         } catch (ExecutionException ee) {
222             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
223         }
224     }
225 
226     @Test(timeOut = 30_000)
227     public void testTransactionStartedBeforeFenceAborts() throws Exception {
228 
229         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
230 
231         long fenceID = tsoClient.getFence(c1.getTableId()).get();
232 
233         assertTrue(fenceID > startTsTx1, "Fence ID should be higher thank Tx1ID");
234 
235         try {
236             tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
237             Assert.fail("TX should fail on commit");
238         } catch (ExecutionException ee) {
239             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
240         }
241     }
242 
243     @Test(timeOut = 30_000)
244     public void testTransactionStartedBeforeNonOverlapFenceCommits() throws Exception {
245 
246         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
247 
248         tsoClient.getFence(7).get();
249 
250         try {
251             tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
252         } catch (ExecutionException ee) {
253             Assert.fail("TX should successfully commit");        }
254     }
255 
256     @Test(timeOut = 30_000)
257     public void testTransactionStartedAfterFenceCommits() throws Exception {
258 
259         tsoClient.getFence(c1.getTableId()).get();
260 
261         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
262 
263         try {
264             tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
265         } catch (ExecutionException ee) {
266             Assert.fail("TX should successfully commit");
267         }
268     }
269 
270     @Test(timeOut = 30_000)
271     public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
272         long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
273         long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
274         long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
275 
276         Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
277         try {
278             tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
279             Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
280         } catch (ExecutionException ee) {
281             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
282         }
283         long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
284 
285         assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
286         if (!tsoClient.isLowLatency())
287             commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
288         assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
289         assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
290     }
291 
292 }