1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.curator.framework.CuratorFramework;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class PausableLeaseManager extends LeaseManager {
25
26 private static final Logger LOG = LoggerFactory.getLogger(PausableLeaseManager.class);
27
28 private volatile boolean pausedInTryToGetInitialLeasePeriod = false;
29 private volatile boolean pausedInTryToRenewLeasePeriod = false;
30 private volatile boolean pausedInStillInLeasePeriod = false;
31
32 public PausableLeaseManager(String id,
33 TSOChannelHandler tsoChannelHandler,
34 TSOStateManager stateManager,
35 long leasePeriodInMs,
36 String tsoLeasePath,
37 String currentTSOPath,
38 CuratorFramework zkClient,
39 Panicker panicker) {
40 super(id, tsoChannelHandler, stateManager, leasePeriodInMs, tsoLeasePath, currentTSOPath, zkClient, panicker);
41 }
42
43 @Override
44 public void tryToGetInitialLeasePeriod() throws Exception {
45 while (pausedInTryToGetInitialLeasePeriod) {
46 synchronized (this) {
47 try {
48 LOG.info("{} paused in tryToGetInitialLeasePeriod()", this);
49 this.wait();
50 } catch (InterruptedException e) {
51 LOG.error("Interrupted whilst paused");
52 Thread.currentThread().interrupt();
53 }
54 }
55 }
56 super.tryToGetInitialLeasePeriod();
57 }
58
59 @Override
60 public void tryToRenewLeasePeriod() throws Exception {
61 while (pausedInTryToRenewLeasePeriod) {
62 synchronized (this) {
63 try {
64 LOG.info("{} paused in tryToRenewLeasePeriod()", this);
65 this.wait();
66 } catch (InterruptedException e) {
67 LOG.error("Interrupted whilst paused");
68 Thread.currentThread().interrupt();
69 }
70 }
71 }
72 super.tryToRenewLeasePeriod();
73 }
74
75 @Override
76 public boolean stillInLeasePeriod() {
77 while (pausedInStillInLeasePeriod) {
78 synchronized (this) {
79 try {
80 LOG.info("{} paused in stillInLeasePeriod()", this);
81 this.wait();
82 } catch (InterruptedException e) {
83 LOG.error("Interrupted whilst paused");
84 Thread.currentThread().interrupt();
85 }
86 }
87 }
88 return super.stillInLeasePeriod();
89 }
90
91
92
93
94
95 public synchronized void pausedInTryToGetInitialLeasePeriod() {
96 pausedInTryToGetInitialLeasePeriod = true;
97 this.notifyAll();
98 }
99
100 public synchronized void pausedInTryToRenewLeasePeriod() {
101 pausedInTryToRenewLeasePeriod = true;
102 this.notifyAll();
103 }
104
105 public synchronized void pausedInStillInLeasePeriod() {
106 pausedInStillInLeasePeriod = true;
107 this.notifyAll();
108 }
109
110 public synchronized void resume() {
111 pausedInTryToGetInitialLeasePeriod = false;
112 pausedInTryToRenewLeasePeriod = false;
113 pausedInStillInLeasePeriod = false;
114 this.notifyAll();
115 }
116
117 }