/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.omid.tso;
19  
20  import org.apache.commons.pool2.ObjectPool;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.metrics.NullMetricsProvider;
24  import org.jboss.netty.channel.Channel;
25  import org.mockito.ArgumentCaptor;
26  import org.mockito.Mock;
27  import org.mockito.Mockito;
28  import org.mockito.MockitoAnnotations;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.testng.annotations.AfterMethod;
32  import org.testng.annotations.BeforeMethod;
33  import org.testng.annotations.Test;
34  
35  import java.io.IOException;
36  
37  import static org.mockito.Matchers.any;
38  import static org.mockito.Matchers.anyLong;
39  import static org.mockito.Matchers.anyString;
40  import static org.mockito.Mockito.doReturn;
41  import static org.mockito.Mockito.doThrow;
42  import static org.mockito.Mockito.mock;
43  import static org.mockito.Mockito.spy;
44  import static org.mockito.Mockito.timeout;
45  import static org.mockito.Mockito.times;
46  import static org.mockito.Mockito.verify;
47  import static org.testng.Assert.assertEquals;
48  
49  // TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
50  public class TestPersistenceProcessor {
51  
52      private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
53  
54      private static final long ANY_LWM = 1234L;
55      private static final int ANY_ST = 0;
56      private static final int ANY_CT = 1;
57  
58      @Mock
59      private CommitTable.Writer mockWriter;
60      @Mock
61      private CommitTable.Client mockClient;
62      @Mock
63      private RetryProcessor retryProcessor;
64      @Mock
65      private Panicker panicker;
66  
67      private MetricsRegistry metrics;
68      private CommitTable commitTable;
69  
70      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
71      public void initMocksAndComponents() throws Exception {
72  
73          MockitoAnnotations.initMocks(this);
74  
75          // Configure null metrics provider
76          metrics = new NullMetricsProvider();
77  
78          // Configure commit table to return the mocked writer and client
79          commitTable = new CommitTable() {
80              @Override
81              public Writer getWriter() {
82                  return mockWriter;
83              }
84  
85              @Override
86              public Client getClient() {
87                  return mockClient;
88              }
89          };
90  
91      }
92  
93      @AfterMethod
94      void afterMethod() {
95          Mockito.reset(mockWriter);
96      }
97  
98      @Test(timeOut = 30_000)
99      public void testLowWatermarkIsPersisted() throws Exception {
100 
101         TSOServerConfig tsoConfig = new TSOServerConfig();
102 
103         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
104         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
105             handlers[i] = new PersistenceProcessorHandler(metrics,
106                                                           "localhost:1234",
107                                                           mock(LeaseManager.class),
108                                                           commitTable,
109                                                           mock(ReplyProcessor.class),
110                                                           retryProcessor,
111                                                           panicker);
112         }
113 
114         // Component under test
115         PersistenceProcessorImpl persistenceProcessor =
116                 new PersistenceProcessorImpl(tsoConfig,
117                                              commitTable,
118                                              mock(ObjectPool.class),
119                                              panicker,
120                                              handlers,
121                                              metrics);
122 
123         persistenceProcessor.persistLowWatermark(ANY_LWM).get();
124 
125         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
126         CommitTable.Writer lwmWriter = commitTable.getWriter();
127         verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
128         assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
129 
130     }
131 
132     @Test(timeOut = 30_000)
133     public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
134 
135         final int NUM_CT_WRITERS = 1;
136         final int BATCH_SIZE_PER_CT_WRITER = 2;
137 
138         // Init a non-HA lease manager
139         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
140                                                                  mock(TSOStateManager.class)));
141 
142         TSOServerConfig tsoConfig = new TSOServerConfig();
143         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
144         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
145 
146         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
147 
148         ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
149 
150         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
151         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
152             handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
153                                                           leaseManager,
154                                                           commitTable,
155                                                           replyProcessor,
156                                                           retryProcessor,
157                                                           panicker);
158         }
159 
160         // Component under test
161         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
162                                                                      panicker, handlers, metrics);
163 
164         verify(batchPool, times(1)).borrowObject(); // Called during initialization
165 
166         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
167         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
168         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
169         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
170 
171         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
172 
173     }
174 
175     @Test(timeOut = 30_000)
176     public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
177 
178         final int NUM_CT_WRITERS = 2;
179         final int BATCH_SIZE_PER_CT_WRITER = 2;
180 
181         // Init a non-HA lease manager
182         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
183                                                                  mock(TSOStateManager.class)));
184 
185         TSOServerConfig tsoConfig = new TSOServerConfig();
186         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
187         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
188 
189         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
190 
191         ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
192 
193         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
194         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
195             handlers[i] = new PersistenceProcessorHandler(metrics,
196                                                           "localhost:1234",
197                                                           leaseManager,
198                                                           commitTable,
199                                                           replyProcessor,
200                                                           retryProcessor,
201                                                           panicker);
202         }
203 
204         // Component under test
205         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
206                                                                      panicker, handlers, metrics);
207 
208         verify(batchPool, times(1)).borrowObject(); // Called during initialization
209 
210         // Fill 1st handler Batches completely
211         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
212         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
213         verify(batchPool, times(2)).borrowObject();
214         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
215         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
216         verify(batchPool, times(3)).borrowObject();
217 
218         // Test empty flush does not trigger response in getting a new currentBatch
219         proc.triggerCurrentBatchFlush();
220         verify(batchPool, times(3)).borrowObject();
221 
222         // Fill 2nd handler Batches completely
223         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
224         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
225         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
226         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
227         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
228 
229         // Start filling a new currentBatch and flush it immediately
230         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
231         verify(batchPool, times(5)).borrowObject();
232         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
233         verify(batchPool, times(6)).borrowObject();
234 
235         // Test empty flush does not trigger response
236         proc.triggerCurrentBatchFlush();
237         proc.triggerCurrentBatchFlush();
238         proc.triggerCurrentBatchFlush();
239         proc.triggerCurrentBatchFlush();
240         proc.triggerCurrentBatchFlush();
241         verify(batchPool, times(6)).borrowObject();
242 
243     }
244 
245     @Test(timeOut = 30_000)
246     public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
247 
248         final int NUM_CT_WRITERS = 1;
249         final int BATCH_SIZE_PER_CT_WRITER = 1;
250 
251         TSOServerConfig tsoConfig = new TSOServerConfig();
252         tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS);
253         tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER);
254         tsoConfig.setBatchPersistTimeoutInMs(100);
255 
256         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
257 
258         ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
259 
260         // Init a non-HA lease manager
261         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
262                 mock(TSOStateManager.class)));
263 
264         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
265         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
266             handlers[i] = new PersistenceProcessorHandler(metrics,
267                                                           "localhost:1234",
268                                                           leaseManager,
269                                                           commitTable,
270                                                           replyProcessor,
271                                                           retryProcessor,
272                                                           panicker);
273         }
274 
275         // Component under test
276         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
277                                                                      panicker, handlers, metrics);
278 
279         // The non-ha lease manager always return true for
280         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
281         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
282         proc.triggerCurrentBatchFlush();
283         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
284         verify(batchPool, times(2)).borrowObject();
285 
286     }
287 
288     @Test(timeOut = 30_000)
289     public void testCommitPersistenceWithHALeaseManagerAndMinimumCommitTableWriters() throws Exception {
290 
291         final int NUM_PERSIST_HANDLERS = 2; // Minimum commit table writers is 2
292 
293         TSOServerConfig tsoConfig = new TSOServerConfig();
294         tsoConfig.setNumConcurrentCTWriters(NUM_PERSIST_HANDLERS);
295 
296         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
297         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
298         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
299         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
300 
301     }
302 
303     @Test(timeOut = 30_000)
304     public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
305 
306         final int NUM_CT_WRITERS = 4;
307         final int BATCH_SIZE_PER_CT_WRITER = 4;
308 
309         TSOServerConfig tsoConfig = new TSOServerConfig();
310         tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
311         tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
312         tsoConfig.setBatchPersistTimeoutInMs(100);
313 
314         testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
315         testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
316         testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
317         testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
318 
319     }
320 
321     private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
322 
323         // Init a HA lease manager
324         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
325 
326         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
327 
328         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
329 
330         // Component under test
331         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
332                                                                      panicker, handlers, metrics);
333 
334         // Test: Configure the lease manager to return true always
335         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
336         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
337         proc.triggerCurrentBatchFlush();
338         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
339         verify(batchPool, times(2)).borrowObject();
340     }
341 
342     private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
343 
344         // Init a HA lease manager
345         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
346 
347         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
348 
349         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
350 
351         // Component under test
352         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
353                                                                      panicker, handlers, metrics);
354 
355         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
356         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
357         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
358         proc.triggerCurrentBatchFlush();
359         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
360         verify(batchPool, times(2)).borrowObject();
361     }
362 
363     private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
364 
365         // Init a HA lease manager
366         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
367 
368         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
369 
370         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
371 
372         // Component under test
373         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
374                                                                      panicker, handlers, metrics);
375 
376         // Test: Configure the lease manager to return false for stillInLeasePeriod
377         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
378         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
379         proc.triggerCurrentBatchFlush();
380         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
381         verify(batchPool, times(2)).borrowObject();
382     }
383 
384     private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
385 
386         // Init a HA lease manager
387         LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
388 
389         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
390 
391         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
392 
393         // Component under test
394         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
395                                                                      panicker, handlers, metrics);
396 
397         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
398         // an exception when flush
399         // Configure mock writer to flush unsuccessfully
400         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
401         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
402         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
403         proc.triggerCurrentBatchFlush();
404         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
405         verify(batchPool, times(2)).borrowObject();
406 
407     }
408 
409     private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig,
410                                                             LeaseManager leaseManager,
411                                                             ObjectPool<Batch> batchPool)
412             throws Exception {
413         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
414         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
415             handlers[i] = new PersistenceProcessorHandler(metrics,
416                                                           "localhost:1234",
417                                                           leaseManager,
418                                                           commitTable,
419                                                           new ReplyProcessorImpl(metrics, panicker, batchPool),
420                                                           retryProcessor,
421                                                           new RuntimeExceptionPanicker());
422         }
423         return handlers;
424     }
425 
426     @Test(timeOut = 30_000)
427     public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
428 
429         // Init lease management (doesn't matter if HA or not)
430         LeaseManagement leaseManager = mock(LeaseManagement.class);
431 
432         TSOServerConfig config = new TSOServerConfig();
433 
434         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
435 
436         ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
437 
438         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
439         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
440             handlers[i] = new PersistenceProcessorHandler(metrics,
441                                                           "localhost:1234",
442                                                           leaseManager,
443                                                           commitTable,
444                                                           replyProcessor,
445                                                           mock(RetryProcessor.class),
446                                                           panicker);
447         }
448 
449         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
450                                                                      panicker, handlers, metrics);
451 
452         MonitoringContext monCtx = new MonitoringContext(metrics);
453 
454         // Configure lease manager to work normally
455         doReturn(true).when(leaseManager).stillInLeasePeriod();
456 
457         // Configure commit table writer to explode when flushing changes to DB
458         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
459 
460         // Check the panic is extended!
461         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
462         proc.triggerCurrentBatchFlush();
463         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
464 
465     }
466 
467     @Test(timeOut = 30_000)
468     public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
469 
470         TSOServerConfig config = new TSOServerConfig();
471 
472         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
473 
474         ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
475 
476         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
477         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
478             handlers[i] = new PersistenceProcessorHandler(metrics,
479                                                           "localhost:1234",
480                                                           mock(LeaseManager.class),
481                                                           commitTable,
482                                                           replyProcessor,
483                                                           retryProcessor,
484                                                           panicker);
485         }
486 
487         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
488                                                                      panicker, handlers, metrics);
489 
490         // Configure writer to explode with a runtime exception
491         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
492         MonitoringContext monCtx = new MonitoringContext(metrics);
493 
494         // Check the panic is extended!
495         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
496         proc.triggerCurrentBatchFlush();
497         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
498 
499     }
500 
501 }