View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.commons.pool2.ObjectPool;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.timestamp.storage.TimestampStorage;
24  import org.mockito.Mock;
25  import org.mockito.Mockito;
26  import org.mockito.MockitoAnnotations;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  import org.testng.annotations.AfterMethod;
30  import org.testng.annotations.BeforeMethod;
31  import org.testng.annotations.Test;
32  
33  import com.lmax.disruptor.BlockingWaitStrategy;
34  
35  import java.io.IOException;
36  
37  import static org.mockito.Matchers.any;
38  import static org.mockito.Matchers.anyLong;
39  import static org.mockito.Matchers.anyString;
40  import static org.mockito.Mockito.doReturn;
41  import static org.mockito.Mockito.doThrow;
42  import static org.mockito.Mockito.mock;
43  import static org.mockito.Mockito.spy;
44  import static org.mockito.Mockito.timeout;
45  import static org.mockito.Mockito.verify;
46  
47  public class TestPanicker {
48  
49      private static final Logger LOG = LoggerFactory.getLogger(TestPanicker.class);
50  
51      @Mock
52      private CommitTable.Writer mockWriter;
53      @Mock
54      private MetricsRegistry metrics;
55  
56      @BeforeMethod
57      public void initMocksAndComponents() {
58          MockitoAnnotations.initMocks(this);
59      }
60  
61      @AfterMethod
62      void afterMethod() {
63          Mockito.reset(mockWriter);
64      }
65  
66      // Note this test has been moved and refactored to TestTimestampOracle because
67      // it tests the behaviour of the TimestampOracle.
68      // Please, remove me in a future commit
69      @Test(timeOut = 10_000)
70      public void testTimestampOraclePanic() throws Exception {
71  
72          TimestampStorage storage = spy(new TimestampOracleImpl.InMemoryTimestampStorage());
73          Panicker panicker = spy(new MockPanicker());
74  
75          doThrow(new RuntimeException("Out of memory")).when(storage).updateMaxTimestamp(anyLong(), anyLong());
76  
77          final TimestampOracleImpl tso = new TimestampOracleImpl(metrics, storage, panicker);
78          tso.initialize();
79          Thread allocThread = new Thread("AllocThread") {
80              @Override
81              public void run() {
82                  while (true) {
83                      tso.next();
84                  }
85              }
86          };
87          allocThread.start();
88  
89          verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
90  
91      }
92  
93      // Note this test has been moved and refactored to TestPersistenceProcessor because
94      // it tests the behaviour of the PersistenceProcessor.
95      // Please, remove me in a future commit
96      @Test(timeOut = 10_000)
97      public void testCommitTablePanic() throws Exception {
98  
99          Panicker panicker = spy(new MockPanicker());
100 
101         doThrow(new IOException("Unable to write@TestPanicker")).when(mockWriter).flush();
102 
103         final CommitTable.Client mockClient = mock(CommitTable.Client.class);
104         CommitTable commitTable = new CommitTable() {
105             @Override
106             public Writer getWriter() {
107                 return mockWriter;
108             }
109 
110             @Override
111             public Client getClient() {
112                 return mockClient;
113             }
114         };
115 
116         LeaseManager leaseManager = mock(LeaseManager.class);
117         doReturn(true).when(leaseManager).stillInLeasePeriod();
118         TSOServerConfig config = new TSOServerConfig();
119         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
120 
121         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
122         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
123             handlers[i] = new PersistenceProcessorHandler(metrics,
124                                                           "localhost:1234",
125                                                           leaseManager,
126                                                           commitTable,
127                                                           mock(ReplyProcessor.class),
128                                                           mock(RetryProcessor.class),
129                                                           panicker);
130         }
131 
132         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
133                                                                  new BlockingWaitStrategy(),
134                                                                  commitTable,
135                                                                  batchPool,
136                                                                  panicker,
137                                                                  handlers,
138                                                                  metrics);
139 
140         proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
141 
142         new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
143 
144         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
145 
146     }
147 
148     // Note this test has been moved and refactored to TestPersistenceProcessor because
149     // it tests the behaviour of the PersistenceProcessor.
150     // Please, remove me in a future commit
151     @Test(timeOut = 10_000)
152     public void testRuntimeExceptionTakesDownDaemon() throws Exception {
153 
154         Panicker panicker = spy(new MockPanicker());
155 
156         final CommitTable.Writer mockWriter = mock(CommitTable.Writer.class);
157         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
158 
159         final CommitTable.Client mockClient = mock(CommitTable.Client.class);
160         CommitTable commitTable = new CommitTable() {
161             @Override
162             public Writer getWriter() {
163                 return mockWriter;
164             }
165 
166             @Override
167             public Client getClient() {
168                 return mockClient;
169             }
170         };
171         TSOServerConfig config = new TSOServerConfig();
172         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
173 
174         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
175         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
176             handlers[i] = new PersistenceProcessorHandler(metrics,
177                                                           "localhost:1234",
178                                                           mock(LeaseManager.class),
179                                                           commitTable,
180                                                           mock(ReplyProcessor.class),
181                                                           mock(RetryProcessor.class),
182                                                           panicker);
183         }
184 
185         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
186                                                                  new BlockingWaitStrategy(),
187                                                                  commitTable,
188                                                                  batchPool,
189                                                                  panicker,
190                                                                  handlers,
191                                                                  metrics);
192         proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
193 
194         new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
195 
196         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
197 
198     }
199 
200 }