View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.collect.Lists;
21  import com.google.common.util.concurrent.SettableFuture;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.metrics.NullMetricsProvider;
24  import org.jboss.netty.channel.Channel;
25  import org.mockito.ArgumentCaptor;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  import org.testng.annotations.BeforeMethod;
29  import org.testng.annotations.Test;
30  
31  import java.util.Collections;
32  import java.util.List;
33  
34  import static org.mockito.Matchers.any;
35  import static org.mockito.Matchers.anyLong;
36  import static org.mockito.Matchers.eq;
37  import static org.mockito.Mockito.doReturn;
38  import static org.mockito.Mockito.mock;
39  import static org.mockito.Mockito.timeout;
40  import static org.mockito.Mockito.verify;
41  import static org.testng.Assert.assertTrue;
42  
43  public class TestRequestProcessor {
44  
45      private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
46  
47      private static final int CONFLICT_MAP_SIZE = 1000;
48      private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
49  
50      private MetricsRegistry metrics = new NullMetricsProvider();
51  
52      private PersistenceProcessor persist;
53  
54      private TSOStateManager stateManager;
55  
56      // Request processor under test
57      private RequestProcessor requestProc;
58  
59      @BeforeMethod
60      public void beforeMethod() throws Exception {
61  
62          // Build the required scaffolding for the test
63          MetricsRegistry metrics = new NullMetricsProvider();
64  
65          TimestampOracleImpl timestampOracle =
66                  new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
67  
68          stateManager = new TSOStateManagerImpl(timestampOracle);
69  
70          persist = mock(PersistenceProcessor.class);
71          SettableFuture<Void> f = SettableFuture.create();
72          f.set(null);
73          doReturn(f).when(persist).persistLowWatermark(any(Long.class));
74  
75          TSOServerConfig config = new TSOServerConfig();
76          config.setConflictMapSize(CONFLICT_MAP_SIZE);
77  
78          requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
79  
80          // Initialize the state for the experiment
81          stateManager.register(requestProc);
82          stateManager.initialize();
83  
84      }
85  
86      @Test(timeOut = 30_000)
87      public void testTimestamp() throws Exception {
88  
89          requestProc.timestampRequest(null, new MonitoringContext(metrics));
90          ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
91          verify(persist, timeout(100).times(1)).addTimestampToBatch(
92                  firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
93  
94          long firstTS = firstTScapture.getValue();
95          // verify that timestamps increase monotonically
96          for (int i = 0; i < 100; i++) {
97              requestProc.timestampRequest(null, new MonitoringContext(metrics));
98              verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
99          }
100 
101     }
102 
103     @Test(timeOut = 30_000)
104     public void testCommit() throws Exception {
105 
106         requestProc.timestampRequest(null, new MonitoringContext(metrics));
107         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
108         verify(persist, timeout(100).times(1)).addTimestampToBatch(
109                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
110         long firstTS = TScapture.getValue();
111 
112         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
113         requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
114         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
115 
116         requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
117         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
118 
119         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
120         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
121 
122         // test conflict
123         requestProc.timestampRequest(null, new MonitoringContext(metrics));
124         TScapture = ArgumentCaptor.forClass(Long.class);
125         verify(persist, timeout(100).times(2)).addTimestampToBatch(
126                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
127         long secondTS = TScapture.getValue();
128 
129         requestProc.timestampRequest(null, new MonitoringContext(metrics));
130         TScapture = ArgumentCaptor.forClass(Long.class);
131         verify(persist, timeout(100).times(3)).addTimestampToBatch(
132                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
133         long thirdTS = TScapture.getValue();
134 
135         requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
136         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
137         requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
138         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
139 
140     }
141 
142     @Test(timeOut = 30_000)
143     public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
144 
145         List<Long> writeSet = Collections.emptyList();
146 
147         // Start a transaction...
148         requestProc.timestampRequest(null, new MonitoringContext(metrics));
149         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
150         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
151                                                                    any(Channel.class),
152                                                                    any(MonitoringContext.class));
153         long startTS = capturedTS.getValue();
154 
155         // ... simulate the reset of the RequestProcessor state (e.g. due to
156         // a change in mastership) and...
157         stateManager.initialize();
158 
159         // ...check that the transaction is aborted when trying to commit
160         requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
161         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
162 
163     }
164 
165     @Test(timeOut = 5_000)
166     public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
167 
168         final int ANY_START_TS = 1;
169         final long FIRST_COMMIT_TS_EVICTED = 1L;
170         final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
171 
172         // Fill the cache to provoke a cache eviction
173         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
174             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
175             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
176             requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics));
177         }
178 
179         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
180 
181         // Check that first time its called is on init
182         verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
183         // Then, check it is called when cache is full and the first element is evicted (should be a 1)
184         verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
185         // Finally it should never be called with the next element
186         verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
187 
188     }
189 
190 }