1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.commons.pool2.ObjectPool;
22 import org.apache.omid.committable.CommitTable;
23 import org.apache.omid.metrics.MetricsRegistry;
24 import org.apache.omid.metrics.NullMetricsProvider;
25 import org.jboss.netty.channel.Channel;
26 import org.mockito.ArgumentCaptor;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.mockito.MockitoAnnotations;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.testng.annotations.AfterMethod;
33 import org.testng.annotations.BeforeMethod;
34 import org.testng.annotations.Test;
35
36 import com.lmax.disruptor.BlockingWaitStrategy;
37
38 import java.io.IOException;
39
40 import static org.mockito.Matchers.any;
41 import static org.mockito.Matchers.anyLong;
42 import static org.mockito.Matchers.anyString;
43 import static org.mockito.Mockito.doReturn;
44 import static org.mockito.Mockito.doThrow;
45 import static org.mockito.Mockito.mock;
46 import static org.mockito.Mockito.spy;
47 import static org.mockito.Mockito.timeout;
48 import static org.mockito.Mockito.times;
49 import static org.mockito.Mockito.verify;
50 import static org.testng.Assert.assertEquals;
51
52
53 public class TestPersistenceProcessor {
54
55 private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
56
57 private static final long ANY_LWM = 1234L;
58 private static final int ANY_ST = 0;
59 private static final int ANY_CT = 1;
60
61 @Mock
62 private CommitTable.Writer mockWriter;
63 @Mock
64 private CommitTable.Client mockClient;
65 @Mock
66 private RetryProcessor retryProcessor;
67 @Mock
68 private Panicker panicker;
69
70 private MetricsRegistry metrics;
71 private CommitTable commitTable;
72 private LowWatermarkWriter lowWatermarkWriter;
73
74 @BeforeMethod(alwaysRun = true, timeOut = 30_000)
75 public void initMocksAndComponents() throws Exception {
76
77 MockitoAnnotations.initMocks(this);
78
79
80 metrics = new NullMetricsProvider();
81
82
83 commitTable = new CommitTable() {
84 @Override
85 public Writer getWriter() {
86 return mockWriter;
87 }
88
89 @Override
90 public Client getClient() {
91 return mockClient;
92 }
93 };
94
95 }
96
97 @AfterMethod
98 void afterMethod() {
99 Mockito.reset(mockWriter);
100 }
101
102 @Test(timeOut = 30_000)
103 public void testLowWatermarkIsPersisted() throws Exception {
104
105 TSOServerConfig tsoConfig = new TSOServerConfig();
106 lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
107
108 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
109 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
110 handlers[i] = new PersistenceProcessorHandler(metrics,
111 "localhost:1234",
112 mock(LeaseManager.class),
113 commitTable,
114 mock(ReplyProcessor.class),
115 retryProcessor,
116 panicker);
117 }
118
119
120 PersistenceProcessorImpl persistenceProcessor =
121 new PersistenceProcessorImpl(tsoConfig,
122 new BlockingWaitStrategy(),
123 commitTable,
124 mock(ObjectPool.class),
125 panicker,
126 handlers,
127 metrics);
128
129 lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
130
131 ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
132 CommitTable.Writer lwmWriter = commitTable.getWriter();
133 verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
134 assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
135
136 }
137
138 @Test(timeOut = 30_000)
139 public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
140
141 final int NUM_CT_WRITERS = 1;
142 final int BATCH_SIZE_PER_CT_WRITER = 2;
143
144
145 VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
146 mock(TSOStateManager.class)));
147
148 TSOServerConfig tsoConfig = new TSOServerConfig();
149 tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
150 tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
151
152 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
153
154 ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
155
156 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
157 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
158 handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
159 leaseManager,
160 commitTable,
161 replyProcessor,
162 retryProcessor,
163 panicker);
164 }
165
166
167 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
168 panicker, handlers, metrics);
169
170 verify(batchPool, times(1)).borrowObject();
171
172 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
173 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
174 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
175 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
176
177 verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject();
178
179 }
180
181 @Test(timeOut = 30_000)
182 public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
183
184 final int NUM_CT_WRITERS = 2;
185 final int BATCH_SIZE_PER_CT_WRITER = 2;
186
187
188 VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
189 mock(TSOStateManager.class)));
190
191 TSOServerConfig tsoConfig = new TSOServerConfig();
192 tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
193 tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
194
195 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
196
197 ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
198
199 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
200 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
201 handlers[i] = new PersistenceProcessorHandler(metrics,
202 "localhost:1234",
203 leaseManager,
204 commitTable,
205 replyProcessor,
206 retryProcessor,
207 panicker);
208 }
209
210
211 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
212 panicker, handlers, metrics);
213
214 verify(batchPool, times(1)).borrowObject();
215
216
217 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
218 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
219 verify(batchPool, times(2)).borrowObject();
220 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent());
221 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent());
222 verify(batchPool, times(3)).borrowObject();
223
224
225 proc.triggerCurrentBatchFlush();
226 verify(batchPool, times(3)).borrowObject();
227
228
229 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
230 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
231 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
232 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
233 verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
234
235
236 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
237 verify(batchPool, times(5)).borrowObject();
238 proc.triggerCurrentBatchFlush();
239 verify(batchPool, times(6)).borrowObject();
240
241
242 proc.triggerCurrentBatchFlush();
243 proc.triggerCurrentBatchFlush();
244 proc.triggerCurrentBatchFlush();
245 proc.triggerCurrentBatchFlush();
246 proc.triggerCurrentBatchFlush();
247 verify(batchPool, times(6)).borrowObject();
248
249 }
250
251 @Test(timeOut = 30_000)
252 public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
253
254 final int NUM_CT_WRITERS = 1;
255 final int BATCH_SIZE_PER_CT_WRITER = 1;
256
257 TSOServerConfig tsoConfig = new TSOServerConfig();
258 tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS);
259 tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER);
260 tsoConfig.setBatchPersistTimeoutInMs(100);
261
262 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
263
264 ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
265
266
267 VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
268 mock(TSOStateManager.class)));
269
270 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
271 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
272 handlers[i] = new PersistenceProcessorHandler(metrics,
273 "localhost:1234",
274 leaseManager,
275 commitTable,
276 replyProcessor,
277 retryProcessor,
278 panicker);
279 }
280
281
282 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
283 panicker, handlers, metrics);
284
285
286
287 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
288 proc.triggerCurrentBatchFlush();
289 verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
290 verify(batchPool, times(2)).borrowObject();
291
292 }
293
294 @Test(timeOut = 30_000)
295 public void testCommitPersistenceWithHALeaseManagerAndMinimumCommitTableWriters() throws Exception {
296
297 final int NUM_PERSIST_HANDLERS = 2;
298
299 TSOServerConfig tsoConfig = new TSOServerConfig();
300 tsoConfig.setNumConcurrentCTWriters(NUM_PERSIST_HANDLERS);
301
302 testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
303 testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
304 testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
305 testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
306
307 }
308
309 @Test(timeOut = 30_000)
310 public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
311
312 final int NUM_CT_WRITERS = 4;
313 final int BATCH_SIZE_PER_CT_WRITER = 4;
314
315 TSOServerConfig tsoConfig = new TSOServerConfig();
316 tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
317 tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
318 tsoConfig.setBatchPersistTimeoutInMs(100);
319
320 testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
321 testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
322 testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
323 testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
324
325 }
326
327 private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
328
329
330 LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
331
332 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
333
334 PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
335
336
337 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
338 panicker, handlers, metrics);
339
340
341 doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
342 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
343 proc.triggerCurrentBatchFlush();
344 verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
345 verify(batchPool, times(2)).borrowObject();
346 }
347
348 private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
349
350
351 LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
352
353 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
354
355 PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
356
357
358 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
359 panicker, handlers, metrics);
360
361
362 doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
363 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
364 proc.triggerCurrentBatchFlush();
365 verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
366 verify(batchPool, times(2)).borrowObject();
367 }
368
369 private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
370
371
372 LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
373
374 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
375
376 PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
377
378
379 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
380 panicker, handlers, metrics);
381
382
383 doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
384 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
385 proc.triggerCurrentBatchFlush();
386 verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
387 verify(batchPool, times(2)).borrowObject();
388 }
389
390 private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
391
392
393 LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
394
395 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
396
397 PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
398
399
400 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
401 panicker, handlers, metrics);
402
403
404
405
406 doThrow(new IOException("Unable to write")).when(mockWriter).flush();
407 doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
408 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
409 proc.triggerCurrentBatchFlush();
410 verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
411 verify(batchPool, times(2)).borrowObject();
412
413 }
414
415 private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig,
416 LeaseManager leaseManager,
417 ObjectPool<Batch> batchPool)
418 throws Exception {
419 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
420 for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
421 handlers[i] = new PersistenceProcessorHandler(metrics,
422 "localhost:1234",
423 leaseManager,
424 commitTable,
425 new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter),
426 retryProcessor,
427 new RuntimeExceptionPanicker());
428 }
429 return handlers;
430 }
431
432 @Test(timeOut = 30_000)
433 public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
434
435
436 LeaseManagement leaseManager = mock(LeaseManagement.class);
437
438 TSOServerConfig config = new TSOServerConfig();
439
440 ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
441
442 ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
443
444 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
445 for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
446 handlers[i] = new PersistenceProcessorHandler(metrics,
447 "localhost:1234",
448 leaseManager,
449 commitTable,
450 replyProcessor,
451 mock(RetryProcessor.class),
452 panicker);
453 }
454
455 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
456 panicker, handlers, metrics);
457
458 MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
459
460
461 doReturn(true).when(leaseManager).stillInLeasePeriod();
462
463
464 doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
465
466
467 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
468 proc.triggerCurrentBatchFlush();
469 verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
470
471 }
472
473 @Test(timeOut = 30_000)
474 public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
475
476 TSOServerConfig config = new TSOServerConfig();
477
478 ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
479
480 ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
481
482 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
483 for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
484 handlers[i] = new PersistenceProcessorHandler(metrics,
485 "localhost:1234",
486 mock(LeaseManager.class),
487 commitTable,
488 replyProcessor,
489 retryProcessor,
490 panicker);
491 }
492
493 PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
494 panicker, handlers, metrics);
495
496
497 doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
498 MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
499
500
501 proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
502 proc.triggerCurrentBatchFlush();
503 verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
504
505 }
506
507 }