View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
22  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
23  
24  import org.apache.omid.committable.CommitTable;
25  import org.apache.omid.metrics.MetricsRegistry;
26  import org.apache.omid.metrics.NullMetricsProvider;
27  import org.jboss.netty.channel.Channel;
28  import org.mockito.ArgumentCaptor;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.testng.annotations.BeforeMethod;
32  import org.testng.annotations.Test;
33  
34  import java.util.ArrayList;
35  import java.util.Collections;
36  import java.util.List;
37  
38  import static org.mockito.Matchers.any;
39  import static org.mockito.Matchers.anyLong;
40  import static org.mockito.Matchers.eq;
41  import static org.mockito.Mockito.doReturn;
42  import static org.mockito.Mockito.mock;
43  import static org.mockito.Mockito.timeout;
44  import static org.mockito.Mockito.verify;
45  import static org.testng.Assert.assertTrue;
46  
47  public class TestRequestProcessor {
48  
49      private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
50  
51      private static final int CONFLICT_MAP_SIZE = 1000;
52      private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
53  
54      private MetricsRegistry metrics = new NullMetricsProvider();
55  
56      private PersistenceProcessor persist;
57  
58      private TSOStateManager stateManager;
59  
60      // Request processor under test
61      private RequestProcessor requestProc;
62  
63      private LowWatermarkWriter lowWatermarkWriter;
64      private TimestampOracleImpl timestampOracle;
65      private ReplyProcessor replyProcessor;
66  
67      @BeforeMethod
68      public void beforeMethod() throws Exception {
69  
70          // Build the required scaffolding for the test
71          MetricsRegistry metrics = new NullMetricsProvider();
72  
73          TimestampOracleImpl timestampOracle =
74                  new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
75  
76          stateManager = new TSOStateManagerImpl(timestampOracle);
77          lowWatermarkWriter = mock(LowWatermarkWriter.class);
78          persist = mock(PersistenceProcessor.class);
79          replyProcessor = mock(ReplyProcessor.class);
80          SettableFuture<Void> f = SettableFuture.create();
81          f.set(null);
82          doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
83  
84          TSOServerConfig config = new TSOServerConfig();
85          config.setConflictMapSize(CONFLICT_MAP_SIZE);
86  
87          requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(),
88                  config, lowWatermarkWriter,replyProcessor);
89  
90          // Initialize the state for the experiment
91          stateManager.register(requestProc);
92          stateManager.initialize();
93  
94      }
95  
96      @Test(timeOut = 30_000)
97      public void testTimestamp() throws Exception {
98  
99          requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
100         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
101         verify(persist, timeout(100).times(1)).addTimestampToBatch(
102                 firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
103 
104         long firstTS = firstTScapture.getValue();
105         // verify that timestamps increase monotonically
106         for (int i = 0; i < 100; i++) {
107             requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
108             verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
109             firstTS += CommitTable.MAX_CHECKPOINTS_PER_TXN;
110         }
111 
112     }
113 
114     @Test(timeOut = 30_000)
115     public void testCommit() throws Exception {
116 
117         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
118         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
119         verify(persist, timeout(100).times(1)).addTimestampToBatch(
120                 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
121         long firstTS = TScapture.getValue();
122 
123         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
124         requestProc.commitRequest(firstTS - CommitTable.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
125         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - CommitTable.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
126 
127         requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
128         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
129 
130         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class), any(Optional.class));
131         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
132 
133         // test conflict
134         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
135         TScapture = ArgumentCaptor.forClass(Long.class);
136         verify(persist, timeout(100).times(2)).addTimestampToBatch(
137                 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
138         long secondTS = TScapture.getValue();
139 
140         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
141         TScapture = ArgumentCaptor.forClass(Long.class);
142         verify(persist, timeout(100).times(3)).addTimestampToBatch(
143                 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
144         long thirdTS = TScapture.getValue();
145 
146         requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
147         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
148         requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
149         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
150 
151     }
152 
153     @Test(timeOut = 30_000)
154     public void testFence() {
155 
156         requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
157         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
158         verify(replyProcessor, timeout(100).times(1)).sendFenceResponse(eq(666L),
159                 firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
160 
161     }
162 
163     @Test(timeOut = 30_000)
164     public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
165 
166         List<Long> writeSet = Collections.emptyList();
167 
168         // Start a transaction...
169         requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
170         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
171         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
172                                                                    any(Channel.class),
173                                                                    any(MonitoringContextImpl.class));
174         long startTS = capturedTS.getValue();
175 
176         // ... simulate the reset of the RequestProcessor state (e.g. due to
177         // a change in mastership) and...
178         stateManager.initialize();
179 
180         // ...check that the transaction is aborted when trying to commit
181         requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
182         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
183 
184     }
185 
186     @Test(timeOut=5_000)
187     public void testLowWaterIsForwardedWhenACacheElementIsEvicted() throws Exception {
188         final long ANY_START_TS = 1;
189         final long FIRST_COMMIT_TS_EVICTED = CommitTable.MAX_CHECKPOINTS_PER_TXN;
190         final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + CommitTable.MAX_CHECKPOINTS_PER_TXN;
191 
192         // Fill the cache to provoke a cache eviction
193         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
194             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
195             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
196             requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
197         }
198 
199         Thread.sleep(3000); // Allow the Request processor to finish the request processing
200 
201         // Check that first time its called is on init
202         verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
203         // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
204         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(FIRST_COMMIT_TS_EVICTED)));
205         // Finally it should never be called with the next element
206         verify(persist, timeout(100).never()).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED)));
207 
208 
209     }
210 }