View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.commons.pool2.ObjectPool;
22  import org.apache.omid.committable.CommitTable;
23  import org.apache.omid.metrics.MetricsRegistry;
24  import org.apache.omid.timestamp.storage.TimestampStorage;
25  import org.mockito.Mock;
26  import org.mockito.Mockito;
27  import org.mockito.MockitoAnnotations;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.testng.annotations.AfterMethod;
31  import org.testng.annotations.BeforeMethod;
32  import org.testng.annotations.Test;
33  
34  import com.lmax.disruptor.BlockingWaitStrategy;
35  
36  import java.io.IOException;
37  
38  import static org.mockito.Matchers.any;
39  import static org.mockito.Matchers.anyLong;
40  import static org.mockito.Matchers.anyString;
41  import static org.mockito.Mockito.doReturn;
42  import static org.mockito.Mockito.doThrow;
43  import static org.mockito.Mockito.mock;
44  import static org.mockito.Mockito.spy;
45  import static org.mockito.Mockito.timeout;
46  import static org.mockito.Mockito.verify;
47  
48  public class TestPanicker {
49  
50      private static final Logger LOG = LoggerFactory.getLogger(TestPanicker.class);
51  
52      @Mock
53      private CommitTable.Writer mockWriter;
54      @Mock
55      private MetricsRegistry metrics;
56  
57      @BeforeMethod
58      public void initMocksAndComponents() {
59          MockitoAnnotations.initMocks(this);
60      }
61  
62      @AfterMethod
63      void afterMethod() {
64          Mockito.reset(mockWriter);
65      }
66  
67      // Note this test has been moved and refactored to TestTimestampOracle because
68      // it tests the behaviour of the TimestampOracle.
69      // Please, remove me in a future commit
70      @Test(timeOut = 10_000)
71      public void testTimestampOraclePanic() throws Exception {
72  
73          TimestampStorage storage = spy(new TimestampOracleImpl.InMemoryTimestampStorage());
74          Panicker panicker = spy(new MockPanicker());
75  
76          doThrow(new RuntimeException("Out of memory")).when(storage).updateMaxTimestamp(anyLong(), anyLong());
77  
78          final TimestampOracleImpl tso = new TimestampOracleImpl(metrics, storage, panicker);
79          tso.initialize();
80          Thread allocThread = new Thread("AllocThread") {
81              @Override
82              public void run() {
83                  while (true) {
84                      tso.next();
85                  }
86              }
87          };
88          allocThread.start();
89  
90          verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
91  
92      }
93  
94      // Note this test has been moved and refactored to TestPersistenceProcessor because
95      // it tests the behaviour of the PersistenceProcessor.
96      // Please, remove me in a future commit
97      @Test(timeOut = 10_000)
98      public void testCommitTablePanic() throws Exception {
99  
100         Panicker panicker = spy(new MockPanicker());
101 
102         doThrow(new IOException("Unable to write@TestPanicker")).when(mockWriter).flush();
103 
104         final CommitTable.Client mockClient = mock(CommitTable.Client.class);
105         CommitTable commitTable = new CommitTable() {
106             @Override
107             public Writer getWriter() {
108                 return mockWriter;
109             }
110 
111             @Override
112             public Client getClient() {
113                 return mockClient;
114             }
115         };
116 
117         LeaseManager leaseManager = mock(LeaseManager.class);
118         doReturn(true).when(leaseManager).stillInLeasePeriod();
119         TSOServerConfig config = new TSOServerConfig();
120         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
121 
122         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
123         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
124             handlers[i] = new PersistenceProcessorHandler(metrics,
125                                                           "localhost:1234",
126                                                           leaseManager,
127                                                           commitTable,
128                                                           mock(ReplyProcessor.class),
129                                                           mock(RetryProcessor.class),
130                                                           panicker);
131         }
132 
133         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
134                                                                  new BlockingWaitStrategy(),
135                                                                  commitTable,
136                                                                  batchPool,
137                                                                  panicker,
138                                                                  handlers,
139                                                                  metrics);
140 
141         proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
142 
143         LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
144 
145         new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker,
146                 mock(TSOServerConfig.class), lowWatermarkWriter, mock(ReplyProcessor.class));
147 
148         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
149 
150     }
151 
152     // Note this test has been moved and refactored to TestPersistenceProcessor because
153     // it tests the behaviour of the PersistenceProcessor.
154     // Please, remove me in a future commit
155     @Test(timeOut = 10_000)
156     public void testRuntimeExceptionTakesDownDaemon() throws Exception {
157 
158         Panicker panicker = spy(new MockPanicker());
159 
160         final CommitTable.Writer mockWriter = mock(CommitTable.Writer.class);
161         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
162 
163         final CommitTable.Client mockClient = mock(CommitTable.Client.class);
164         CommitTable commitTable = new CommitTable() {
165             @Override
166             public Writer getWriter() {
167                 return mockWriter;
168             }
169 
170             @Override
171             public Client getClient() {
172                 return mockClient;
173             }
174         };
175         TSOServerConfig config = new TSOServerConfig();
176         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
177 
178         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
179         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
180             handlers[i] = new PersistenceProcessorHandler(metrics,
181                                                           "localhost:1234",
182                                                           mock(LeaseManager.class),
183                                                           commitTable,
184                                                           mock(ReplyProcessor.class),
185                                                           mock(RetryProcessor.class),
186                                                           panicker);
187         }
188 
189         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
190                                                                  new BlockingWaitStrategy(),
191                                                                  commitTable,
192                                                                  batchPool,
193                                                                  panicker,
194                                                                  handlers,
195                                                                  metrics);
196         proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
197 
198         LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
199 
200         new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class),
201                 lowWatermarkWriter, mock(ReplyProcessor.class));
202 
203         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
204 
205     }
206 
207 }