View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.collect.Sets;
21  import com.google.inject.Guice;
22  import com.google.inject.Injector;
23  import com.google.inject.Module;
24  import org.apache.omid.TestUtils;
25  import org.apache.omid.committable.CommitTable;
26  import org.apache.omid.tso.TSOMockModule;
27  import org.apache.omid.tso.TSOServer;
28  import org.apache.omid.tso.TSOServerConfig;
29  import org.apache.omid.tso.util.DummyCellIdImpl;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  import org.testng.Assert;
33  import org.testng.annotations.AfterClass;
34  import org.testng.annotations.BeforeClass;
35  import org.testng.annotations.Test;
36  
37  import java.util.HashSet;
38  import java.util.Set;
39  import java.util.concurrent.ExecutionException;
40  
41  import static org.testng.Assert.assertEquals;
42  import static org.testng.Assert.assertFalse;
43  import static org.testng.Assert.assertNotNull;
44  import static org.testng.Assert.assertTrue;
45  
46  public class TestIntegrationOfTSOClientServerBasicFunctionality {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
49  
50      private static final String TSO_SERVER_HOST = "localhost";
51      private int tsoServerPortForTest;
52  
53      // Cells for tests
54      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
55      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
56  
57      // Required infrastructure for TSO tsoClient-server integration testing
58      private TSOServer tsoServer;
59      private TSOClient tsoClient;
60      private TSOClient justAnotherTSOClient;
61      private CommitTable.Client commitTableClient;
62  
63      @BeforeClass
64      public void setup() throws Exception {
65  
66          tsoServerPortForTest = TestUtils.getFreeLocalPort();
67  
68          TSOServerConfig tsoConfig = new TSOServerConfig();
69          tsoConfig.setMaxItems(1000);
70          tsoConfig.setPort(tsoServerPortForTest);
71          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
72          Injector injector = Guice.createInjector(tsoServerMockModule);
73  
74          CommitTable commitTable = injector.getInstance(CommitTable.class);
75          commitTableClient = commitTable.getClient();
76  
77          LOG.info("==================================================================================================");
78          LOG.info("======================================= Init TSO Server ==========================================");
79          LOG.info("==================================================================================================");
80  
81          tsoServer = injector.getInstance(TSOServer.class);
82          tsoServer.startAndWait();
83          TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
84  
85          LOG.info("==================================================================================================");
86          LOG.info("===================================== TSO Server Initialized =====================================");
87          LOG.info("==================================================================================================");
88  
89          LOG.info("==================================================================================================");
90          LOG.info("======================================= Setup TSO Clients ========================================");
91          LOG.info("==================================================================================================");
92  
93          // Configure direct connection to the server
94          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
95          tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
96  
97          tsoClient = TSOClient.newInstance(tsoClientConf);
98          justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
99  
100         LOG.info("==================================================================================================");
101         LOG.info("===================================== TSO Clients Initialized ====================================");
102         LOG.info("==================================================================================================");
103 
104         Thread.currentThread().setName("Test Thread");
105 
106     }
107 
108     @AfterClass
109     public void tearDown() throws Exception {
110 
111         tsoClient.close().get();
112 
113         tsoServer.stopAndWait();
114         tsoServer = null;
115         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
116 
117     }
118 
119     @Test(timeOut = 30_000)
120     public void testTimestampsOrderingGrowMonotonically() throws Exception {
121         long referenceTimestamp;
122         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
123         referenceTimestamp = startTsTx1;
124 
125         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
126         assertEquals(startTsTx2, ++referenceTimestamp, "Should grow monotonically");
127         assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
128 
129         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
130         assertEquals(commitTsTx2, ++referenceTimestamp, "Should grow monotonically");
131 
132         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
133         assertEquals(commitTsTx1, ++referenceTimestamp, "Should grow monotonically");
134 
135         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
136         assertEquals(startTsTx3, ++referenceTimestamp, "Should grow monotonically");
137     }
138 
139     @Test(timeOut = 30_000)
140     public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
141         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
142         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
143         assertTrue(commitTsTx1 > startTsTx1);
144     }
145 
146     @Test(timeOut = 30_000)
147     public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
148         long startTs = tsoClient.getNewStartTimestamp().get();
149 
150         Set<CellId> cells = new HashSet<>();
151         for (int i = 0; i < 1_000_000; i++) {
152             cells.add(new DummyCellIdImpl(i));
153         }
154 
155         long commitTs = tsoClient.commit(startTs, cells).get();
156         assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
157     }
158 
159     @Test(timeOut = 30_000)
160     public void testMultipleSerialCommitsDoNotConflict() throws Exception {
161         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
162         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
163         assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
164 
165         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
166         assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
167 
168         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
169         assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
170 
171         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
172         long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
173         assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
174     }
175 
176     @Test(timeOut = 30_000)
177     public void testCommitWritesToCommitTable() throws Exception {
178         long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
179         long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
180         assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
181 
182         assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
183                 "Commit TS for Tx1 shouldn't appear in Commit Table");
184 
185         long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
186         assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
187 
188         Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
189         assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
190         assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
191                 "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
192         assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
193     }
194 
195     @Test(timeOut = 30_000)
196     public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
197         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
198         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
199         assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
200 
201         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
202         assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
203 
204         try {
205             tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
206             Assert.fail("Second TX should fail on commit");
207         } catch (ExecutionException ee) {
208             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
209         }
210     }
211 
212     @Test(timeOut = 30_000)
213     public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
214         long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
215         long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
216         long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
217 
218         tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
219         try {
220             tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
221             Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
222         } catch (ExecutionException ee) {
223             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
224         }
225         long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
226 
227         assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
228         long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
229         assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
230         assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
231     }
232 
233 }