View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.collect.Lists;
21  import com.google.common.util.concurrent.SettableFuture;
22  
23  import org.apache.omid.metrics.MetricsRegistry;
24  import org.apache.omid.metrics.NullMetricsProvider;
25  import org.apache.omid.transaction.AbstractTransactionManager;
26  import org.jboss.netty.channel.Channel;
27  import org.mockito.ArgumentCaptor;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.testng.annotations.BeforeMethod;
31  import org.testng.annotations.Test;
32  
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.List;
36  
37  import static org.mockito.Matchers.any;
38  import static org.mockito.Matchers.anyLong;
39  import static org.mockito.Matchers.eq;
40  import static org.mockito.Mockito.doReturn;
41  import static org.mockito.Mockito.mock;
42  import static org.mockito.Mockito.timeout;
43  import static org.mockito.Mockito.verify;
44  import static org.testng.Assert.assertTrue;
45  
46  public class TestRequestProcessor {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
49  
50      private static final int CONFLICT_MAP_SIZE = 1000;
51      private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
52  
53      private MetricsRegistry metrics = new NullMetricsProvider();
54  
55      private PersistenceProcessor persist;
56  
57      private TSOStateManager stateManager;
58  
59      // Request processor under test
60      private RequestProcessor requestProc;
61  
62      private LowWatermarkWriter lowWatermarkWriter;
63      private TimestampOracleImpl timestampOracle;
64      private ReplyProcessor replyProcessor;
65  
66      @BeforeMethod
67      public void beforeMethod() throws Exception {
68  
69          // Build the required scaffolding for the test
70          MetricsRegistry metrics = new NullMetricsProvider();
71  
72          TimestampOracleImpl timestampOracle =
73                  new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
74  
75          stateManager = new TSOStateManagerImpl(timestampOracle);
76          lowWatermarkWriter = mock(LowWatermarkWriter.class);
77          persist = mock(PersistenceProcessor.class);
78          replyProcessor = mock(ReplyProcessor.class);
79          SettableFuture<Void> f = SettableFuture.create();
80          f.set(null);
81          doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
82  
83          TSOServerConfig config = new TSOServerConfig();
84          config.setConflictMapSize(CONFLICT_MAP_SIZE);
85  
86          requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(),
87                  config, lowWatermarkWriter,replyProcessor);
88  
89          // Initialize the state for the experiment
90          stateManager.register(requestProc);
91          stateManager.initialize();
92  
93      }
94  
95      @Test(timeOut = 30_000)
96      public void testTimestamp() throws Exception {
97  
98          requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
99          ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
100         verify(persist, timeout(100).times(1)).addTimestampToBatch(
101                 firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
102 
103         long firstTS = firstTScapture.getValue();
104         // verify that timestamps increase monotonically
105         for (int i = 0; i < 100; i++) {
106             requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
107             verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
108             firstTS += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
109         }
110 
111     }
112 
113     @Test(timeOut = 30_000)
114     public void testCommit() throws Exception {
115 
116         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
117         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
118         verify(persist, timeout(100).times(1)).addTimestampToBatch(
119                 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
120         long firstTS = TScapture.getValue();
121 
122         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
123         requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
124         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
125 
126         requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
127         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
128 
129         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
130         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
131 
132         // test conflict
133         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
134         TScapture = ArgumentCaptor.forClass(Long.class);
135         verify(persist, timeout(100).times(2)).addTimestampToBatch(
136                 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
137         long secondTS = TScapture.getValue();
138 
139         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
140         TScapture = ArgumentCaptor.forClass(Long.class);
141         verify(persist, timeout(100).times(3)).addTimestampToBatch(
142                 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
143         long thirdTS = TScapture.getValue();
144 
145         requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
146         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
147         requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
148         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
149 
150     }
151 
152     @Test(timeOut = 30_000)
153     public void testFence() throws Exception {
154 
155         requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
156         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
157         verify(replyProcessor, timeout(100).times(1)).sendFenceResponse(eq(666L),
158                 firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
159 
160     }
161 
162     @Test(timeOut = 30_000)
163     public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
164 
165         List<Long> writeSet = Collections.emptyList();
166 
167         // Start a transaction...
168         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
169         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
170         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
171                                                                    any(Channel.class),
172                                                                    any(MonitoringContextImpl.class));
173         long startTS = capturedTS.getValue();
174 
175         // ... simulate the reset of the RequestProcessor state (e.g. due to
176         // a change in mastership) and...
177         stateManager.initialize();
178 
179         // ...check that the transaction is aborted when trying to commit
180         requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
181         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
182 
183     }
184 
185     @Test(timeOut = 5_000)
186     public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
187 
188         final int ANY_START_TS = 1;
189         final long FIRST_COMMIT_TS_EVICTED = AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
190         final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
191 
192         // Fill the cache to provoke a cache eviction
193         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
194             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
195             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
196             requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
197         }
198 
199         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
200 
201         // Check that first time its called is on init
202         verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
203         // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
204         verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
205         // Finally it should never be called with the next element
206         verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
207 
208     }
209 
210 }