1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.omid.committable.CommitTable;
22 import org.apache.omid.metrics.MetricsRegistry;
23 import org.apache.omid.metrics.NullMetricsProvider;
24 import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
25 import org.jboss.netty.channel.Channel;
26 import org.mockito.Mock;
27 import org.mockito.Mockito;
28 import org.mockito.MockitoAnnotations;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.testng.annotations.AfterMethod;
32 import org.testng.annotations.BeforeMethod;
33 import org.testng.annotations.Test;
34
35 import java.io.IOException;
36
37 import static org.mockito.Matchers.any;
38 import static org.mockito.Matchers.anyLong;
39 import static org.mockito.Matchers.eq;
40 import static org.mockito.Mockito.doReturn;
41 import static org.mockito.Mockito.doThrow;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.never;
44 import static org.mockito.Mockito.spy;
45 import static org.mockito.Mockito.times;
46 import static org.mockito.Mockito.verify;
47 import static org.testng.Assert.assertEquals;
48 import static org.testng.Assert.assertTrue;
49 import static org.testng.Assert.fail;
50
51 public class TestPersistenceProcessorHandler {
52
53 private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessorHandler.class);
54
55 private static final int BATCH_ID = 0;
56 private static final int BATCH_SIZE = 6;
57 private static final long BATCH_SEQUENCE = 0;
58
59 private static final long FIRST_ST = 0L;
60 private static final long FIRST_CT = 1L;
61 private static final long SECOND_ST = 2L;
62 private static final long SECOND_CT = 3L;
63 private static final long THIRD_ST = 4L;
64 private static final long THIRD_CT = 5L;
65 private static final long FOURTH_ST = 6L;
66 private static final long FOURTH_CT = 7L;
67 private static final long FIFTH_ST = 8L;
68 private static final long FIFTH_CT = 9L;
69 private static final long SIXTH_ST = 10L;
70
71 @Mock
72 private CommitTable.Writer mockWriter;
73 @Mock
74 private CommitTable.Client mockClient;
75 @Mock
76 private LeaseManager leaseManager;
77 @Mock
78 private ReplyProcessor replyProcessor;
79 @Mock
80 private RetryProcessor retryProcessor;
81 @Mock
82 private Panicker panicker;
83
84 private CommitTable commitTable;
85
86 private MetricsRegistry metrics;
87
88
89 private PersistenceProcessorHandler persistenceHandler;
90
91 @BeforeMethod(alwaysRun = true, timeOut = 30_000)
92 public void initMocksAndComponents() throws Exception {
93
94 MockitoAnnotations.initMocks(this);
95
96
97 metrics = new NullMetricsProvider();
98
99
100 commitTable = new CommitTable() {
101 @Override
102 public Writer getWriter() {
103 return mockWriter;
104 }
105
106 @Override
107 public Client getClient() {
108 return mockClient;
109 }
110 };
111
112
113 doReturn(true).when(leaseManager).stillInLeasePeriod();
114
115 persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
116 "localhost:1234",
117 leaseManager,
118 commitTable,
119 replyProcessor,
120 retryProcessor,
121 panicker));
122
123 }
124
125 @AfterMethod
126 void afterMethod() {
127 Mockito.reset(mockWriter);
128 }
129
130 @Test(timeOut = 1_000)
131 public void testPersistentProcessorHandlerIdsAreCreatedConsecutive() throws Exception {
132
133 TSOServerConfig tsoConfig = new TSOServerConfig();
134 tsoConfig.setNumConcurrentCTWriters(32);
135
136 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
137 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
138 handlers[i] = new PersistenceProcessorHandler(metrics,
139 "localhost:1234",
140 mock(LeaseManager.class),
141 commitTable,
142 mock(ReplyProcessor.class),
143 retryProcessor,
144 panicker);
145 }
146
147 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
148
149 if (i + 1 < tsoConfig.getNumConcurrentCTWriters()) {
150 int followingHandlerIdAsInt = Integer.valueOf(handlers[i + 1].getId());
151 assertEquals(handlers[i].getId(), String.valueOf(followingHandlerIdAsInt - 1));
152 } else {
153 int followingHandlerIdAsInt = PersistenceProcessorHandler.consecutiveSequenceCreator.get();
154 assertEquals(handlers[i].getId(), String.valueOf(followingHandlerIdAsInt - 1));
155 }
156 }
157
158 }
159
160 @Test(timeOut = 10_000)
161 public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
162
163
164 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
165 PersistBatchEvent batchEvent = new PersistBatchEvent();
166 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
167 persistenceHandler.onEvent(batchEvent);
168
169 verify(persistenceHandler, times(1)).flush(eq(0));
170 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
171 verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
172 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
173 assertTrue(batch.isEmpty());
174
175 }
176
177 @Test(timeOut = 10_000)
178 public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws Exception {
179
180
181 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
182 batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
183 PersistBatchEvent batchEvent = new PersistBatchEvent();
184 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
185 persistenceHandler.onEvent(batchEvent);
186
187 verify(persistenceHandler, times(1)).flush(eq(0));
188 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
189 verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
190 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
191 assertEquals(batch.getNumEvents(), 1);
192 assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
193
194 }
195
196 @Test(timeOut = 10_000)
197 public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exception {
198
199
200 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
201 batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
202 PersistBatchEvent batchEvent = new PersistBatchEvent();
203 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
204 persistenceHandler.onEvent(batchEvent);
205
206 verify(persistenceHandler, times(1)).flush(eq(1));
207 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
208 verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
209 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
210 assertEquals(batch.getNumEvents(), 1);
211 assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
212 assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
213
214 }
215
216 @Test(timeOut = 10_000)
217 public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
218
219
220 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
221 batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
222 PersistBatchEvent batchEvent = new PersistBatchEvent();
223 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
224 persistenceHandler.onEvent(batchEvent);
225
226 verify(persistenceHandler, times(1)).flush(eq(0));
227 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
228 verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
229 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
230 assertEquals(batch.getNumEvents(), 1);
231 assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
232
233 }
234
235 @Test(timeOut = 10_000)
236 public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
237
238
239 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
240 batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
241 PersistBatchEvent batchEvent = new PersistBatchEvent();
242 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
243
244
245 persistenceHandler.onEvent(batchEvent);
246
247 verify(persistenceHandler, times(1)).flush(eq(0));
248 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
249 verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
250 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
251 assertEquals(batch.getNumEvents(), 0);
252
253 }
254
255 @Test(timeOut = 10_000)
256 public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
257
258
259 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
260 batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
261 batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
262 PersistBatchEvent batchEvent = new PersistBatchEvent();
263 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
264
265
266 assertEquals(batch.getNumEvents(), 2);
267
268
269 persistenceHandler.onEvent(batchEvent);
270
271 verify(persistenceHandler, times(1)).flush(eq(1));
272 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
273 verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
274 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
275 assertEquals(batch.getNumEvents(), 1);
276 assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
277 assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
278
279 }
280
281 @Test(timeOut = 10_000)
282 public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
283
284
285
286
287
288 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
289 batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
290 batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
291 PersistBatchEvent batchEvent = new PersistBatchEvent();
292 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
293
294
295 assertEquals(batch.getNumEvents(), 2);
296
297
298 persistenceHandler.onEvent(batchEvent);
299
300 verify(persistenceHandler, times(1)).flush(eq(1));
301 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
302 verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
303 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
304 assertEquals(batch.getNumEvents(), 1);
305 assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
306 assertEquals(batch.get(0).getCommitTimestamp(), SECOND_CT);
307
308 }
309
310 @Test(timeOut = 10_000)
311 public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
312
313
314 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
315 batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
316 batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
317 PersistBatchEvent batchEvent = new PersistBatchEvent();
318 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
319
320
321 assertEquals(batch.getNumEvents(), 2);
322
323
324 persistenceHandler.onEvent(batchEvent);
325
326 verify(persistenceHandler, times(1)).flush(eq(0));
327 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
328 verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
329 verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
330 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
331 assertEquals(batch.getNumEvents(), 0);
332
333 }
334
335 @Test(timeOut = 10_000)
336 public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
337
338
339 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
340 batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
341 batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class));
342 PersistBatchEvent batchEvent = new PersistBatchEvent();
343 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
344
345
346 assertEquals(batch.getNumEvents(), 2);
347
348
349 persistenceHandler.onEvent(batchEvent);
350
351 verify(persistenceHandler, times(1)).flush(eq(0));
352 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
353 verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
354 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
355 assertEquals(batch.getNumEvents(), 2);
356 assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
357 assertEquals(batch.get(1).getStartTimestamp(), SECOND_ST);
358
359 }
360
361
362 @Test(timeOut = 10_000)
363 public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents() throws Exception {
364
365
366 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
367
368 batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
369 batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
370 batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
371 batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
372 batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
373 batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
374 PersistBatchEvent batchEvent = new PersistBatchEvent();
375 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
376
377
378 assertEquals(batch.getNumEvents(), 6);
379
380
381 persistenceHandler.onEvent(batchEvent);
382
383 verify(persistenceHandler, times(1)).flush(2);
384 verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
385 verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
386 verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
387 assertEquals(batch.getNumEvents(), 4);
388 assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
389 assertEquals(batch.get(1).getStartTimestamp(), FIFTH_ST);
390 assertEquals(batch.get(1).getCommitTimestamp(), FIFTH_CT);
391 assertEquals(batch.get(2).getStartTimestamp(), THIRD_ST);
392 assertEquals(batch.get(2).getCommitTimestamp(), THIRD_CT);
393 assertEquals(batch.get(3).getStartTimestamp(), FOURTH_ST);
394
395 }
396
397 @Test(timeOut = 10_000)
398 public void testPanicPersistingEvents() throws Exception {
399
400
401 Panicker panicker = spy(new RuntimeExceptionPanicker());
402 persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
403 "localhost:1234",
404 leaseManager,
405 commitTable,
406 replyProcessor,
407 retryProcessor,
408 panicker));
409
410
411 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
412 batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
413 PersistBatchEvent batchEvent = new PersistBatchEvent();
414 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
415
416 doThrow(IOException.class).when(mockWriter).flush();
417
418 try {
419 persistenceHandler.onEvent(batchEvent);
420 fail();
421 } catch (RuntimeException re) {
422
423 }
424
425 verify(persistenceHandler, times(1)).flush(1);
426 verify(panicker, times(1)).panic(eq("Error persisting commit batch"), any(IOException.class));
427 verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
428 verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
429
430 }
431
432 @Test(timeOut = 10_000)
433 public void testPanicBecauseMasterLosesMastership() throws Exception {
434
435
436
437
438
439
440 doReturn(false).when(leaseManager).stillInLeasePeriod();
441
442
443 Panicker panicker = spy(new RuntimeExceptionPanicker());
444 persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
445 "localhost:1234",
446 leaseManager,
447 commitTable,
448 replyProcessor,
449 retryProcessor,
450 panicker));
451
452
453 Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
454 batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
455 PersistBatchEvent batchEvent = new PersistBatchEvent();
456 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
457
458 try {
459 persistenceHandler.onEvent(batchEvent);
460 fail();
461 } catch (RuntimeException re) {
462
463 }
464 verify(persistenceHandler, times(1)).flush(eq(1));
465 verify(mockWriter, never()).flush();
466 verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
467 verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
468 verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
469
470
471
472
473
474
475 doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
476
477
478 panicker = spy(new RuntimeExceptionPanicker());
479 persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
480 "localhost:1234",
481 leaseManager,
482 commitTable,
483 replyProcessor,
484 retryProcessor,
485 panicker));
486
487
488 batch = new Batch(BATCH_ID, BATCH_SIZE);
489 batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
490 batchEvent = new PersistBatchEvent();
491 PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
492
493 try {
494 persistenceHandler.onEvent(batchEvent);
495 fail();
496 } catch (RuntimeException re) {
497
498 }
499 verify(persistenceHandler, times(1)).flush(eq(1));
500 verify(mockWriter, times(1)).flush();
501 verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
502 verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
503 verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
504
505 }
506
507 }