View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.commons.pool2.ObjectPool;
22  import org.apache.omid.committable.CommitTable;
23  import org.apache.omid.metrics.MetricsRegistry;
24  import org.apache.omid.metrics.NullMetricsProvider;
25  import org.jboss.netty.channel.Channel;
26  import org.mockito.ArgumentCaptor;
27  import org.mockito.Mock;
28  import org.mockito.Mockito;
29  import org.mockito.MockitoAnnotations;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  import org.testng.annotations.AfterMethod;
33  import org.testng.annotations.BeforeMethod;
34  import org.testng.annotations.Test;
35  
36  import com.lmax.disruptor.BlockingWaitStrategy;
37  
38  import java.io.IOException;
39  
40  import static org.mockito.Matchers.any;
41  import static org.mockito.Matchers.anyLong;
42  import static org.mockito.Matchers.anyString;
43  import static org.mockito.Mockito.doReturn;
44  import static org.mockito.Mockito.doThrow;
45  import static org.mockito.Mockito.mock;
46  import static org.mockito.Mockito.spy;
47  import static org.mockito.Mockito.timeout;
48  import static org.mockito.Mockito.times;
49  import static org.mockito.Mockito.verify;
50  import static org.testng.Assert.assertEquals;
51  
52  // TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
53  public class TestPersistenceProcessor {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
56  
57      private static final long ANY_LWM = 1234L;
58      private static final int ANY_ST = 0;
59      private static final int ANY_CT = 1;
60  
61      @Mock
62      private CommitTable.Writer mockWriter;
63      @Mock
64      private CommitTable.Client mockClient;
65      @Mock
66      private RetryProcessor retryProcessor;
67      @Mock
68      private Panicker panicker;
69  
70      private MetricsRegistry metrics;
71      private CommitTable commitTable;
72      private LowWatermarkWriter lowWatermarkWriter;
73  
74      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
75      public void initMocksAndComponents() throws Exception {
76  
77          MockitoAnnotations.initMocks(this);
78  
79          // Configure null metrics provider
80          metrics = new NullMetricsProvider();
81  
82          // Configure commit table to return the mocked writer and client
83          commitTable = new CommitTable() {
84              @Override
85              public Writer getWriter() {
86                  return mockWriter;
87              }
88  
89              @Override
90              public Client getClient() {
91                  return mockClient;
92              }
93          };
94  
95      }
96  
97      @AfterMethod
98      void afterMethod() {
99          Mockito.reset(mockWriter);
100     }
101 
102     @Test(timeOut = 30_000)
103     public void testLowWatermarkIsPersisted() throws Exception {
104 
105         TSOServerConfig tsoConfig = new TSOServerConfig();
106         lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
107 
108         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
109         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
110             handlers[i] = new PersistenceProcessorHandler(metrics,
111                                                           "localhost:1234",
112                                                           mock(LeaseManager.class),
113                                                           commitTable,
114                                                           mock(ReplyProcessor.class),
115                                                           retryProcessor,
116                                                           panicker);
117         }
118 
119         // Component under test
120         PersistenceProcessorImpl persistenceProcessor =
121                 new PersistenceProcessorImpl(tsoConfig,
122                                              new BlockingWaitStrategy(),
123                                              commitTable,
124                                              mock(ObjectPool.class),
125                                              panicker,
126                                              handlers,
127                                              metrics);
128 
129         lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
130 
131         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
132         CommitTable.Writer lwmWriter = commitTable.getWriter();
133         verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
134         assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
135 
136     }
137 
138     @Test(timeOut = 30_000)
139     public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
140 
141         final int NUM_CT_WRITERS = 1;
142         final int BATCH_SIZE_PER_CT_WRITER = 2;
143 
144         // Init a non-HA lease manager
145         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
146                                                                  mock(TSOStateManager.class)));
147 
148         TSOServerConfig tsoConfig = new TSOServerConfig();
149         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
150         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
151 
152         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
153 
154         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
155 
156         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
157         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
158             handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
159                                                           leaseManager,
160                                                           commitTable,
161                                                           replyProcessor,
162                                                           retryProcessor,
163                                                           panicker);
164         }
165 
166         // Component under test
167         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
168                                                                      panicker, handlers, metrics);
169 
170         verify(batchPool, times(1)).borrowObject(); // Called during initialization
171 
172         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
173         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Flush: batch full
174         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
175         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Flush: batch full
176 
177         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
178 
179     }
180 
181     @Test(timeOut = 30_000)
182     public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
183 
184         final int NUM_CT_WRITERS = 2;
185         final int BATCH_SIZE_PER_CT_WRITER = 2;
186 
187         // Init a non-HA lease manager
188         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
189                                                                  mock(TSOStateManager.class)));
190 
191         TSOServerConfig tsoConfig = new TSOServerConfig();
192         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
193         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
194 
195         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
196 
197         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
198 
199         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
200         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
201             handlers[i] = new PersistenceProcessorHandler(metrics,
202                                                           "localhost:1234",
203                                                           leaseManager,
204                                                           commitTable,
205                                                           replyProcessor,
206                                                           retryProcessor,
207                                                           panicker);
208         }
209 
210         // Component under test
211         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
212                                                                      panicker, handlers, metrics);
213 
214         verify(batchPool, times(1)).borrowObject(); // Called during initialization
215 
216         // Fill 1st handler Batches completely
217         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
218         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 1st batch full
219         verify(batchPool, times(2)).borrowObject();
220         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent());
221         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent()); // 2nd batch full
222         verify(batchPool, times(3)).borrowObject();
223 
224         // Test empty flush does not trigger response in getting a new currentBatch
225         proc.triggerCurrentBatchFlush();
226         verify(batchPool, times(3)).borrowObject();
227 
228         // Fill 2nd handler Batches completely
229         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
230         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 1st batch full
231         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
232         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 2nd batch full
233         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
234 
235         // Start filling a new currentBatch and flush it immediately
236         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Batch not full
237         verify(batchPool, times(5)).borrowObject();
238         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
239         verify(batchPool, times(6)).borrowObject();
240 
241         // Test empty flush does not trigger response
242         proc.triggerCurrentBatchFlush();
243         proc.triggerCurrentBatchFlush();
244         proc.triggerCurrentBatchFlush();
245         proc.triggerCurrentBatchFlush();
246         proc.triggerCurrentBatchFlush();
247         verify(batchPool, times(6)).borrowObject();
248 
249     }
250 
251     @Test(timeOut = 30_000)
252     public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
253 
254         final int NUM_CT_WRITERS = 1;
255         final int BATCH_SIZE_PER_CT_WRITER = 1;
256 
257         TSOServerConfig tsoConfig = new TSOServerConfig();
258         tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS);
259         tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER);
260         tsoConfig.setBatchPersistTimeoutInMs(100);
261 
262         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
263 
264         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
265 
266         // Init a non-HA lease manager
267         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
268                 mock(TSOStateManager.class)));
269 
270         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
271         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
272             handlers[i] = new PersistenceProcessorHandler(metrics,
273                                                           "localhost:1234",
274                                                           leaseManager,
275                                                           commitTable,
276                                                           replyProcessor,
277                                                           retryProcessor,
278                                                           panicker);
279         }
280 
281         // Component under test
282         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
283                                                                      panicker, handlers, metrics);
284 
285         // The non-ha lease manager always return true for
286         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
287         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
288         proc.triggerCurrentBatchFlush();
289         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
290         verify(batchPool, times(2)).borrowObject();
291 
292     }
293 
294     @Test(timeOut = 30_000)
295     public void testCommitPersistenceWithHALeaseManagerAndMinimumCommitTableWriters() throws Exception {
296 
297         final int NUM_PERSIST_HANDLERS = 2; // Minimum commit table writers is 2
298 
299         TSOServerConfig tsoConfig = new TSOServerConfig();
300         tsoConfig.setNumConcurrentCTWriters(NUM_PERSIST_HANDLERS);
301 
302         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
303         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
304         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
305         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
306 
307     }
308 
309     @Test(timeOut = 30_000)
310     public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
311 
312         final int NUM_CT_WRITERS = 4;
313         final int BATCH_SIZE_PER_CT_WRITER = 4;
314 
315         TSOServerConfig tsoConfig = new TSOServerConfig();
316         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
317         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
318         tsoConfig.setBatchPersistTimeoutInMs(100);
319 
320         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
321         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
322         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
323         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
324 
325     }
326 
327     private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
328 
329         // Init a HA lease manager
330         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
331 
332         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
333 
334         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
335 
336         // Component under test
337         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
338                                                                      panicker, handlers, metrics);
339 
340         // Test: Configure the lease manager to return true always
341         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
342         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
343         proc.triggerCurrentBatchFlush();
344         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
345         verify(batchPool, times(2)).borrowObject();
346     }
347 
348     private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
349 
350         // Init a HA lease manager
351         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
352 
353         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
354 
355         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
356 
357         // Component under test
358         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
359                                                                      panicker, handlers, metrics);
360 
361         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
362         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
363         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
364         proc.triggerCurrentBatchFlush();
365         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
366         verify(batchPool, times(2)).borrowObject();
367     }
368 
369     private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
370 
371         // Init a HA lease manager
372         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
373 
374         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
375 
376         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
377 
378         // Component under test
379         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
380                                                                      panicker, handlers, metrics);
381 
382         // Test: Configure the lease manager to return false for stillInLeasePeriod
383         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
384         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
385         proc.triggerCurrentBatchFlush();
386         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
387         verify(batchPool, times(2)).borrowObject();
388     }
389 
390     private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
391 
392         // Init a HA lease manager
393         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
394 
395         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
396 
397         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
398 
399         // Component under test
400         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
401                                                                      panicker, handlers, metrics);
402 
403         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
404         // an exception when flush
405         // Configure mock writer to flush unsuccessfully
406         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
407         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
408         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
409         proc.triggerCurrentBatchFlush();
410         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
411         verify(batchPool, times(2)).borrowObject();
412 
413     }
414 
415     private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig,
416                                                             LeaseManager leaseManager,
417                                                             ObjectPool<Batch> batchPool)
418             throws Exception {
419         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
420         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
421             handlers[i] = new PersistenceProcessorHandler(metrics,
422                                                           "localhost:1234",
423                                                           leaseManager,
424                                                           commitTable,
425                                                           new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter),
426                                                           retryProcessor,
427                                                           new RuntimeExceptionPanicker());
428         }
429         return handlers;
430     }
431 
432     @Test(timeOut = 30_000)
433     public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
434 
435         // Init lease management (doesn't matter if HA or not)
436         LeaseManagement leaseManager = mock(LeaseManagement.class);
437 
438         TSOServerConfig config = new TSOServerConfig();
439 
440         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
441 
442         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
443 
444         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
445         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
446             handlers[i] = new PersistenceProcessorHandler(metrics,
447                                                           "localhost:1234",
448                                                           leaseManager,
449                                                           commitTable,
450                                                           replyProcessor,
451                                                           mock(RetryProcessor.class),
452                                                           panicker);
453         }
454 
455         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
456                                                                      panicker, handlers, metrics);
457 
458         MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
459 
460         // Configure lease manager to work normally
461         doReturn(true).when(leaseManager).stillInLeasePeriod();
462 
463         // Configure commit table writer to explode when flushing changes to DB
464         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
465 
466         // Check the panic is extended!
467         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
468         proc.triggerCurrentBatchFlush();
469         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
470 
471     }
472 
473     @Test(timeOut = 30_000)
474     public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
475 
476         TSOServerConfig config = new TSOServerConfig();
477 
478         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
479 
480         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
481 
482         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
483         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
484             handlers[i] = new PersistenceProcessorHandler(metrics,
485                                                           "localhost:1234",
486                                                           mock(LeaseManager.class),
487                                                           commitTable,
488                                                           replyProcessor,
489                                                           retryProcessor,
490                                                           panicker);
491         }
492 
493         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
494                                                                      panicker, handlers, metrics);
495 
496         // Configure writer to explode with a runtime exception
497         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
498         MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
499 
500         // Check the panic is extended!
501         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
502         proc.triggerCurrentBatchFlush();
503         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
504 
505     }
506 
507 }