View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.commons.pool2.ObjectPool;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.metrics.NullMetricsProvider;
24  import org.jboss.netty.channel.Channel;
25  import org.mockito.ArgumentCaptor;
26  import org.mockito.Mock;
27  import org.mockito.Mockito;
28  import org.mockito.MockitoAnnotations;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.testng.annotations.AfterMethod;
32  import org.testng.annotations.BeforeMethod;
33  import org.testng.annotations.Test;
34  
35  import com.lmax.disruptor.BlockingWaitStrategy;
36  
37  import java.io.IOException;
38  
39  import static org.mockito.Matchers.any;
40  import static org.mockito.Matchers.anyLong;
41  import static org.mockito.Matchers.anyString;
42  import static org.mockito.Mockito.doReturn;
43  import static org.mockito.Mockito.doThrow;
44  import static org.mockito.Mockito.mock;
45  import static org.mockito.Mockito.spy;
46  import static org.mockito.Mockito.timeout;
47  import static org.mockito.Mockito.times;
48  import static org.mockito.Mockito.verify;
49  import static org.testng.Assert.assertEquals;
50  
// TODO Refactor: make currentBatch visible in PersistenceProcessorImpl so that proper verifications can be added
52  public class TestPersistenceProcessor {
53  
54      private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
55  
56      private static final long ANY_LWM = 1234L;
57      private static final int ANY_ST = 0;
58      private static final int ANY_CT = 1;
59  
60      @Mock
61      private CommitTable.Writer mockWriter;
62      @Mock
63      private CommitTable.Client mockClient;
64      @Mock
65      private RetryProcessor retryProcessor;
66      @Mock
67      private Panicker panicker;
68  
69      private MetricsRegistry metrics;
70      private CommitTable commitTable;
71  
72      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
73      public void initMocksAndComponents() throws Exception {
74  
75          MockitoAnnotations.initMocks(this);
76  
77          // Configure null metrics provider
78          metrics = new NullMetricsProvider();
79  
80          // Configure commit table to return the mocked writer and client
81          commitTable = new CommitTable() {
82              @Override
83              public Writer getWriter() {
84                  return mockWriter;
85              }
86  
87              @Override
88              public Client getClient() {
89                  return mockClient;
90              }
91          };
92  
93      }
94  
95      @AfterMethod
96      void afterMethod() {
97          Mockito.reset(mockWriter);
98      }
99  
100     @Test(timeOut = 30_000)
101     public void testLowWatermarkIsPersisted() throws Exception {
102 
103         TSOServerConfig tsoConfig = new TSOServerConfig();
104 
105         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
106         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
107             handlers[i] = new PersistenceProcessorHandler(metrics,
108                                                           "localhost:1234",
109                                                           mock(LeaseManager.class),
110                                                           commitTable,
111                                                           mock(ReplyProcessor.class),
112                                                           retryProcessor,
113                                                           panicker);
114         }
115 
116         // Component under test
117         PersistenceProcessorImpl persistenceProcessor =
118                 new PersistenceProcessorImpl(tsoConfig,
119                                              new BlockingWaitStrategy(),
120                                              commitTable,
121                                              mock(ObjectPool.class),
122                                              panicker,
123                                              handlers,
124                                              metrics);
125 
126         persistenceProcessor.persistLowWatermark(ANY_LWM).get();
127 
128         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
129         CommitTable.Writer lwmWriter = commitTable.getWriter();
130         verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
131         assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
132 
133     }
134 
135     @Test(timeOut = 30_000)
136     public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
137 
138         final int NUM_CT_WRITERS = 1;
139         final int BATCH_SIZE_PER_CT_WRITER = 2;
140 
141         // Init a non-HA lease manager
142         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
143                                                                  mock(TSOStateManager.class)));
144 
145         TSOServerConfig tsoConfig = new TSOServerConfig();
146         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
147         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
148 
149         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
150 
151         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
152 
153         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
154         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
155             handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
156                                                           leaseManager,
157                                                           commitTable,
158                                                           replyProcessor,
159                                                           retryProcessor,
160                                                           panicker);
161         }
162 
163         // Component under test
164         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
165                                                                      panicker, handlers, metrics);
166 
167         verify(batchPool, times(1)).borrowObject(); // Called during initialization
168 
169         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
170         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
171         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
172         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
173 
174         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
175 
176     }
177 
178     @Test(timeOut = 30_000)
179     public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
180 
181         final int NUM_CT_WRITERS = 2;
182         final int BATCH_SIZE_PER_CT_WRITER = 2;
183 
184         // Init a non-HA lease manager
185         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
186                                                                  mock(TSOStateManager.class)));
187 
188         TSOServerConfig tsoConfig = new TSOServerConfig();
189         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
190         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
191 
192         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
193 
194         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
195 
196         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
197         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
198             handlers[i] = new PersistenceProcessorHandler(metrics,
199                                                           "localhost:1234",
200                                                           leaseManager,
201                                                           commitTable,
202                                                           replyProcessor,
203                                                           retryProcessor,
204                                                           panicker);
205         }
206 
207         // Component under test
208         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
209                                                                      panicker, handlers, metrics);
210 
211         verify(batchPool, times(1)).borrowObject(); // Called during initialization
212 
213         // Fill 1st handler Batches completely
214         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
215         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
216         verify(batchPool, times(2)).borrowObject();
217         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
218         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
219         verify(batchPool, times(3)).borrowObject();
220 
221         // Test empty flush does not trigger response in getting a new currentBatch
222         proc.triggerCurrentBatchFlush();
223         verify(batchPool, times(3)).borrowObject();
224 
225         // Fill 2nd handler Batches completely
226         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
227         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
228         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
229         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
230         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
231 
232         // Start filling a new currentBatch and flush it immediately
233         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
234         verify(batchPool, times(5)).borrowObject();
235         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
236         verify(batchPool, times(6)).borrowObject();
237 
238         // Test empty flush does not trigger response
239         proc.triggerCurrentBatchFlush();
240         proc.triggerCurrentBatchFlush();
241         proc.triggerCurrentBatchFlush();
242         proc.triggerCurrentBatchFlush();
243         proc.triggerCurrentBatchFlush();
244         verify(batchPool, times(6)).borrowObject();
245 
246     }
247 
248     @Test(timeOut = 30_000)
249     public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
250 
251         final int NUM_CT_WRITERS = 1;
252         final int BATCH_SIZE_PER_CT_WRITER = 1;
253 
254         TSOServerConfig tsoConfig = new TSOServerConfig();
255         tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS);
256         tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER);
257         tsoConfig.setBatchPersistTimeoutInMs(100);
258 
259         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
260 
261         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
262 
263         // Init a non-HA lease manager
264         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
265                 mock(TSOStateManager.class)));
266 
267         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
268         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
269             handlers[i] = new PersistenceProcessorHandler(metrics,
270                                                           "localhost:1234",
271                                                           leaseManager,
272                                                           commitTable,
273                                                           replyProcessor,
274                                                           retryProcessor,
275                                                           panicker);
276         }
277 
278         // Component under test
279         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
280                                                                      panicker, handlers, metrics);
281 
282         // The non-ha lease manager always return true for
283         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
284         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
285         proc.triggerCurrentBatchFlush();
286         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
287         verify(batchPool, times(2)).borrowObject();
288 
289     }
290 
291     @Test(timeOut = 30_000)
292     public void testCommitPersistenceWithHALeaseManagerAndMinimumCommitTableWriters() throws Exception {
293 
294         final int NUM_PERSIST_HANDLERS = 2; // Minimum commit table writers is 2
295 
296         TSOServerConfig tsoConfig = new TSOServerConfig();
297         tsoConfig.setNumConcurrentCTWriters(NUM_PERSIST_HANDLERS);
298 
299         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
300         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
301         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
302         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
303 
304     }
305 
306     @Test(timeOut = 30_000)
307     public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
308 
309         final int NUM_CT_WRITERS = 4;
310         final int BATCH_SIZE_PER_CT_WRITER = 4;
311 
312         TSOServerConfig tsoConfig = new TSOServerConfig();
313         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
314         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
315         tsoConfig.setBatchPersistTimeoutInMs(100);
316 
317         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
318         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
319         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
320         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
321 
322     }
323 
324     private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
325 
326         // Init a HA lease manager
327         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
328 
329         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
330 
331         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
332 
333         // Component under test
334         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
335                                                                      panicker, handlers, metrics);
336 
337         // Test: Configure the lease manager to return true always
338         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
339         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
340         proc.triggerCurrentBatchFlush();
341         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
342         verify(batchPool, times(2)).borrowObject();
343     }
344 
345     private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
346 
347         // Init a HA lease manager
348         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
349 
350         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
351 
352         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
353 
354         // Component under test
355         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
356                                                                      panicker, handlers, metrics);
357 
358         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
359         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
360         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
361         proc.triggerCurrentBatchFlush();
362         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
363         verify(batchPool, times(2)).borrowObject();
364     }
365 
366     private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
367 
368         // Init a HA lease manager
369         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
370 
371         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
372 
373         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
374 
375         // Component under test
376         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
377                                                                      panicker, handlers, metrics);
378 
379         // Test: Configure the lease manager to return false for stillInLeasePeriod
380         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
381         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
382         proc.triggerCurrentBatchFlush();
383         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
384         verify(batchPool, times(2)).borrowObject();
385     }
386 
387     private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
388 
389         // Init a HA lease manager
390         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
391 
392         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
393 
394         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
395 
396         // Component under test
397         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
398                                                                      panicker, handlers, metrics);
399 
400         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
401         // an exception when flush
402         // Configure mock writer to flush unsuccessfully
403         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
404         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
405         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
406         proc.triggerCurrentBatchFlush();
407         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
408         verify(batchPool, times(2)).borrowObject();
409 
410     }
411 
412     private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig,
413                                                             LeaseManager leaseManager,
414                                                             ObjectPool<Batch> batchPool)
415             throws Exception {
416         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
417         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
418             handlers[i] = new PersistenceProcessorHandler(metrics,
419                                                           "localhost:1234",
420                                                           leaseManager,
421                                                           commitTable,
422                                                           new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool),
423                                                           retryProcessor,
424                                                           new RuntimeExceptionPanicker());
425         }
426         return handlers;
427     }
428 
429     @Test(timeOut = 30_000)
430     public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
431 
432         // Init lease management (doesn't matter if HA or not)
433         LeaseManagement leaseManager = mock(LeaseManagement.class);
434 
435         TSOServerConfig config = new TSOServerConfig();
436 
437         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
438 
439         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
440 
441         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
442         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
443             handlers[i] = new PersistenceProcessorHandler(metrics,
444                                                           "localhost:1234",
445                                                           leaseManager,
446                                                           commitTable,
447                                                           replyProcessor,
448                                                           mock(RetryProcessor.class),
449                                                           panicker);
450         }
451 
452         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
453                                                                      panicker, handlers, metrics);
454 
455         MonitoringContext monCtx = new MonitoringContext(metrics);
456 
457         // Configure lease manager to work normally
458         doReturn(true).when(leaseManager).stillInLeasePeriod();
459 
460         // Configure commit table writer to explode when flushing changes to DB
461         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
462 
463         // Check the panic is extended!
464         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
465         proc.triggerCurrentBatchFlush();
466         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
467 
468     }
469 
470     @Test(timeOut = 30_000)
471     public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
472 
473         TSOServerConfig config = new TSOServerConfig();
474 
475         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
476 
477         ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
478 
479         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
480         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
481             handlers[i] = new PersistenceProcessorHandler(metrics,
482                                                           "localhost:1234",
483                                                           mock(LeaseManager.class),
484                                                           commitTable,
485                                                           replyProcessor,
486                                                           retryProcessor,
487                                                           panicker);
488         }
489 
490         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
491                                                                      panicker, handlers, metrics);
492 
493         // Configure writer to explode with a runtime exception
494         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
495         MonitoringContext monCtx = new MonitoringContext(metrics);
496 
497         // Check the panic is extended!
498         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
499         proc.triggerCurrentBatchFlush();
500         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
501 
502     }
503 
504 }