1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
21 import com.google.inject.AbstractModule;
22 import com.google.inject.Provider;
23 import com.google.inject.Provides;
24
25 import javax.inject.Named;
26 import javax.inject.Singleton;
27
28 import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
29
30 import java.net.SocketException;
31 import java.net.UnknownHostException;
32
33 import static org.apache.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY;
34
35 class TSOModule extends AbstractModule {
36
37 private final TSOServerConfig config;
38
39 TSOModule(TSOServerConfig config) {
40 Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
41 this.config = config;
42 }
43
44 @Override
45 protected void configure() {
46
47 bind(TSOChannelHandler.class).in(Singleton.class);
48 bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class);
49
50 if (config.getTimestampTypeEnum() == TIMESTAMP_TYPE.WORLD_TIME) {
51 bind(TimestampOracle.class).to(WorldClockOracleImpl.class).in(Singleton.class);
52 } else {
53 bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
54 }
55 bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
56 bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class);
57
58 install(new BatchPoolModule(config));
59
60 install(new DisruptorModule(config));
61
62 }
63
64 @Provides
65 TSOServerConfig provideTSOServerConfig() {
66 return config;
67 }
68
69 @Provides
70 @Named(TSO_HOST_AND_PORT_KEY)
71 String provideTSOHostAndPort() throws SocketException, UnknownHostException {
72 return NetworkInterfaceUtils.getTSOHostAndPort(config);
73
74 }
75
76 @Provides
77 PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
78 PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
79 for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
80 persistenceProcessorHandlers[i] = provider.get();
81 }
82 return persistenceProcessorHandlers;
83 }
84
85 }