View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.collect.Lists;
21  import com.google.common.util.concurrent.SettableFuture;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.metrics.NullMetricsProvider;
24  import org.jboss.netty.channel.Channel;
25  import org.mockito.ArgumentCaptor;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  import org.testng.annotations.BeforeMethod;
29  import org.testng.annotations.Test;
30  
31  import java.util.Collections;
32  import java.util.List;
33  
34  import static org.mockito.Matchers.any;
35  import static org.mockito.Matchers.anyBoolean;
36  import static org.mockito.Matchers.anyLong;
37  import static org.mockito.Matchers.eq;
38  import static org.mockito.Mockito.doReturn;
39  import static org.mockito.Mockito.mock;
40  import static org.mockito.Mockito.timeout;
41  import static org.mockito.Mockito.verify;
42  import static org.testng.Assert.assertTrue;
43  
44  public class TestRequestProcessor {
45  
46      private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
47  
48      private static final int CONFLICT_MAP_SIZE = 1000;
49      private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
50  
51      private MetricsRegistry metrics = new NullMetricsProvider();
52  
53      private PersistenceProcessor persist;
54  
55      private TSOStateManager stateManager;
56  
57      // Request processor under test
58      private RequestProcessor requestProc;
59  
60      @BeforeMethod
61      public void beforeMethod() throws Exception {
62  
63          // Build the required scaffolding for the test
64          MetricsRegistry metrics = new NullMetricsProvider();
65  
66          TimestampOracleImpl timestampOracle =
67                  new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
68  
69          stateManager = new TSOStateManagerImpl(timestampOracle);
70  
71          persist = mock(PersistenceProcessor.class);
72          SettableFuture<Void> f = SettableFuture.create();
73          f.set(null);
74          doReturn(f).when(persist).persistLowWatermark(any(Long.class));
75  
76          TSOServerConfig config = new TSOServerConfig();
77          config.setMaxItems(CONFLICT_MAP_SIZE);
78  
79          requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
80  
81          // Initialize the state for the experiment
82          stateManager.register(requestProc);
83          stateManager.initialize();
84  
85      }
86  
87      @Test(timeOut = 30_000)
88      public void testTimestamp() throws Exception {
89  
90          requestProc.timestampRequest(null, new MonitoringContext(metrics));
91          ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
92          verify(persist, timeout(100).times(1)).addTimestampToBatch(
93                  firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
94  
95          long firstTS = firstTScapture.getValue();
96          // verify that timestamps increase monotonically
97          for (int i = 0; i < 100; i++) {
98              requestProc.timestampRequest(null, new MonitoringContext(metrics));
99              verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
100         }
101 
102     }
103 
104     @Test(timeOut = 30_000)
105     public void testCommit() throws Exception {
106 
107         requestProc.timestampRequest(null, new MonitoringContext(metrics));
108         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
109         verify(persist, timeout(100).times(1)).addTimestampToBatch(
110                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
111         long firstTS = TScapture.getValue();
112 
113         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
114         requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
115         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
116 
117         requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
118         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
119 
120         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
121         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
122 
123         // test conflict
124         requestProc.timestampRequest(null, new MonitoringContext(metrics));
125         TScapture = ArgumentCaptor.forClass(Long.class);
126         verify(persist, timeout(100).times(2)).addTimestampToBatch(
127                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
128         long secondTS = TScapture.getValue();
129 
130         requestProc.timestampRequest(null, new MonitoringContext(metrics));
131         TScapture = ArgumentCaptor.forClass(Long.class);
132         verify(persist, timeout(100).times(3)).addTimestampToBatch(
133                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
134         long thirdTS = TScapture.getValue();
135 
136         requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
137         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
138         requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
139         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
140 
141     }
142 
143     @Test(timeOut = 30_000)
144     public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
145 
146         List<Long> writeSet = Collections.emptyList();
147 
148         // Start a transaction...
149         requestProc.timestampRequest(null, new MonitoringContext(metrics));
150         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
151         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
152                                                                    any(Channel.class),
153                                                                    any(MonitoringContext.class));
154         long startTS = capturedTS.getValue();
155 
156         // ... simulate the reset of the RequestProcessor state (e.g. due to
157         // a change in mastership) and...
158         stateManager.initialize();
159 
160         // ...check that the transaction is aborted when trying to commit
161         requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
162         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
163 
164     }
165 
166     @Test(timeOut = 5_000)
167     public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
168 
169         final int ANY_START_TS = 1;
170         final long FIRST_COMMIT_TS_EVICTED = 1L;
171         final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
172 
173         // Fill the cache to provoke a cache eviction
174         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
175             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
176             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
177             requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics));
178         }
179 
180         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
181 
182         // Check that first time its called is on init
183         verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
184         // Then, check it is called when cache is full and the first element is evicted (should be a 1)
185         verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
186         // Finally it should never be called with the next element
187         verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
188 
189     }
190 
191 }