View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.base.Charsets;
21  import com.google.common.base.Preconditions;
22  import com.google.common.util.concurrent.AbstractScheduledService;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  import org.apache.curator.framework.CuratorFramework;
25  import org.apache.curator.utils.EnsurePath;
26  import org.apache.omid.tso.TSOStateManager.TSOState;
27  import org.apache.zookeeper.KeeperException;
28  import org.apache.zookeeper.data.Stat;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import java.io.IOException;
33  import java.text.SimpleDateFormat;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  /**
40   * Encompasses all the required elements to control the leases required for
41   * identifying the master instance when running multiple TSO instances for HA
42   * It delegates the initialization of the TSO state and the publication of
43   * the instance information when getting the lease to an asynchronous task to
44   * continue managing the leases without interruptions.
45   */
46  class LeaseManager extends AbstractScheduledService implements LeaseManagement {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class);
49  
50      private final CuratorFramework zkClient;
51  
52      private final Panicker panicker;
53  
54      private final String tsoHostAndPort;
55  
56      private final TSOStateManager stateManager;
57      private final ExecutorService tsoStateInitializer = Executors.newSingleThreadExecutor(
58              new ThreadFactoryBuilder()
59                      .setNameFormat("tso-state-initializer")
60                      .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
61                          @Override
62                          public void uncaughtException(Thread t, Throwable e) {
63                              panicker.panic(t + " threw exception", e);
64                          }
65                      })
66                      .build());
67  
68  
69      private final long leasePeriodInMs;
70      private final TSOChannelHandler tsoChannelHandler;
71      private int leaseNodeVersion;
72      private final AtomicLong endLeaseInMs = new AtomicLong(0L);
73      private final AtomicLong baseTimeInMs = new AtomicLong(0L);
74  
75      private final String leasePath;
76      private final String currentTSOPath;
77  
78      LeaseManager(String tsoHostAndPort,
79                   TSOChannelHandler tsoChannelHandler,
80                   TSOStateManager stateManager,
81                   long leasePeriodInMs,
82                   String leasePath,
83                   String currentTSOPath,
84                   CuratorFramework zkClient,
85                   Panicker panicker) {
86  
87          this.tsoHostAndPort = tsoHostAndPort;
88          this.tsoChannelHandler = tsoChannelHandler;
89          this.stateManager = stateManager;
90          this.leasePeriodInMs = leasePeriodInMs;
91          this.leasePath = leasePath;
92          this.currentTSOPath = currentTSOPath;
93          this.zkClient = zkClient;
94          this.panicker = panicker;
95          LOG.info("LeaseManager {} initialized. Lease period {}ms", toString(), leasePeriodInMs);
96  
97      }
98  
99      // ----------------------------------------------------------------------------------------------------------------
100     // LeaseManagement implementation
101     // ----------------------------------------------------------------------------------------------------------------
102 
103     @Override
104     public void startService() throws LeaseManagementException {
105         createLeaseManagementZNode();
106         createCurrentTSOZNode();
107         startAndWait();
108     }
109 
110     @Override
111     public void stopService() throws LeaseManagementException {
112         stopAndWait();
113     }
114 
115     @Override
116     public boolean stillInLeasePeriod() {
117         return System.currentTimeMillis() <= getEndLeaseInMs();
118     }
119 
120     // ----------------------------------------------------------------------------------------------------------------
121     // End LeaseManagement implementation
122     // ----------------------------------------------------------------------------------------------------------------
123 
124     void tryToGetInitialLeasePeriod() throws Exception {
125         baseTimeInMs.set(System.currentTimeMillis());
126         if (canAcquireLease()) {
127             endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
128             LOG.info("{} got the lease (Master) Ver. {}/End of lease: {}ms", tsoHostAndPort,
129                      leaseNodeVersion, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(endLeaseInMs));
130             tsoStateInitializer.submit(new Runnable() {
131                 // TSO State initialization
132                 @Override
133                 public void run() {
134                     try {
135                         TSOState newTSOState = stateManager.initialize();
136                         advertiseTSOServerInfoThroughZK(newTSOState.getEpoch());
137                         tsoChannelHandler.reconnect();
138                     } catch (Exception e) {
139                         Thread t = Thread.currentThread();
140                         t.getUncaughtExceptionHandler().uncaughtException(t, e);
141                     }
142                 }
143             });
144         } else {
145             tsoStateInitializer.submit(new Runnable() {
146                 // TSO State initialization
147                 @Override
148                 public void run() {
149                     // In case the TSO was paused close the connection
150                     tsoChannelHandler.closeConnection();
151                 }
152             });
153         }
154     }
155 
156     void tryToRenewLeasePeriod() throws Exception {
157         baseTimeInMs.set(System.currentTimeMillis());
158         if (canAcquireLease()) {
159             if (System.currentTimeMillis() > getEndLeaseInMs()) {
160                 endLeaseInMs.set(0L);
161                 panicker.panic(tsoHostAndPort + " expired lease! Master is committing suicide");
162             } else {
163                 endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
164                 LOG.trace("{} renewed lease: Version {}/End of lease at {}ms",
165                           tsoHostAndPort, leaseNodeVersion, endLeaseInMs);
166             }
167         } else {
168             endLeaseInMs.set(0L);
169             panicker.panic(tsoHostAndPort + " lease lost (Ver. " + leaseNodeVersion + ")! Other instance is Master. Committing suicide...");
170         }
171     }
172 
173     private boolean haveLease() {
174         return stillInLeasePeriod();
175     }
176 
177     private long getEndLeaseInMs() {
178         return endLeaseInMs.get();
179     }
180 
181     private boolean canAcquireLease() throws Exception {
182         try {
183             int previousLeaseNodeVersion = leaseNodeVersion;
184             final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);
185             // Try to acquire the lease
186             Stat stat = zkClient.setData().withVersion(previousLeaseNodeVersion)
187                     .forPath(leasePath, instanceInfo);
188             leaseNodeVersion = stat.getVersion();
189             LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
190         } catch (KeeperException.BadVersionException e) {
191             return false;
192         }
193         return true;
194     }
195 
196     // ----------------------------------------------------------------------------------------------------------------
197     // AbstractScheduledService implementation
198     // ----------------------------------------------------------------------------------------------------------------
199 
200     @Override
201     protected void startUp() {
202     }
203 
204     @Override
205     protected void shutDown() {
206         try {
207             tsoChannelHandler.close();
208             LOG.info("Channel handler closed");
209         } catch (IOException e) {
210             LOG.error("Error closing TSOChannelHandler", e);
211         }
212     }
213 
214     @Override
215     protected void runOneIteration() throws Exception {
216 
217         if (!haveLease()) {
218             tryToGetInitialLeasePeriod();
219         } else {
220             tryToRenewLeasePeriod();
221         }
222 
223     }
224 
225     @Override
226     protected Scheduler scheduler() {
227 
228         final long guardLeasePeriodInMs = leasePeriodInMs / 4;
229 
230         return new AbstractScheduledService.CustomScheduler() {
231 
232             @Override
233             protected Schedule getNextSchedule() throws Exception {
234                 if (!haveLease()) {
235                     // Get the current node version...
236                     Stat stat = zkClient.checkExists().forPath(leasePath);
237                     leaseNodeVersion = stat.getVersion();
238                     LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
239                               leasePeriodInMs);
240                     // ...and wait the lease period
241                     return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
242                 } else {
243                     long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
244                     LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
245                               leaseNodeVersion, waitTimeInMs);
246                     return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
247                 }
248             }
249         };
250 
251     }
252 
253     // ----------------------------------------------------------------------------------------------------------------
254     // Helper methods
255     // ----------------------------------------------------------------------------------------------------------------
256 
257     @Override
258     public String toString() {
259         return tsoHostAndPort;
260     }
261 
262     private void createLeaseManagementZNode() throws LeaseManagementException {
263         try {
264             validateZKPath(leasePath);
265         } catch (Exception e) {
266             throw new LeaseManagementException("Error creating Lease Management ZNode", e);
267         }
268     }
269 
270     private void createCurrentTSOZNode() throws LeaseManagementException {
271         try {
272             validateZKPath(currentTSOPath);
273         } catch (Exception e) {
274             throw new LeaseManagementException("Error creating TSO ZNode", e);
275         }
276     }
277 
278     private void validateZKPath(String zkPath) throws Exception {
279         EnsurePath path = zkClient.newNamespaceAwareEnsurePath(zkPath);
280         path.ensure(zkClient.getZookeeperClient());
281         Stat stat = zkClient.checkExists().forPath(zkPath);
282         Preconditions.checkNotNull(stat);
283         LOG.info("Path {} ensured", path.getPath());
284     }
285 
286     private void advertiseTSOServerInfoThroughZK(long epoch) throws Exception {
287 
288         Stat previousTSOZNodeStat = new Stat();
289         byte[] previousTSOInfoAsBytes = zkClient.getData().storingStatIn(previousTSOZNodeStat).forPath(currentTSOPath);
290         if (previousTSOInfoAsBytes != null && !new String(previousTSOInfoAsBytes, Charsets.UTF_8).isEmpty()) {
291             String previousTSOInfo = new String(previousTSOInfoAsBytes, Charsets.UTF_8);
292             String[] previousTSOAndEpochArray = previousTSOInfo.split("#");
293             Preconditions.checkArgument(previousTSOAndEpochArray.length == 2, "Incorrect TSO Info found: ", previousTSOInfo);
294             long oldEpoch = Long.parseLong(previousTSOAndEpochArray[1]);
295             if (oldEpoch > epoch) {
296                 throw new LeaseManagementException("Another TSO replica was found " + previousTSOInfo);
297             }
298         }
299         String tsoInfoAsString = tsoHostAndPort + "#" + Long.toString(epoch);
300         byte[] tsoInfoAsBytes = tsoInfoAsString.getBytes(Charsets.UTF_8);
301         zkClient.setData().withVersion(previousTSOZNodeStat.getVersion()).forPath(currentTSOPath, tsoInfoAsBytes);
302         LOG.info("TSO instance {} (Epoch {}) advertised through ZK", tsoHostAndPort, epoch);
303 
304     }
305 
306 }