View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.curator.framework.CuratorFramework;
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  
24  public class PausableLeaseManager extends LeaseManager {
25  
26      private static final Logger LOG = LoggerFactory.getLogger(PausableLeaseManager.class);
27  
28      private volatile boolean pausedInTryToGetInitialLeasePeriod = false;
29      private volatile boolean pausedInTryToRenewLeasePeriod = false;
30      private volatile boolean pausedInStillInLeasePeriod = false;
31  
32      public PausableLeaseManager(String id,
33                                  TSOChannelHandler tsoChannelHandler,
34                                  TSOStateManager stateManager,
35                                  long leasePeriodInMs,
36                                  String tsoLeasePath,
37                                  String currentTSOPath,
38                                  CuratorFramework zkClient,
39                                  Panicker panicker) {
40          super(id, tsoChannelHandler, stateManager, leasePeriodInMs, tsoLeasePath, currentTSOPath, zkClient, panicker);
41      }
42  
43      @Override
44      public void tryToGetInitialLeasePeriod() throws Exception {
45          while (pausedInTryToGetInitialLeasePeriod) {
46              synchronized (this) {
47                  try {
48                      LOG.info("{} paused in tryToGetInitialLeasePeriod()", this);
49                      this.wait();
50                  } catch (InterruptedException e) {
51                      LOG.error("Interrupted whilst paused");
52                      Thread.currentThread().interrupt();
53                  }
54              }
55          }
56          super.tryToGetInitialLeasePeriod();
57      }
58  
59      @Override
60      public void tryToRenewLeasePeriod() throws Exception {
61          while (pausedInTryToRenewLeasePeriod) {
62              synchronized (this) {
63                  try {
64                      LOG.info("{} paused in tryToRenewLeasePeriod()", this);
65                      this.wait();
66                  } catch (InterruptedException e) {
67                      LOG.error("Interrupted whilst paused");
68                      Thread.currentThread().interrupt();
69                  }
70              }
71          }
72          super.tryToRenewLeasePeriod();
73      }
74  
75      @Override
76      public boolean stillInLeasePeriod() {
77          while (pausedInStillInLeasePeriod) {
78              synchronized (this) {
79                  try {
80                      LOG.info("{} paused in stillInLeasePeriod()", this);
81                      this.wait();
82                  } catch (InterruptedException e) {
83                      LOG.error("Interrupted whilst paused");
84                      Thread.currentThread().interrupt();
85                  }
86              }
87          }
88          return super.stillInLeasePeriod();
89      }
90  
91      // ----------------------------------------------------------------------------------------------------------------
92      // Helper Methods to pause functionality
93      // ----------------------------------------------------------------------------------------------------------------
94  
95      public synchronized void pausedInTryToGetInitialLeasePeriod() {
96          pausedInTryToGetInitialLeasePeriod = true;
97          this.notifyAll();
98      }
99  
100     public synchronized void pausedInTryToRenewLeasePeriod() {
101         pausedInTryToRenewLeasePeriod = true;
102         this.notifyAll();
103     }
104 
105     public synchronized void pausedInStillInLeasePeriod() {
106         pausedInStillInLeasePeriod = true;
107         this.notifyAll();
108     }
109 
110     public synchronized void resume() {
111         pausedInTryToGetInitialLeasePeriod = false;
112         pausedInTryToRenewLeasePeriod = false;
113         pausedInStillInLeasePeriod = false;
114         this.notifyAll();
115     }
116 
117 }