View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.base.Optional;
21  import com.lmax.disruptor.YieldingWaitStrategy;
22  
23  import org.apache.commons.pool2.ObjectPool;
24  import org.apache.omid.committable.CommitTable;
25  import org.apache.omid.committable.CommitTable.CommitTimestamp;
26  import org.apache.omid.committable.InMemoryCommitTable;
27  import org.apache.omid.metrics.MetricsRegistry;
28  import org.apache.omid.metrics.NullMetricsProvider;
29  import org.jboss.netty.channel.Channel;
30  import org.mockito.ArgumentCaptor;
31  import org.mockito.Mock;
32  import org.mockito.MockitoAnnotations;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.testng.Assert;
36  import org.testng.annotations.BeforeMethod;
37  import org.testng.annotations.Test;
38  
39  import static org.mockito.Matchers.any;
40  import static org.mockito.Mockito.timeout;
41  import static org.mockito.Mockito.verify;
42  import static org.testng.Assert.assertEquals;
43  
44  public class TestRetryProcessor {
45  
46      private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
47  
48      private static long NON_EXISTING_ST_TX = 1000;
49      private static long ST_TX_1 = 0;
50      private static long CT_TX_1 = 1;
51  
52      @Mock
53      private Channel channel;
54      @Mock
55      private ReplyProcessor replyProc;
56      @Mock
57      private Panicker panicker;
58      @Mock
59      private MetricsRegistry metrics;
60  
61      private CommitTable commitTable;
62  
63      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
64      public void initMocksAndComponents() {
65          MockitoAnnotations.initMocks(this);
66          // Init components
67          commitTable = new InMemoryCommitTable();
68      }
69  
70      @Test(timeOut = 10_000)
71      public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
72          ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
73  
74          // The element to test
75          RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
76  
77          // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
78          retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
79          ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
80  
81          verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
82          long startTS = firstTSCapture.getValue();
83          assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
84      }
85  
86      @Test(timeOut = 10_000)
87      public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
88          ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
89  
90          // The element to test
91          RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
92  
93          // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
94          commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
95          retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
96          ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
97          ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
98  
99          verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
100                                                                     secondTSCapture.capture(),
101                                                                     any(Channel.class));
102 
103         long startTS = firstTSCapture.getValue();
104         long commitTS = secondTSCapture.getValue();
105         assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
106         assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
107 
108     }
109 
110     @Test(timeOut = 10_000)
111     public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {
112 
113         // Invalidate the transaction
114         commitTable.getClient().tryInvalidateTransaction(ST_TX_1);
115 
116         // Pre-start verification: Validate that the transaction is invalidated
117         // NOTE: This test should be in the a test class for InMemoryCommitTable
118         Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
119         Assert.assertTrue(invalidTxMarker.isPresent());
120         Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);
121 
122         ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
123 
124         // The element to test
125         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
126 
127         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
128         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
129         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
130         verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
131         long startTS = startTSCapture.getValue();
132         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
133 
134     }
135 
136 }
137