View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.commons.pool2.PooledObject;
22  import org.jboss.netty.channel.Channel;
23  import org.mockito.Mock;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  import org.testng.annotations.BeforeMethod;
27  import org.testng.annotations.Test;
28  
29  import static org.mockito.MockitoAnnotations.initMocks;
30  import static org.testng.Assert.assertEquals;
31  import static org.testng.Assert.assertFalse;
32  import static org.testng.Assert.assertTrue;
33  import static org.testng.Assert.fail;
34  
35  public class TestBatch {
36  
37      private static final Logger LOG = LoggerFactory.getLogger(TestBatch.class);
38  
39      private static final int BATCH_SIZE = 1000;
40  
41      private static final long ANY_ST = 1231;
42      private static final long ANY_CT = 2241;
43  
44      @Mock
45      private Channel channel;
46      @Mock
47      private MonitoringContextImpl monCtx;
48  
49      @BeforeMethod
50      void setup() {
51          initMocks(this);
52      }
53  
54      @Test(timeOut = 10_000)
55      public void testBatchFunctionality() {
56  
57          // Component to test
58          Batch batch = new Batch(0, BATCH_SIZE);
59  
60          // Test initial state is OK
61          assertTrue(batch.isEmpty(), "Batch should be empty");
62          assertFalse(batch.isFull(), "Batch shouldn't be full");
63          assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
64  
65          // Test getting or setting an element in the batch greater than the current number of events is illegal
66          try {
67              batch.get(1);
68              fail();
69          } catch (IllegalStateException ex) {
70              // Expected, as we can not access elements in the batch greater than the current number of events
71          }
72          try {
73              batch.set(1, new PersistEvent());
74              fail();
75          } catch (IllegalStateException ex) {
76              // Expected, as we can not access elements in the batch greater than the current number of events
77          }
78  
79          // Test when filling the batch with different types of events, that becomes full
80          for (int i = 0; i < BATCH_SIZE; i++) {
81              if (i % 4 == 0) {
82                  batch.addTimestamp(ANY_ST, channel, monCtx);
83              } else if (i % 4 == 1) {
84                  batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
85              } else if (i % 4 == 2) {
86                  batch.addCommitRetry(ANY_ST, channel, monCtx);
87              } else {
88                  batch.addAbort(ANY_ST, channel, monCtx);
89              }
90          }
91          assertFalse(batch.isEmpty(), "Batch should contain elements");
92          assertTrue(batch.isFull(), "Batch should be full");
93          assertEquals(batch.getNumEvents(), BATCH_SIZE, "Num events should be " + BATCH_SIZE);
94  
95          // Test an exception is thrown when batch is full and a new element is going to be added
96          try {
97              batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
98              fail("Should throw an IllegalStateException");
99          } catch (IllegalStateException e) {
100             assertEquals(e.getMessage(), "batch is full", "message returned doesn't match");
101             LOG.debug("IllegalStateException catch properly");
102         }
103         assertTrue(batch.isFull(), "Batch shouldn't be empty");
104 
105         // Check the first 3 events and the last one correspond to the filling done above
106         assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
107         assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
108         assertTrue(batch.get(2).getType().equals(PersistEvent.Type.COMMIT_RETRY));
109         assertTrue(batch.get(3).getType().equals(PersistEvent.Type.ABORT));
110 
111         // Set a new value for last element in Batch and check we obtain the right result
112         batch.decreaseNumEvents();
113         assertEquals(batch.getNumEvents(), BATCH_SIZE - 1, "Num events should be " + (BATCH_SIZE - 1));
114         try {
115             batch.get(BATCH_SIZE - 1);
116             fail();
117         } catch (IllegalStateException ex) {
118             // Expected, as we can not access elements in the batch greater than the current number of events
119         }
120 
121         // Re-check that batch is NOT full
122         assertFalse(batch.isFull(), "Batch shouldn't be full");
123 
124         // Clear the batch and goes back to its initial state
125         batch.clear();
126         assertTrue(batch.isEmpty(), "Batch should be empty");
127         assertFalse(batch.isFull(), "Batch shouldn't be full");
128         assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
129 
130     }
131 
132     @Test(timeOut = 10_000)
133     public void testBatchFactoryFunctionality() throws Exception {
134 
135         // Component to test
136         Batch.BatchFactory factory = new Batch.BatchFactory(BATCH_SIZE);
137 
138         // Check the factory creates a new batch properly...
139         Batch batch = factory.create();
140         assertTrue(batch.isEmpty(), "Batch should be empty");
141         assertFalse(batch.isFull(), "Batch shouldn't be full");
142         assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
143 
144         // ...and is wrapped in to a pooled object
145         PooledObject<Batch> pooledBatch = factory.wrap(batch);
146         assertEquals(pooledBatch.getObject(), batch);
147 
148         // Put some elements in the batch...
149         batch.addTimestamp(ANY_ST, channel, monCtx);
150         batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
151         batch.addCommitRetry(ANY_ST, channel, monCtx);
152         batch.addAbort(ANY_ST, channel, monCtx);
153         assertFalse(batch.isEmpty(), "Batch should contain elements");
154         assertFalse(batch.isFull(), "Batch should NOT be full");
155         assertEquals(batch.getNumEvents(), 4, "Num events should be 4");
156 
157         // ... and passivate the object through the factory. It should reset the state of the batch
158         factory.passivateObject(pooledBatch);
159         assertTrue(batch.isEmpty(), "Batch should NOT contain elements");
160         assertFalse(batch.isFull(), "Batch should NOT be full");
161         assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
162 
163     }
164 
165 }