1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.ZNodeClearer;
28 import org.apache.hadoop.hbase.exceptions.DeserializationException;
29 import org.apache.hadoop.hbase.Server;
30 import org.apache.hadoop.hbase.ServerName;
31 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
32 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
33 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
35 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
36 import org.apache.zookeeper.KeeperException;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class ActiveMasterManager extends ZooKeeperListener {
53 private static final Log LOG = LogFactory.getLog(ActiveMasterManager.class);
54
55 final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
56 final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
57
58 private final ServerName sn;
59 private final Server master;
60
61
62
63
64
65
66 ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master) {
67 super(watcher);
68 this.sn = sn;
69 this.master = master;
70 }
71
72 @Override
73 public void nodeCreated(String path) {
74 handle(path);
75 }
76
77 @Override
78 public void nodeDeleted(String path) {
79
80
81
82
83
84
85
86
87 if(path.equals(watcher.clusterStateZNode) && !master.isStopped()) {
88 clusterShutDown.set(true);
89 }
90
91 handle(path);
92 }
93
94 void handle(final String path) {
95 if (path.equals(watcher.getMasterAddressZNode()) && !master.isStopped()) {
96 handleMasterNodeChange();
97 }
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 private void handleMasterNodeChange() {
114
115 try {
116 synchronized(clusterHasActiveMaster) {
117 if (ZKUtil.watchAndCheckExists(watcher, watcher.getMasterAddressZNode())) {
118
119 LOG.debug("A master is now available");
120 clusterHasActiveMaster.set(true);
121 } else {
122
123 LOG.debug("No master available. Notifying waiting threads");
124 clusterHasActiveMaster.set(false);
125
126 clusterHasActiveMaster.notifyAll();
127 }
128 }
129 } catch (KeeperException ke) {
130 master.abort("Received an unexpected KeeperException, aborting", ke);
131 }
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus) {
148 while (true) {
149 startupStatus.setStatus("Trying to register in ZK as active master");
150
151
152 try {
153 String backupZNode =
154 ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString());
155 if (MasterAddressTracker.setMasterAddress(this.watcher,
156 this.watcher.getMasterAddressZNode(), this.sn)) {
157
158
159
160 if (ZKUtil.checkExists(this.watcher, backupZNode) != -1) {
161 LOG.info("Deleting ZNode for " + backupZNode + " from backup master directory");
162 ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
163 }
164
165 ZNodeClearer.writeMyEphemeralNodeOnDisk(this.sn.toString());
166
167
168 startupStatus.setStatus("Successfully registered as active master.");
169 this.clusterHasActiveMaster.set(true);
170 LOG.info("Registered Active Master=" + this.sn);
171 return true;
172 }
173
174
175
176 this.clusterHasActiveMaster.set(true);
177
178
179
180
181
182
183
184
185
186 LOG.info("Adding ZNode for " + backupZNode + " in backup master directory");
187 MasterAddressTracker.setMasterAddress(this.watcher, backupZNode, this.sn);
188
189 String msg;
190 byte[] bytes =
191 ZKUtil.getDataAndWatch(this.watcher, this.watcher.getMasterAddressZNode());
192 if (bytes == null) {
193 msg = ("A master was detected, but went down before its address " +
194 "could be read. Attempting to become the next active master");
195 } else {
196 ServerName currentMaster;
197 try {
198 currentMaster = ServerName.parseFrom(bytes);
199 } catch (DeserializationException e) {
200 LOG.warn("Failed parse", e);
201
202 continue;
203 }
204 if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
205 msg = ("Current master has this master's address, " +
206 currentMaster + "; master was restarted? Deleting node.");
207
208 ZKUtil.deleteNode(this.watcher, this.watcher.getMasterAddressZNode());
209
210
211
212 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
213 } else {
214 msg = "Another master is the active master, " + currentMaster +
215 "; waiting to become the next active master";
216 }
217 }
218 LOG.info(msg);
219 startupStatus.setStatus(msg);
220 } catch (KeeperException ke) {
221 master.abort("Received an unexpected KeeperException, aborting", ke);
222 return false;
223 }
224 synchronized (this.clusterHasActiveMaster) {
225 while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
226 try {
227 this.clusterHasActiveMaster.wait();
228 } catch (InterruptedException e) {
229
230
231 LOG.debug("Interrupted waiting for master to die", e);
232 }
233 }
234 if (clusterShutDown.get()) {
235 this.master.stop(
236 "Cluster went down before this master became active");
237 }
238 if (this.master.isStopped()) {
239 return false;
240 }
241
242 }
243 }
244 }
245
246
247
248
249 public boolean isActiveMaster() {
250 try {
251 if (ZKUtil.checkExists(watcher, watcher.getMasterAddressZNode()) >= 0) {
252 return true;
253 }
254 }
255 catch (KeeperException ke) {
256 LOG.info("Received an unexpected KeeperException when checking " +
257 "isActiveMaster : "+ ke);
258 }
259 return false;
260 }
261
262 public void stop() {
263 try {
264
265 ServerName activeMaster = null;
266 try {
267 activeMaster = MasterAddressTracker.getMasterAddress(this.watcher);
268 } catch (IOException e) {
269 LOG.warn("Failed get of master address: " + e.toString());
270 }
271 if (activeMaster != null && activeMaster.equals(this.sn)) {
272 ZKUtil.deleteNode(watcher, watcher.getMasterAddressZNode());
273
274
275 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
276 }
277 } catch (KeeperException e) {
278 LOG.error(this.watcher.prefix("Error deleting our own master address node"), e);
279 }
280 }
281 }