View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.base.Optional;
21  import org.apache.commons.pool2.ObjectPool;
22  import org.apache.omid.committable.CommitTable;
23  import org.apache.omid.committable.CommitTable.CommitTimestamp;
24  import org.apache.omid.committable.InMemoryCommitTable;
25  import org.apache.omid.metrics.MetricsRegistry;
26  import org.apache.omid.metrics.NullMetricsProvider;
27  import org.jboss.netty.channel.Channel;
28  import org.mockito.ArgumentCaptor;
29  import org.mockito.Mock;
30  import org.mockito.MockitoAnnotations;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  import org.testng.Assert;
34  import org.testng.annotations.BeforeMethod;
35  import org.testng.annotations.Test;
36  
37  import static org.mockito.Matchers.any;
38  import static org.mockito.Mockito.timeout;
39  import static org.mockito.Mockito.verify;
40  import static org.testng.Assert.assertEquals;
41  
42  public class TestRetryProcessor {
43  
44      private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
45  
46      private static long NON_EXISTING_ST_TX = 1000;
47      private static long ST_TX_1 = 0;
48      private static long CT_TX_1 = 1;
49  
50      @Mock
51      private Channel channel;
52      @Mock
53      private ReplyProcessor replyProc;
54      @Mock
55      private Panicker panicker;
56      @Mock
57      private MetricsRegistry metrics;
58  
59      private CommitTable commitTable;
60  
61      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
62      public void initMocksAndComponents() {
63          MockitoAnnotations.initMocks(this);
64          // Init components
65          commitTable = new InMemoryCommitTable();
66      }
67  
68      @Test(timeOut = 10_000)
69      public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
70          ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
71  
72          // The element to test
73          RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
74  
75          // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
76          retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
77          ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
78  
79          verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
80          long startTS = firstTSCapture.getValue();
81          assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
82      }
83  
84      @Test(timeOut = 10_000)
85      public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
86          ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
87  
88          // The element to test
89          RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
90  
91          // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
92          commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
93          retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
94          ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
95          ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
96  
97          verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
98                                                                      secondTSCapture.capture(),
99                                                                      any(Channel.class));
100 
101         long startTS = firstTSCapture.getValue();
102         long commitTS = secondTSCapture.getValue();
103         assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
104         assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
105 
106     }
107 
108     @Test(timeOut = 10_000)
109     public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {
110 
111         // Invalidate the transaction
112         commitTable.getClient().tryInvalidateTransaction(ST_TX_1);
113 
114         // Pre-start verification: Validate that the transaction is invalidated
115         // NOTE: This test should be in the a test class for InMemoryCommitTable
116         Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
117         Assert.assertTrue(invalidTxMarker.isPresent());
118         Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);
119 
120         ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
121 
122         // The element to test
123         RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
124 
125         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
126         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
127         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
128         verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
129         long startTS = startTSCapture.getValue();
130         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
131 
132     }
133 
134 }
135