1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.omid.tso.TSOStateManager.StateObserver;
21 import org.apache.omid.tso.TSOStateManager.TSOState;
22 import org.testng.annotations.BeforeMethod;
23 import org.testng.annotations.Test;
24
25 import java.io.IOException;
26
27 import static org.mockito.Matchers.eq;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.timeout;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33 import static org.testng.Assert.assertEquals;
34 import static org.testng.Assert.assertTrue;
35
36 public class TestTSOStateManager {
37
38 private static final long INITIAL_STATE_VALUE = 1L;
39 private static final long NEW_STATE_VALUE = 1000;
40
41
42 private TimestampOracle timestampOracle = mock(TimestampOracle.class);
43
44
45 private TSOStateManager stateManager = new TSOStateManagerImpl(timestampOracle);
46
47 @BeforeMethod
48 public void beforeMethod() {
49
50 when(timestampOracle.getLast()).thenReturn(INITIAL_STATE_VALUE);
51 }
52
53 @Test(timeOut = 10_000)
54 public void testTSOServerStateInitialization() throws Exception {
55
56
57 TSOState initialState = stateManager.initialize();
58 assertEquals(initialState.getLowWatermark(), INITIAL_STATE_VALUE);
59 assertEquals(initialState.getEpoch(), INITIAL_STATE_VALUE);
60 assertTrue(initialState.getLowWatermark() == initialState.getEpoch(),
61 "In this implementation low watermark should be equal to epoch");
62
63
64 when(timestampOracle.getLast()).thenReturn(NEW_STATE_VALUE);
65
66 TSOState secondState = stateManager.initialize();
67 assertEquals(secondState.getLowWatermark(), NEW_STATE_VALUE);
68 assertEquals(secondState.getEpoch(), NEW_STATE_VALUE);
69 assertTrue(secondState.getLowWatermark() == secondState.getEpoch(),
70 "In this implementation low watermark should be equal to epoch");
71
72 }
73
74 @Test(timeOut = 10_000)
75 public void testObserverRegistrationAndDeregistrationForStateChanges() throws Exception {
76
77
78 StateObserver observer1 = spy(new DummyObserver());
79 stateManager.register(observer1);
80
81
82 TSOState state = stateManager.initialize();
83
84
85 verify(observer1, timeout(100).times(1)).update(eq(state));
86
87
88 StateObserver observer2 = spy(new DummyObserver());
89 stateManager.register(observer2);
90
91
92 state = stateManager.initialize();
93
94
95 verify(observer1, timeout(100).times(1)).update(eq(state));
96 verify(observer2, timeout(100).times(1)).update(eq(state));
97
98
99 stateManager.unregister(observer1);
100
101
102 state = stateManager.initialize();
103
104
105 verify(observer1, timeout(100).times(0)).update(eq(state));
106 verify(observer2, timeout(100).times(1)).update(eq(state));
107 }
108
109
110
111
112
113 private class DummyObserver implements StateObserver {
114
115 @Override
116 public void update(TSOState state) throws IOException {
117 }
118
119 }
120
121 }