1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import javax.inject.Inject;
25 import java.util.ArrayList;
26 import java.util.List;
27
28
29
30
31 public class TSOStateManagerImpl implements TSOStateManager {
32
33 private static final Logger LOG = LoggerFactory.getLogger(TSOStateManagerImpl.class);
34
35 private List<StateObserver> stateObservers = new ArrayList<>();
36
37 private TSOState state;
38
39 private TimestampOracle timestampOracle;
40
41 @Inject
42 public TSOStateManagerImpl(TimestampOracle timestampOracle) {
43 this.timestampOracle = timestampOracle;
44 }
45
46 @Override
47 public synchronized void register(StateObserver newObserver) {
48 Preconditions.checkNotNull(newObserver, "Trying to register a null observer");
49 if (!stateObservers.contains(newObserver)) {
50 stateObservers.add(newObserver);
51 }
52 }
53
54 @Override
55 public synchronized void unregister(StateObserver observer) {
56 stateObservers.remove(observer);
57 }
58
59 @Override
60 public TSOState initialize() throws Exception {
61
62 LOG.info("Initializing TSO Server state...");
63
64 timestampOracle.initialize();
65 long lowWatermark = timestampOracle.getLast();
66
67 long epoch = lowWatermark;
68 state = new TSOState(lowWatermark, epoch);
69
70
71 for (StateObserver stateObserver : stateObservers) {
72 stateObserver.update(state);
73 }
74 LOG.info("TSO Server state {}", state);
75 return state;
76
77 }
78
79 }