View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.commons.pool2.ObjectPool;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.metrics.NullMetricsProvider;
24  import org.jboss.netty.channel.Channel;
25  import org.mockito.ArgumentCaptor;
26  import org.mockito.Mock;
27  import org.mockito.Mockito;
28  import org.mockito.MockitoAnnotations;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.testng.annotations.AfterMethod;
32  import org.testng.annotations.BeforeMethod;
33  import org.testng.annotations.Test;
34  
35  import com.lmax.disruptor.BlockingWaitStrategy;
36  
37  import java.io.IOException;
38  
39  import static org.mockito.Matchers.any;
40  import static org.mockito.Matchers.anyLong;
41  import static org.mockito.Matchers.anyString;
42  import static org.mockito.Mockito.doReturn;
43  import static org.mockito.Mockito.doThrow;
44  import static org.mockito.Mockito.mock;
45  import static org.mockito.Mockito.spy;
46  import static org.mockito.Mockito.timeout;
47  import static org.mockito.Mockito.times;
48  import static org.mockito.Mockito.verify;
49  import static org.testng.Assert.assertEquals;
50  
51  // TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
52  public class TestPersistenceProcessor {
53  
54      private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
55  
56      private static final long ANY_LWM = 1234L;
57      private static final int ANY_ST = 0;
58      private static final int ANY_CT = 1;
59  
60      @Mock
61      private CommitTable.Writer mockWriter;
62      @Mock
63      private CommitTable.Client mockClient;
64      @Mock
65      private RetryProcessor retryProcessor;
66      @Mock
67      private Panicker panicker;
68  
69      private MetricsRegistry metrics;
70      private CommitTable commitTable;
71      private LowWatermarkWriter lowWatermarkWriter;
72  
73      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
74      public void initMocksAndComponents() throws Exception {
75  
76          MockitoAnnotations.initMocks(this);
77  
78          // Configure null metrics provider
79          metrics = new NullMetricsProvider();
80  
81          // Configure commit table to return the mocked writer and client
82          commitTable = new CommitTable() {
83              @Override
84              public Writer getWriter() {
85                  return mockWriter;
86              }
87  
88              @Override
89              public Client getClient() {
90                  return mockClient;
91              }
92          };
93  
94      }
95  
96      @AfterMethod
97      void afterMethod() {
98          Mockito.reset(mockWriter);
99      }
100 
101     @Test(timeOut = 30_000)
102     public void testLowWatermarkIsPersisted() throws Exception {
103 
104         TSOServerConfig tsoConfig = new TSOServerConfig();
105         lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
106 
107         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
108         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
109             handlers[i] = new PersistenceProcessorHandler(metrics,
110                                                           "localhost:1234",
111                                                           mock(LeaseManager.class),
112                                                           commitTable,
113                                                           mock(ReplyProcessor.class),
114                                                           retryProcessor,
115                                                           panicker);
116         }
117 
118         // Component under test
119         PersistenceProcessorImpl persistenceProcessor =
120                 new PersistenceProcessorImpl(tsoConfig,
121                                              new BlockingWaitStrategy(),
122                                              commitTable,
123                                              mock(ObjectPool.class),
124                                              panicker,
125                                              handlers,
126                                              metrics);
127 
128         lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
129 
130         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
131         CommitTable.Writer lwmWriter = commitTable.getWriter();
132         verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
133         assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
134 
135     }
136 
137     @Test(timeOut = 30_000)
138     public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
139 
140         final int NUM_CT_WRITERS = 1;
141         final int BATCH_SIZE_PER_CT_WRITER = 2;
142 
143         // Init a non-HA lease manager
144         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
145                                                                  mock(TSOStateManager.class)));
146 
147         TSOServerConfig tsoConfig = new TSOServerConfig();
148         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
149         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
150 
151         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
152 
153         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
154 
155         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
156         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
157             handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
158                                                           leaseManager,
159                                                           commitTable,
160                                                           replyProcessor,
161                                                           retryProcessor,
162                                                           panicker);
163         }
164 
165         // Component under test
166         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
167                                                                      panicker, handlers, metrics);
168 
169         verify(batchPool, times(1)).borrowObject(); // Called during initialization
170 
171         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
172         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
173         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
174         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
175 
176         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
177 
178     }
179 
180     @Test(timeOut = 30_000)
181     public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
182 
183         final int NUM_CT_WRITERS = 2;
184         final int BATCH_SIZE_PER_CT_WRITER = 2;
185 
186         // Init a non-HA lease manager
187         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
188                                                                  mock(TSOStateManager.class)));
189 
190         TSOServerConfig tsoConfig = new TSOServerConfig();
191         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
192         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
193 
194         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
195 
196         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
197 
198         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
199         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
200             handlers[i] = new PersistenceProcessorHandler(metrics,
201                                                           "localhost:1234",
202                                                           leaseManager,
203                                                           commitTable,
204                                                           replyProcessor,
205                                                           retryProcessor,
206                                                           panicker);
207         }
208 
209         // Component under test
210         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
211                                                                      panicker, handlers, metrics);
212 
213         verify(batchPool, times(1)).borrowObject(); // Called during initialization
214 
215         // Fill 1st handler Batches completely
216         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
217         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
218         verify(batchPool, times(2)).borrowObject();
219         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
220         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
221         verify(batchPool, times(3)).borrowObject();
222 
223         // Test empty flush does not trigger response in getting a new currentBatch
224         proc.triggerCurrentBatchFlush();
225         verify(batchPool, times(3)).borrowObject();
226 
227         // Fill 2nd handler Batches completely
228         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
229         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
230         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
231         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
232         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
233 
234         // Start filling a new currentBatch and flush it immediately
235         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
236         verify(batchPool, times(5)).borrowObject();
237         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
238         verify(batchPool, times(6)).borrowObject();
239 
240         // Test empty flush does not trigger response
241         proc.triggerCurrentBatchFlush();
242         proc.triggerCurrentBatchFlush();
243         proc.triggerCurrentBatchFlush();
244         proc.triggerCurrentBatchFlush();
245         proc.triggerCurrentBatchFlush();
246         verify(batchPool, times(6)).borrowObject();
247 
248     }
249 
250     @Test(timeOut = 30_000)
251     public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
252 
253         final int NUM_CT_WRITERS = 1;
254         final int BATCH_SIZE_PER_CT_WRITER = 1;
255 
256         TSOServerConfig tsoConfig = new TSOServerConfig();
257         tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS);
258         tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER);
259         tsoConfig.setBatchPersistTimeoutInMs(100);
260 
261         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
262 
263         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
264 
265         // Init a non-HA lease manager
266         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
267                 mock(TSOStateManager.class)));
268 
269         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
270         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
271             handlers[i] = new PersistenceProcessorHandler(metrics,
272                                                           "localhost:1234",
273                                                           leaseManager,
274                                                           commitTable,
275                                                           replyProcessor,
276                                                           retryProcessor,
277                                                           panicker);
278         }
279 
280         // Component under test
281         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
282                                                                      panicker, handlers, metrics);
283 
284         // The non-ha lease manager always return true for
285         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
286         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
287         proc.triggerCurrentBatchFlush();
288         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
289         verify(batchPool, times(2)).borrowObject();
290 
291     }
292 
293     @Test(timeOut = 30_000)
294     public void testCommitPersistenceWithHALeaseManagerAndMinimumCommitTableWriters() throws Exception {
295 
296         final int NUM_PERSIST_HANDLERS = 2; // Minimum commit table writers is 2
297 
298         TSOServerConfig tsoConfig = new TSOServerConfig();
299         tsoConfig.setNumConcurrentCTWriters(NUM_PERSIST_HANDLERS);
300 
301         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
302         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
303         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
304         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
305 
306     }
307 
308     @Test(timeOut = 30_000)
309     public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
310 
311         final int NUM_CT_WRITERS = 4;
312         final int BATCH_SIZE_PER_CT_WRITER = 4;
313 
314         TSOServerConfig tsoConfig = new TSOServerConfig();
315         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
316         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
317         tsoConfig.setBatchPersistTimeoutInMs(100);
318 
319         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
320         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
321         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
322         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
323 
324     }
325 
326     private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
327 
328         // Init a HA lease manager
329         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
330 
331         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
332 
333         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
334 
335         // Component under test
336         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
337                                                                      panicker, handlers, metrics);
338 
339         // Test: Configure the lease manager to return true always
340         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
341         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
342         proc.triggerCurrentBatchFlush();
343         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
344         verify(batchPool, times(2)).borrowObject();
345     }
346 
347     private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
348 
349         // Init a HA lease manager
350         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
351 
352         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
353 
354         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
355 
356         // Component under test
357         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
358                                                                      panicker, handlers, metrics);
359 
360         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
361         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
362         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
363         proc.triggerCurrentBatchFlush();
364         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
365         verify(batchPool, times(2)).borrowObject();
366     }
367 
368     private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
369 
370         // Init a HA lease manager
371         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
372 
373         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
374 
375         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
376 
377         // Component under test
378         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
379                                                                      panicker, handlers, metrics);
380 
381         // Test: Configure the lease manager to return false for stillInLeasePeriod
382         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
383         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
384         proc.triggerCurrentBatchFlush();
385         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
386         verify(batchPool, times(2)).borrowObject();
387     }
388 
389     private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
390 
391         // Init a HA lease manager
392         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
393 
394         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
395 
396         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
397 
398         // Component under test
399         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
400                                                                      panicker, handlers, metrics);
401 
402         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
403         // an exception when flush
404         // Configure mock writer to flush unsuccessfully
405         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
406         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
407         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
408         proc.triggerCurrentBatchFlush();
409         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
410         verify(batchPool, times(2)).borrowObject();
411 
412     }
413 
414     private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig,
415                                                             LeaseManager leaseManager,
416                                                             ObjectPool<Batch> batchPool)
417             throws Exception {
418         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
419         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
420             handlers[i] = new PersistenceProcessorHandler(metrics,
421                                                           "localhost:1234",
422                                                           leaseManager,
423                                                           commitTable,
424                                                           new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool),
425                                                           retryProcessor,
426                                                           new RuntimeExceptionPanicker());
427         }
428         return handlers;
429     }
430 
431     @Test(timeOut = 30_000)
432     public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
433 
434         // Init lease management (doesn't matter if HA or not)
435         LeaseManagement leaseManager = mock(LeaseManagement.class);
436 
437         TSOServerConfig config = new TSOServerConfig();
438 
439         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
440 
441         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
442 
443         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
444         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
445             handlers[i] = new PersistenceProcessorHandler(metrics,
446                                                           "localhost:1234",
447                                                           leaseManager,
448                                                           commitTable,
449                                                           replyProcessor,
450                                                           mock(RetryProcessor.class),
451                                                           panicker);
452         }
453 
454         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
455                                                                      panicker, handlers, metrics);
456 
457         MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
458 
459         // Configure lease manager to work normally
460         doReturn(true).when(leaseManager).stillInLeasePeriod();
461 
462         // Configure commit table writer to explode when flushing changes to DB
463         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
464 
465         // Check the panic is extended!
466         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
467         proc.triggerCurrentBatchFlush();
468         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
469 
470     }
471 
472     @Test(timeOut = 30_000)
473     public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
474 
475         TSOServerConfig config = new TSOServerConfig();
476 
477         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
478 
479         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
480 
481         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
482         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
483             handlers[i] = new PersistenceProcessorHandler(metrics,
484                                                           "localhost:1234",
485                                                           mock(LeaseManager.class),
486                                                           commitTable,
487                                                           replyProcessor,
488                                                           retryProcessor,
489                                                           panicker);
490         }
491 
492         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
493                                                                      panicker, handlers, metrics);
494 
495         // Configure writer to explode with a runtime exception
496         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
497         MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
498 
499         // Check the panic is extended!
500         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
501         proc.triggerCurrentBatchFlush();
502         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
503 
504     }
505 
506 }