1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
22 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
23
24 import org.apache.omid.committable.CommitTable;
25 import org.apache.omid.metrics.MetricsRegistry;
26 import org.apache.omid.metrics.NullMetricsProvider;
27 import org.jboss.netty.channel.Channel;
28 import org.mockito.ArgumentCaptor;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.testng.annotations.BeforeMethod;
32 import org.testng.annotations.Test;
33
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.List;
37
38 import static org.mockito.Matchers.any;
39 import static org.mockito.Matchers.anyLong;
40 import static org.mockito.Matchers.eq;
41 import static org.mockito.Mockito.doReturn;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.timeout;
44 import static org.mockito.Mockito.verify;
45 import static org.testng.Assert.assertTrue;
46
47 public class TestRequestProcessor {
48
49 private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
50
51 private static final int CONFLICT_MAP_SIZE = 1000;
52 private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
53
54 private MetricsRegistry metrics = new NullMetricsProvider();
55
56 private PersistenceProcessor persist;
57
58 private TSOStateManager stateManager;
59
60
61 private RequestProcessor requestProc;
62
63 private LowWatermarkWriter lowWatermarkWriter;
64 private TimestampOracleImpl timestampOracle;
65 private ReplyProcessor replyProcessor;
66
67 @BeforeMethod
68 public void beforeMethod() throws Exception {
69
70
71 MetricsRegistry metrics = new NullMetricsProvider();
72
73 TimestampOracleImpl timestampOracle =
74 new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
75
76 stateManager = new TSOStateManagerImpl(timestampOracle);
77 lowWatermarkWriter = mock(LowWatermarkWriter.class);
78 persist = mock(PersistenceProcessor.class);
79 replyProcessor = mock(ReplyProcessor.class);
80 SettableFuture<Void> f = SettableFuture.create();
81 f.set(null);
82 doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
83
84 TSOServerConfig config = new TSOServerConfig();
85 config.setConflictMapSize(CONFLICT_MAP_SIZE);
86
87 requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(),
88 config, lowWatermarkWriter,replyProcessor);
89
90
91 stateManager.register(requestProc);
92 stateManager.initialize();
93
94 }
95
96 @Test(timeOut = 30_000)
97 public void testTimestamp() throws Exception {
98
99 requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
100 ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
101 verify(persist, timeout(100).times(1)).addTimestampToBatch(
102 firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
103
104 long firstTS = firstTScapture.getValue();
105
106 for (int i = 0; i < 100; i++) {
107 requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
108 verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
109 firstTS += CommitTable.MAX_CHECKPOINTS_PER_TXN;
110 }
111
112 }
113
114 @Test(timeOut = 30_000)
115 public void testCommit() throws Exception {
116
117 requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
118 ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
119 verify(persist, timeout(100).times(1)).addTimestampToBatch(
120 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
121 long firstTS = TScapture.getValue();
122
123 List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
124 requestProc.commitRequest(firstTS - CommitTable.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
125 verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - CommitTable.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
126
127 requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
128 ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
129
130 verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class), any(Optional.class));
131 assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
132
133
134 requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
135 TScapture = ArgumentCaptor.forClass(Long.class);
136 verify(persist, timeout(100).times(2)).addTimestampToBatch(
137 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
138 long secondTS = TScapture.getValue();
139
140 requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
141 TScapture = ArgumentCaptor.forClass(Long.class);
142 verify(persist, timeout(100).times(3)).addTimestampToBatch(
143 TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
144 long thirdTS = TScapture.getValue();
145
146 requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
147 verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
148 requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
149 verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
150
151 }
152
153 @Test(timeOut = 30_000)
154 public void testFence() {
155
156 requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
157 ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
158 verify(replyProcessor, timeout(100).times(1)).sendFenceResponse(eq(666L),
159 firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
160
161 }
162
163 @Test(timeOut = 30_000)
164 public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
165
166 List<Long> writeSet = Collections.emptyList();
167
168
169 requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
170 ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
171 verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
172 any(Channel.class),
173 any(MonitoringContextImpl.class));
174 long startTS = capturedTS.getValue();
175
176
177
178 stateManager.initialize();
179
180
181 requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
182 verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
183
184 }
185
186 @Test(timeOut=5_000)
187 public void testLowWaterIsForwardedWhenACacheElementIsEvicted() throws Exception {
188 final long ANY_START_TS = 1;
189 final long FIRST_COMMIT_TS_EVICTED = CommitTable.MAX_CHECKPOINTS_PER_TXN;
190 final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + CommitTable.MAX_CHECKPOINTS_PER_TXN;
191
192
193 for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
194 long writeSetElementHash = i + 1;
195 List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
196 requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
197 }
198
199 Thread.sleep(3000);
200
201
202 verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
203
204 verify(persist, timeout(100).times(1)).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(FIRST_COMMIT_TS_EVICTED)));
205
206 verify(persist, timeout(100).never()).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED)));
207
208
209 }
210 }