1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
21 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
22 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractScheduledService;
23 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.apache.curator.framework.CuratorFramework;
25 import org.apache.curator.utils.EnsurePath;
26 import org.apache.omid.tso.TSOStateManager.TSOState;
27 import org.apache.zookeeper.KeeperException;
28 import org.apache.zookeeper.data.Stat;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.IOException;
33 import java.text.SimpleDateFormat;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicLong;
38
39
40
41
42
43
44
45
46 class LeaseManager extends AbstractScheduledService implements LeaseManagement {
47
48 private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class);
49
50 private final CuratorFramework zkClient;
51
52 private final Panicker panicker;
53
54 private final String tsoHostAndPort;
55
56 private final TSOStateManager stateManager;
57 private final ExecutorService tsoStateInitializer = Executors.newSingleThreadExecutor(
58 new ThreadFactoryBuilder()
59 .setNameFormat("tso-state-initializer")
60 .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
61 @Override
62 public void uncaughtException(Thread t, Throwable e) {
63 panicker.panic(t + " threw exception", e);
64 }
65 })
66 .build());
67
68
69 private final long leasePeriodInMs;
70 private final TSOChannelHandler tsoChannelHandler;
71 private int leaseNodeVersion;
72 private final AtomicLong endLeaseInMs = new AtomicLong(0L);
73 private final AtomicLong baseTimeInMs = new AtomicLong(0L);
74
75 private final String leasePath;
76 private final String currentTSOPath;
77
78 LeaseManager(String tsoHostAndPort,
79 TSOChannelHandler tsoChannelHandler,
80 TSOStateManager stateManager,
81 long leasePeriodInMs,
82 String leasePath,
83 String currentTSOPath,
84 CuratorFramework zkClient,
85 Panicker panicker) {
86
87 this.tsoHostAndPort = tsoHostAndPort;
88 this.tsoChannelHandler = tsoChannelHandler;
89 this.stateManager = stateManager;
90 this.leasePeriodInMs = leasePeriodInMs;
91 this.leasePath = leasePath;
92 this.currentTSOPath = currentTSOPath;
93 this.zkClient = zkClient;
94 this.panicker = panicker;
95 LOG.info("LeaseManager {} initialized. Lease period {}ms", toString(), leasePeriodInMs);
96
97 }
98
99
100
101
102
103 @Override
104 public void startService() throws LeaseManagementException {
105 createLeaseManagementZNode();
106 createCurrentTSOZNode();
107 startAsync();
108 awaitRunning();
109 }
110
111 @Override
112 public void stopService() throws LeaseManagementException {
113 stopAsync();
114 awaitTerminated();
115 }
116
117 @Override
118 public boolean stillInLeasePeriod() {
119 return System.currentTimeMillis() <= getEndLeaseInMs();
120 }
121
122
123
124
125
126 void tryToGetInitialLeasePeriod() throws Exception {
127 baseTimeInMs.set(System.currentTimeMillis());
128 if (canAcquireLease()) {
129 endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
130 LOG.info("{} got the lease (Master) Ver. {}/End of lease: {}ms", tsoHostAndPort,
131 leaseNodeVersion, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(endLeaseInMs));
132 tsoStateInitializer.submit(new Runnable() {
133
134 @Override
135 public void run() {
136 try {
137 TSOState newTSOState = stateManager.initialize();
138 advertiseTSOServerInfoThroughZK(newTSOState.getEpoch());
139 tsoChannelHandler.reconnect();
140 } catch (Exception e) {
141 Thread t = Thread.currentThread();
142 t.getUncaughtExceptionHandler().uncaughtException(t, e);
143 }
144 }
145 });
146 } else {
147 tsoStateInitializer.submit(new Runnable() {
148
149 @Override
150 public void run() {
151
152 tsoChannelHandler.closeConnection();
153 }
154 });
155 }
156 }
157
158 void tryToRenewLeasePeriod() throws Exception {
159 baseTimeInMs.set(System.currentTimeMillis());
160 if (canAcquireLease()) {
161 if (System.currentTimeMillis() > getEndLeaseInMs()) {
162 endLeaseInMs.set(0L);
163 panicker.panic(tsoHostAndPort + " expired lease! Master is committing suicide");
164 } else {
165 endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
166 LOG.trace("{} renewed lease: Version {}/End of lease at {}ms",
167 tsoHostAndPort, leaseNodeVersion, endLeaseInMs);
168 }
169 } else {
170 endLeaseInMs.set(0L);
171 panicker.panic(tsoHostAndPort + " lease lost (Ver. " + leaseNodeVersion + ")! Other instance is Master. Committing suicide...");
172 }
173 }
174
175 private boolean haveLease() {
176 return stillInLeasePeriod();
177 }
178
179 private long getEndLeaseInMs() {
180 return endLeaseInMs.get();
181 }
182
183 private boolean canAcquireLease() throws Exception {
184 try {
185 int previousLeaseNodeVersion = leaseNodeVersion;
186 final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);
187
188 Stat stat = zkClient.setData().withVersion(previousLeaseNodeVersion)
189 .forPath(leasePath, instanceInfo);
190 leaseNodeVersion = stat.getVersion();
191 LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
192 } catch (KeeperException.BadVersionException e) {
193 return false;
194 }
195 return true;
196 }
197
198
199
200
201
202 @Override
203 protected void startUp() {
204 }
205
206 @Override
207 protected void shutDown() {
208 try {
209 tsoChannelHandler.close();
210 LOG.info("Channel handler closed");
211 } catch (IOException e) {
212 LOG.error("Error closing TSOChannelHandler", e);
213 }
214 }
215
216 @Override
217 protected void runOneIteration() throws Exception {
218
219 if (!haveLease()) {
220 tryToGetInitialLeasePeriod();
221 } else {
222 tryToRenewLeasePeriod();
223 }
224
225 }
226
227 @Override
228 protected Scheduler scheduler() {
229
230 final long guardLeasePeriodInMs = leasePeriodInMs / 4;
231
232 return new AbstractScheduledService.CustomScheduler() {
233
234 @Override
235 protected Schedule getNextSchedule() throws Exception {
236 if (!haveLease()) {
237
238 Stat stat = zkClient.checkExists().forPath(leasePath);
239 leaseNodeVersion = stat.getVersion();
240 LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
241 leasePeriodInMs);
242
243 return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
244 } else {
245 long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
246 LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
247 leaseNodeVersion, waitTimeInMs);
248 return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
249 }
250 }
251 };
252
253 }
254
255
256
257
258
259 @Override
260 public String toString() {
261 return tsoHostAndPort;
262 }
263
264 private void createLeaseManagementZNode() throws LeaseManagementException {
265 try {
266 validateZKPath(leasePath);
267 } catch (Exception e) {
268 throw new LeaseManagementException("Error creating Lease Management ZNode", e);
269 }
270 }
271
272 private void createCurrentTSOZNode() throws LeaseManagementException {
273 try {
274 validateZKPath(currentTSOPath);
275 } catch (Exception e) {
276 throw new LeaseManagementException("Error creating TSO ZNode", e);
277 }
278 }
279
280 private void validateZKPath(String zkPath) throws Exception {
281 EnsurePath path = zkClient.newNamespaceAwareEnsurePath(zkPath);
282 path.ensure(zkClient.getZookeeperClient());
283 Stat stat = zkClient.checkExists().forPath(zkPath);
284 Preconditions.checkNotNull(stat);
285 LOG.info("Path {} ensured", path.getPath());
286 }
287
288 private void advertiseTSOServerInfoThroughZK(long epoch) throws Exception {
289
290 Stat previousTSOZNodeStat = new Stat();
291 byte[] previousTSOInfoAsBytes = zkClient.getData().storingStatIn(previousTSOZNodeStat).forPath(currentTSOPath);
292 if (previousTSOInfoAsBytes != null && !new String(previousTSOInfoAsBytes, Charsets.UTF_8).isEmpty()) {
293 String previousTSOInfo = new String(previousTSOInfoAsBytes, Charsets.UTF_8);
294 String[] previousTSOAndEpochArray = previousTSOInfo.split("#");
295 Preconditions.checkArgument(previousTSOAndEpochArray.length == 2, "Incorrect TSO Info found: ", previousTSOInfo);
296 long oldEpoch = Long.parseLong(previousTSOAndEpochArray[1]);
297 if (oldEpoch > epoch) {
298 throw new LeaseManagementException("Another TSO replica was found " + previousTSOInfo);
299 }
300 }
301 String tsoInfoAsString = tsoHostAndPort + "#" + Long.toString(epoch);
302 byte[] tsoInfoAsBytes = tsoInfoAsString.getBytes(Charsets.UTF_8);
303 zkClient.setData().withVersion(previousTSOZNodeStat.getVersion()).forPath(currentTSOPath, tsoInfoAsBytes);
304 LOG.info("TSO instance {} (Epoch {}) advertised through ZK", tsoHostAndPort, epoch);
305
306 }
307
308 }