1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Abortable;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
35 import org.apache.hadoop.hbase.util.Threads;
36 import org.apache.zookeeper.KeeperException;
37 import org.apache.zookeeper.WatchedEvent;
38 import org.apache.zookeeper.Watcher;
39 import org.apache.zookeeper.ZooDefs;
40 import org.apache.zookeeper.data.ACL;
41
42
43
44
45
46
47
48
49
50
51
52
53 @InterfaceAudience.Private
54 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
55 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
56
57
58
59 private String prefix;
60 private String identifier;
61
62
63 private String quorum;
64
65
66 private RecoverableZooKeeper recoverableZooKeeper;
67
68
69 protected Abortable abortable;
70
71 private boolean aborted = false;
72
73
74 private final List<ZooKeeperListener> listeners =
75 new CopyOnWriteArrayList<ZooKeeperListener>();
76
77
78
79 public CountDownLatch saslLatch = new CountDownLatch(1);
80
81
82
83
84 public String baseZNode;
85
86 public String metaServerZNode;
87
88 public String rsZNode;
89
90 public String drainingZNode;
91
92 private String masterAddressZNode;
93
94 public String backupMasterAddressesZNode;
95
96 public String clusterStateZNode;
97
98 public String assignmentZNode;
99
100 public String tableZNode;
101
102 public String clusterIdZNode;
103
104 public String splitLogZNode;
105
106 public String balancerZNode;
107
108 public String tableLockZNode;
109
110 public String recoveringRegionsZNode;
111
112 public static String namespaceZNode = "namespace";
113
114
115 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
116 new ArrayList<ACL>() { {
117 add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
118 add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
119 }};
120
121 private final Configuration conf;
122
123 private final Exception constructorCaller;
124
125
126
127
128
129
130
131
132 public ZooKeeperWatcher(Configuration conf, String identifier,
133 Abortable abortable) throws ZooKeeperConnectionException, IOException {
134 this(conf, identifier, abortable, false);
135 }
136
137
138
139
140
141
142
143
144
145
146
147
148 public ZooKeeperWatcher(Configuration conf, String identifier,
149 Abortable abortable, boolean canCreateBaseZNode)
150 throws IOException, ZooKeeperConnectionException {
151 this.conf = conf;
152
153
154 try {
155 throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
156 } catch (Exception e) {
157 this.constructorCaller = e;
158 }
159 this.quorum = ZKConfig.getZKQuorumServersString(conf);
160 this.prefix = identifier;
161
162
163 this.identifier = identifier + "0x0";
164 this.abortable = abortable;
165 setNodeNames(conf);
166 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
167 if (canCreateBaseZNode) {
168 createBaseZNodes();
169 }
170 }
171
172 private void createBaseZNodes() throws ZooKeeperConnectionException {
173 try {
174
175 ZKUtil.createWithParents(this, baseZNode);
176 if (conf.getBoolean("hbase.assignment.usezk", true)) {
177 ZKUtil.createAndFailSilent(this, assignmentZNode);
178 }
179 ZKUtil.createAndFailSilent(this, rsZNode);
180 ZKUtil.createAndFailSilent(this, drainingZNode);
181 ZKUtil.createAndFailSilent(this, tableZNode);
182 ZKUtil.createAndFailSilent(this, splitLogZNode);
183 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
184 ZKUtil.createAndFailSilent(this, tableLockZNode);
185 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
186 } catch (KeeperException e) {
187 throw new ZooKeeperConnectionException(
188 prefix("Unexpected KeeperException creating base node"), e);
189 }
190 }
191
192 @Override
193 public String toString() {
194 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
195 }
196
197
198
199
200
201
202
203 public String prefix(final String str) {
204 return this.toString() + " " + str;
205 }
206
207
208
209
210 private void setNodeNames(Configuration conf) {
211 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
212 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
213 metaServerZNode = ZKUtil.joinZNode(baseZNode,
214 conf.get("zookeeper.znode.metaserver", "meta-region-server"));
215 rsZNode = ZKUtil.joinZNode(baseZNode,
216 conf.get("zookeeper.znode.rs", "rs"));
217 drainingZNode = ZKUtil.joinZNode(baseZNode,
218 conf.get("zookeeper.znode.draining.rs", "draining"));
219 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
220 conf.get("zookeeper.znode.master", "master"));
221 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
222 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
223 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
224 conf.get("zookeeper.znode.state", "running"));
225 assignmentZNode = ZKUtil.joinZNode(baseZNode,
226 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
227 tableZNode = ZKUtil.joinZNode(baseZNode,
228 conf.get("zookeeper.znode.tableEnableDisable", "table"));
229 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
230 conf.get("zookeeper.znode.clusterId", "hbaseid"));
231 splitLogZNode = ZKUtil.joinZNode(baseZNode,
232 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
233 balancerZNode = ZKUtil.joinZNode(baseZNode,
234 conf.get("zookeeper.znode.balancer", "balancer"));
235 tableLockZNode = ZKUtil.joinZNode(baseZNode,
236 conf.get("zookeeper.znode.tableLock", "table-lock"));
237 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
238 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
239 namespaceZNode = ZKUtil.joinZNode(baseZNode,
240 conf.get("zookeeper.znode.namespace", "namespace"));
241 }
242
243
244
245
246
247 public void registerListener(ZooKeeperListener listener) {
248 listeners.add(listener);
249 }
250
251
252
253
254
255
256 public void registerListenerFirst(ZooKeeperListener listener) {
257 listeners.add(0, listener);
258 }
259
260 public void unregisterListener(ZooKeeperListener listener) {
261 listeners.remove(listener);
262 }
263
264
265
266
267 public void unregisterAllListeners() {
268 listeners.clear();
269 }
270
271
272
273
274 public List<ZooKeeperListener> getListeners() {
275 return new ArrayList<ZooKeeperListener>(listeners);
276 }
277
278
279
280
281 public int getNumberOfListeners() {
282 return listeners.size();
283 }
284
285
286
287
288
289 public RecoverableZooKeeper getRecoverableZooKeeper() {
290 return recoverableZooKeeper;
291 }
292
293 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
294 recoverableZooKeeper.reconnectAfterExpiration();
295 }
296
297
298
299
300
301 public String getQuorum() {
302 return quorum;
303 }
304
305
306
307
308 public String getBaseZNode() {
309 return baseZNode;
310 }
311
312
313
314
315
316
317
318 @Override
319 public void process(WatchedEvent event) {
320 LOG.debug(prefix("Received ZooKeeper Event, " +
321 "type=" + event.getType() + ", " +
322 "state=" + event.getState() + ", " +
323 "path=" + event.getPath()));
324
325 switch(event.getType()) {
326
327
328 case None: {
329 connectionEvent(event);
330 break;
331 }
332
333
334
335 case NodeCreated: {
336 for(ZooKeeperListener listener : listeners) {
337 listener.nodeCreated(event.getPath());
338 }
339 break;
340 }
341
342 case NodeDeleted: {
343 for(ZooKeeperListener listener : listeners) {
344 listener.nodeDeleted(event.getPath());
345 }
346 break;
347 }
348
349 case NodeDataChanged: {
350 for(ZooKeeperListener listener : listeners) {
351 listener.nodeDataChanged(event.getPath());
352 }
353 break;
354 }
355
356 case NodeChildrenChanged: {
357 for(ZooKeeperListener listener : listeners) {
358 listener.nodeChildrenChanged(event.getPath());
359 }
360 break;
361 }
362 }
363 }
364
365
366
367
368
369
370
371
372
373
374
375
376
377 private void connectionEvent(WatchedEvent event) {
378 switch(event.getState()) {
379 case SyncConnected:
380
381
382 long finished = System.currentTimeMillis() +
383 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
384 while (System.currentTimeMillis() < finished) {
385 Threads.sleep(1);
386 if (this.recoverableZooKeeper != null) break;
387 }
388 if (this.recoverableZooKeeper == null) {
389 LOG.error("ZK is null on connection event -- see stack trace " +
390 "for the stack trace when constructor was called on this zkw",
391 this.constructorCaller);
392 throw new NullPointerException("ZK is null");
393 }
394 this.identifier = this.prefix + "-0x" +
395 Long.toHexString(this.recoverableZooKeeper.getSessionId());
396
397 LOG.debug(this.identifier + " connected");
398 break;
399
400
401 case Disconnected:
402 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
403 break;
404
405 case Expired:
406 String msg = prefix(this.identifier + " received expired from " +
407 "ZooKeeper, aborting");
408
409
410 if (this.abortable != null) {
411 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
412 }
413 break;
414
415 case ConnectedReadOnly:
416 case SaslAuthenticated:
417 case AuthFailed:
418 break;
419
420 default:
421 throw new IllegalStateException("Received event is not valid: " + event.getState());
422 }
423 }
424
425
426
427
428
429
430
431
432
433
434
435
436
437 public void sync(String path) throws KeeperException {
438 this.recoverableZooKeeper.sync(path, null, null);
439 }
440
441
442
443
444
445
446
447
448
449
450
451 public void keeperException(KeeperException ke)
452 throws KeeperException {
453 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
454 throw ke;
455 }
456
457
458
459
460
461
462
463
464
465
466
467
468 public void interruptedException(InterruptedException ie) {
469 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
470
471 Thread.currentThread().interrupt();
472
473 }
474
475
476
477
478
479
480 public void close() {
481 try {
482 if (recoverableZooKeeper != null) {
483 recoverableZooKeeper.close();
484 }
485 } catch (InterruptedException e) {
486 Thread.currentThread().interrupt();
487 }
488 }
489
490 public Configuration getConfiguration() {
491 return conf;
492 }
493
494 @Override
495 public void abort(String why, Throwable e) {
496 if (this.abortable != null) this.abortable.abort(why, e);
497 else this.aborted = true;
498 }
499
500 @Override
501 public boolean isAborted() {
502 return this.abortable == null? this.aborted: this.abortable.isAborted();
503 }
504
505
506
507
508 public String getMasterAddressZNode() {
509 return this.masterAddressZNode;
510 }
511
512 }