1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.BufferedReader;
22 import java.io.IOException;
23 import java.io.InputStreamReader;
24 import java.io.PrintWriter;
25 import java.net.InetSocketAddress;
26 import java.net.Socket;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.HashMap;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Properties;
34
35 import javax.security.auth.login.AppConfigurationEntry;
36 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
37
38 import org.apache.hadoop.hbase.util.ByteStringer;
39 import org.apache.commons.lang.StringUtils;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.hbase.classification.InterfaceAudience;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.exceptions.DeserializationException;
47 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
48 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
49 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.Threads;
52 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
53 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
54 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
55 import org.apache.hadoop.security.SecurityUtil;
56 import org.apache.hadoop.security.authentication.util.KerberosUtil;
57 import org.apache.zookeeper.AsyncCallback;
58 import org.apache.zookeeper.CreateMode;
59 import org.apache.zookeeper.KeeperException;
60 import org.apache.zookeeper.KeeperException.NoNodeException;
61 import org.apache.zookeeper.Op;
62 import org.apache.zookeeper.Watcher;
63 import org.apache.zookeeper.ZooDefs.Ids;
64 import org.apache.zookeeper.ZooDefs.Perms;
65 import org.apache.zookeeper.ZooKeeper;
66 import org.apache.zookeeper.client.ZooKeeperSaslClient;
67 import org.apache.zookeeper.data.ACL;
68 import org.apache.zookeeper.data.Id;
69 import org.apache.zookeeper.data.Stat;
70 import org.apache.zookeeper.proto.CreateRequest;
71 import org.apache.zookeeper.proto.DeleteRequest;
72 import org.apache.zookeeper.proto.SetDataRequest;
73 import org.apache.zookeeper.server.ZooKeeperSaslServer;
74
75 import com.google.protobuf.InvalidProtocolBufferException;
76
77
78
79
80
81
82
83
84
85
86 @InterfaceAudience.Private
87 public class ZKUtil {
88 private static final Log LOG = LogFactory.getLog(ZKUtil.class);
89
90
91 public static final char ZNODE_PATH_SEPARATOR = '/';
92 private static int zkDumpConnectionTimeOut;
93
94
95
96
97
98
99
100
101
102
103
104
105 public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
106 throws IOException {
107 Properties properties = ZKConfig.makeZKProps(conf);
108 String ensemble = ZKConfig.getZKQuorumServersString(properties);
109 return connect(conf, ensemble, watcher);
110 }
111
112 public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
113 Watcher watcher)
114 throws IOException {
115 return connect(conf, ensemble, watcher, null);
116 }
117
118 public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
119 Watcher watcher, final String identifier)
120 throws IOException {
121 if(ensemble == null) {
122 throw new IOException("Unable to determine ZooKeeper ensemble");
123 }
124 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
125 HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
126 if (LOG.isTraceEnabled()) {
127 LOG.trace(identifier + " opening connection to ZooKeeper ensemble=" + ensemble);
128 }
129 int retry = conf.getInt("zookeeper.recovery.retry", 3);
130 int retryIntervalMillis =
131 conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
132 zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
133 1000);
134 return new RecoverableZooKeeper(ensemble, timeout, watcher,
135 retry, retryIntervalMillis, identifier);
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public static void loginServer(Configuration conf, String keytabFileKey,
153 String userNameKey, String hostname) throws IOException {
154 login(conf, keytabFileKey, userNameKey, hostname,
155 ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
156 JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME);
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 public static void loginClient(Configuration conf, String keytabFileKey,
174 String userNameKey, String hostname) throws IOException {
175 login(conf, keytabFileKey, userNameKey, hostname,
176 ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
177 JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME);
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196 private static void login(Configuration conf, String keytabFileKey,
197 String userNameKey, String hostname,
198 String loginContextProperty, String loginContextName)
199 throws IOException {
200 if (!isSecureZooKeeper(conf))
201 return;
202
203
204
205 if (System.getProperty("java.security.auth.login.config") != null)
206 return;
207
208
209 String keytabFilename = conf.get(keytabFileKey);
210 if (keytabFilename == null) {
211 LOG.warn("no keytab specified for: " + keytabFileKey);
212 return;
213 }
214
215 String principalConfig = conf.get(userNameKey, System.getProperty("user.name"));
216 String principalName = SecurityUtil.getServerPrincipal(principalConfig, hostname);
217
218
219
220
221 JaasConfiguration jaasConf = new JaasConfiguration(loginContextName,
222 principalName, keytabFilename);
223 javax.security.auth.login.Configuration.setConfiguration(jaasConf);
224 System.setProperty(loginContextProperty, loginContextName);
225 }
226
227
228
229
230 private static class JaasConfiguration extends javax.security.auth.login.Configuration {
231 private static final String SERVER_KEYTAB_KERBEROS_CONFIG_NAME =
232 "zookeeper-server-keytab-kerberos";
233 private static final String CLIENT_KEYTAB_KERBEROS_CONFIG_NAME =
234 "zookeeper-client-keytab-kerberos";
235
236 private static final Map<String, String> BASIC_JAAS_OPTIONS =
237 new HashMap<String,String>();
238 static {
239 String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
240 if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
241 BASIC_JAAS_OPTIONS.put("debug", "true");
242 }
243 }
244
245 private static final Map<String,String> KEYTAB_KERBEROS_OPTIONS =
246 new HashMap<String,String>();
247 static {
248 KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
249 KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
250 KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
251 KEYTAB_KERBEROS_OPTIONS.putAll(BASIC_JAAS_OPTIONS);
252 }
253
254 private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
255 new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
256 LoginModuleControlFlag.REQUIRED,
257 KEYTAB_KERBEROS_OPTIONS);
258
259 private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
260 new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN};
261
262 private javax.security.auth.login.Configuration baseConfig;
263 private final String loginContextName;
264 private final boolean useTicketCache;
265 private final String keytabFile;
266 private final String principal;
267
268 public JaasConfiguration(String loginContextName, String principal) {
269 this(loginContextName, principal, null, true);
270 }
271
272 public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
273 this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
274 }
275
276 private JaasConfiguration(String loginContextName, String principal,
277 String keytabFile, boolean useTicketCache) {
278 try {
279 this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
280 } catch (SecurityException e) {
281 this.baseConfig = null;
282 }
283 this.loginContextName = loginContextName;
284 this.useTicketCache = useTicketCache;
285 this.keytabFile = keytabFile;
286 this.principal = principal;
287 LOG.info("JaasConfiguration loginContextName=" + loginContextName +
288 " principal=" + principal + " useTicketCache=" + useTicketCache +
289 " keytabFile=" + keytabFile);
290 }
291
292 @Override
293 public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
294 if (loginContextName.equals(appName)) {
295 if (!useTicketCache) {
296 KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
297 KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
298 }
299 KEYTAB_KERBEROS_OPTIONS.put("principal", principal);
300 KEYTAB_KERBEROS_OPTIONS.put("useTicketCache", useTicketCache ? "true" : "false");
301 return KEYTAB_KERBEROS_CONF;
302 }
303 if (baseConfig != null) return baseConfig.getAppConfigurationEntry(appName);
304 return(null);
305 }
306 }
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322 public static String joinZNode(String prefix, String suffix) {
323 return prefix + ZNODE_PATH_SEPARATOR + suffix;
324 }
325
326
327
328
329
330
331 public static String getParent(String node) {
332 int idx = node.lastIndexOf(ZNODE_PATH_SEPARATOR);
333 return idx <= 0 ? null : node.substring(0, idx);
334 }
335
336
337
338
339
340
341 public static String getNodeName(String path) {
342 return path.substring(path.lastIndexOf("/")+1);
343 }
344
345
346
347
348
349
350
351 public static String getZooKeeperClusterKey(Configuration conf) {
352 return getZooKeeperClusterKey(conf, null);
353 }
354
355
356
357
358
359
360
361
362 public static String getZooKeeperClusterKey(Configuration conf, String name) {
363 String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
364 "[\\t\\n\\x0B\\f\\r]", ""));
365 StringBuilder builder = new StringBuilder(ensemble);
366 builder.append(":");
367 builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
368 builder.append(":");
369 builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
370 if (name != null && !name.isEmpty()) {
371 builder.append(",");
372 builder.append(name);
373 }
374 return builder.toString();
375 }
376
377
378
379
380
381
382
383
384 public static void applyClusterKeyToConf(Configuration conf, String key)
385 throws IOException{
386 String[] parts = transformClusterKey(key);
387 conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
388 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
389 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
390 }
391
392
393
394
395
396
397
398
399
400 public static String[] transformClusterKey(String key) throws IOException {
401 String[] parts = key.split(":");
402 if (parts.length != 3) {
403 throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
404 HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:"
405 + HConstants.ZOOKEEPER_ZNODE_PARENT);
406 }
407 return parts;
408 }
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424 public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
425 throws KeeperException {
426 try {
427 Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
428 boolean exists = s != null ? true : false;
429 if (exists) {
430 LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode));
431 } else {
432 LOG.debug(zkw.prefix("Set watcher on znode that does not yet exist, " + znode));
433 }
434 return exists;
435 } catch (KeeperException e) {
436 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
437 zkw.keeperException(e);
438 return false;
439 } catch (InterruptedException e) {
440 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
441 zkw.interruptedException(e);
442 return false;
443 }
444 }
445
446
447
448
449
450
451
452
453
454
455
456 public static boolean setWatchIfNodeExists(ZooKeeperWatcher zkw, String znode)
457 throws KeeperException {
458 try {
459 zkw.getRecoverableZooKeeper().getData(znode, true, null);
460 return true;
461 } catch (NoNodeException e) {
462 return false;
463 } catch (InterruptedException e) {
464 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
465 zkw.interruptedException(e);
466 return false;
467 }
468 }
469
470
471
472
473
474
475
476
477
478 public static int checkExists(ZooKeeperWatcher zkw, String znode)
479 throws KeeperException {
480 try {
481 Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
482 return s != null ? s.getVersion() : -1;
483 } catch (KeeperException e) {
484 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
485 zkw.keeperException(e);
486 return -1;
487 } catch (InterruptedException e) {
488 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
489 zkw.interruptedException(e);
490 return -1;
491 }
492 }
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514 public static List<String> listChildrenAndWatchForNewChildren(
515 ZooKeeperWatcher zkw, String znode)
516 throws KeeperException {
517 try {
518 List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
519 return children;
520 } catch(KeeperException.NoNodeException ke) {
521 LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
522 "because node does not exist (not an error)"));
523 return null;
524 } catch (KeeperException e) {
525 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e);
526 zkw.keeperException(e);
527 return null;
528 } catch (InterruptedException e) {
529 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e);
530 zkw.interruptedException(e);
531 return null;
532 }
533 }
534
535
536
537
538
539
540
541
542
543
544 public static List<String> listChildrenAndWatchThem(ZooKeeperWatcher zkw,
545 String znode) throws KeeperException {
546 List<String> children = listChildrenAndWatchForNewChildren(zkw, znode);
547 if (children == null) {
548 return null;
549 }
550 for (String child : children) {
551 watchAndCheckExists(zkw, joinZNode(znode, child));
552 }
553 return children;
554 }
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570 public static List<String> listChildrenNoWatch(ZooKeeperWatcher zkw, String znode)
571 throws KeeperException {
572 List<String> children = null;
573 try {
574
575 children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
576 } catch(KeeperException.NoNodeException nne) {
577 return null;
578 } catch(InterruptedException ie) {
579 zkw.interruptedException(ie);
580 }
581 return children;
582 }
583
584
585
586
587
588 @Deprecated
589 public static class NodeAndData {
590 private String node;
591 private byte [] data;
592 public NodeAndData(String node, byte [] data) {
593 this.node = node;
594 this.data = data;
595 }
596 public String getNode() {
597 return node;
598 }
599 public byte [] getData() {
600 return data;
601 }
602 @Override
603 public String toString() {
604 return node;
605 }
606 public boolean isEmpty() {
607 return (data.length == 0);
608 }
609 }
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627 public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
628 throws KeeperException {
629 try {
630 return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
631 } catch(KeeperException.NoNodeException ke) {
632 LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
633 "because node does not exist (not an error)"));
634 return false;
635 } catch (KeeperException e) {
636 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e);
637 zkw.keeperException(e);
638 return false;
639 } catch (InterruptedException e) {
640 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e);
641 zkw.interruptedException(e);
642 return false;
643 }
644 }
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659 public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
660 throws KeeperException {
661 try {
662 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null);
663 return stat == null ? 0 : stat.getNumChildren();
664 } catch(KeeperException e) {
665 LOG.warn(zkw.prefix("Unable to get children of node " + znode));
666 zkw.keeperException(e);
667 } catch(InterruptedException e) {
668 zkw.interruptedException(e);
669 }
670 return 0;
671 }
672
673
674
675
676
677
678
679
680
681
682 public static byte [] getData(ZooKeeperWatcher zkw, String znode)
683 throws KeeperException {
684 try {
685 byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null);
686 logRetrievedMsg(zkw, znode, data, false);
687 return data;
688 } catch (KeeperException.NoNodeException e) {
689 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " +
690 "because node does not exist (not an error)"));
691 return null;
692 } catch (KeeperException e) {
693 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
694 zkw.keeperException(e);
695 return null;
696 } catch (InterruptedException e) {
697 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
698 zkw.interruptedException(e);
699 return null;
700 }
701 }
702
703
704
705
706
707
708
709
710
711
712
713
714 public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
715 throws KeeperException {
716 return getDataInternal(zkw, znode, null, true);
717 }
718
719
720
721
722
723
724
725
726
727
728
729
730
731 public static byte[] getDataAndWatch(ZooKeeperWatcher zkw, String znode,
732 Stat stat) throws KeeperException {
733 return getDataInternal(zkw, znode, stat, true);
734 }
735
736 private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
737 boolean watcherSet)
738 throws KeeperException {
739 try {
740 byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, stat);
741 logRetrievedMsg(zkw, znode, data, watcherSet);
742 return data;
743 } catch (KeeperException.NoNodeException e) {
744
745
746 LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " +
747 "because node does not exist (not an error)"));
748 return null;
749 } catch (KeeperException e) {
750 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
751 zkw.keeperException(e);
752 return null;
753 } catch (InterruptedException e) {
754 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
755 zkw.interruptedException(e);
756 return null;
757 }
758 }
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775 public static byte [] getDataNoWatch(ZooKeeperWatcher zkw, String znode,
776 Stat stat)
777 throws KeeperException {
778 try {
779 byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat);
780 logRetrievedMsg(zkw, znode, data, false);
781 return data;
782 } catch (KeeperException.NoNodeException e) {
783 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " +
784 "because node does not exist (not necessarily an error)"));
785 return null;
786 } catch (KeeperException e) {
787 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
788 zkw.keeperException(e);
789 return null;
790 } catch (InterruptedException e) {
791 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
792 zkw.interruptedException(e);
793 return null;
794 }
795 }
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814 public static List<NodeAndData> getChildDataAndWatchForNewChildren(
815 ZooKeeperWatcher zkw, String baseNode) throws KeeperException {
816 List<String> nodes =
817 ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode);
818 List<NodeAndData> newNodes = new ArrayList<NodeAndData>();
819 if (nodes != null) {
820 for (String node : nodes) {
821 String nodePath = ZKUtil.joinZNode(baseNode, node);
822 byte[] data = ZKUtil.getDataAndWatch(zkw, nodePath);
823 newNodes.add(new NodeAndData(nodePath, data));
824 }
825 }
826 return newNodes;
827 }
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845 public static void updateExistingNodeData(ZooKeeperWatcher zkw, String znode,
846 byte [] data, int expectedVersion)
847 throws KeeperException {
848 try {
849 zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion);
850 } catch(InterruptedException ie) {
851 zkw.interruptedException(ie);
852 }
853 }
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879 public static boolean setData(ZooKeeperWatcher zkw, String znode,
880 byte [] data, int expectedVersion)
881 throws KeeperException, KeeperException.NoNodeException {
882 try {
883 return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null;
884 } catch (InterruptedException e) {
885 zkw.interruptedException(e);
886 return false;
887 }
888 }
889
890
891
892
893
894
895
896
897
898
899 public static void createSetData(final ZooKeeperWatcher zkw, final String znode,
900 final byte [] data)
901 throws KeeperException {
902 if (checkExists(zkw, znode) == -1) {
903 ZKUtil.createWithParents(zkw, znode, data);
904 } else {
905 ZKUtil.setData(zkw, znode, data);
906 }
907 }
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925 public static void setData(ZooKeeperWatcher zkw, String znode, byte [] data)
926 throws KeeperException, KeeperException.NoNodeException {
927 setData(zkw, (SetData)ZKUtilOp.setData(znode, data));
928 }
929
930 private static void setData(ZooKeeperWatcher zkw, SetData setData)
931 throws KeeperException, KeeperException.NoNodeException {
932 SetDataRequest sd = (SetDataRequest)toZooKeeperOp(zkw, setData).toRequestRecord();
933 setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
934 }
935
936
937
938
939
940
941 public static boolean isSecureZooKeeper(Configuration conf) {
942
943
944 try {
945 javax.security.auth.login.Configuration testConfig = javax.security.auth.login.Configuration.getConfiguration();
946 if(testConfig.getAppConfigurationEntry("Client") == null) {
947 return false;
948 }
949 } catch(Exception e) {
950
951 return false;
952 }
953
954
955 return("kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication")) &&
956 conf.get("hbase.zookeeper.client.keytab.file") != null);
957 }
958
959 private static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node) {
960 if (!node.startsWith(zkw.baseZNode)) {
961 return Ids.OPEN_ACL_UNSAFE;
962 }
963 if (isSecureZooKeeper(zkw.getConfiguration())) {
964 String superUser = zkw.getConfiguration().get("hbase.superuser");
965 ArrayList<ACL> acls = new ArrayList<ACL>();
966
967 if (superUser != null) {
968 acls.add(new ACL(Perms.ALL, new Id("auth", superUser)));
969 }
970
971
972 if ((node.equals(zkw.baseZNode) == true) ||
973 (node.equals(zkw.metaServerZNode) == true) ||
974 (node.equals(zkw.getMasterAddressZNode()) == true) ||
975 (node.equals(zkw.clusterIdZNode) == true) ||
976 (node.equals(zkw.rsZNode) == true) ||
977 (node.equals(zkw.backupMasterAddressesZNode) == true) ||
978 (node.startsWith(zkw.assignmentZNode) == true) ||
979 (node.startsWith(zkw.tableZNode) == true)) {
980 acls.addAll(Ids.CREATOR_ALL_ACL);
981 acls.addAll(Ids.READ_ACL_UNSAFE);
982 } else {
983 acls.addAll(Ids.CREATOR_ALL_ACL);
984 }
985 return acls;
986 } else {
987 return Ids.OPEN_ACL_UNSAFE;
988 }
989 }
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013 public static boolean createEphemeralNodeAndWatch(ZooKeeperWatcher zkw,
1014 String znode, byte [] data)
1015 throws KeeperException {
1016 boolean ret = true;
1017 try {
1018 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1019 CreateMode.EPHEMERAL);
1020 } catch (KeeperException.NodeExistsException nee) {
1021 ret = false;
1022 } catch (InterruptedException e) {
1023 LOG.info("Interrupted", e);
1024 Thread.currentThread().interrupt();
1025 }
1026 if(!watchAndCheckExists(zkw, znode)) {
1027
1028 return createEphemeralNodeAndWatch(zkw, znode, data);
1029 }
1030 return ret;
1031 }
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053 public static boolean createNodeIfNotExistsAndWatch(
1054 ZooKeeperWatcher zkw, String znode, byte [] data)
1055 throws KeeperException {
1056 boolean ret = true;
1057 try {
1058 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1059 CreateMode.PERSISTENT);
1060 } catch (KeeperException.NodeExistsException nee) {
1061 ret = false;
1062 } catch (InterruptedException e) {
1063 zkw.interruptedException(e);
1064 return false;
1065 }
1066 try {
1067 zkw.getRecoverableZooKeeper().exists(znode, zkw);
1068 } catch (InterruptedException e) {
1069 zkw.interruptedException(e);
1070 return false;
1071 }
1072 return ret;
1073 }
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089 public static String createNodeIfNotExistsNoWatch(ZooKeeperWatcher zkw, String znode,
1090 byte[] data, CreateMode createMode) throws KeeperException {
1091
1092 String createdZNode = null;
1093 try {
1094 createdZNode = zkw.getRecoverableZooKeeper().create(znode, data,
1095 createACL(zkw, znode), createMode);
1096 } catch (KeeperException.NodeExistsException nee) {
1097 return znode;
1098 } catch (InterruptedException e) {
1099 zkw.interruptedException(e);
1100 return null;
1101 }
1102 return createdZNode;
1103 }
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121 public static int createAndWatch(ZooKeeperWatcher zkw,
1122 String znode, byte [] data)
1123 throws KeeperException, KeeperException.NodeExistsException {
1124 try {
1125 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1126 CreateMode.PERSISTENT);
1127 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw);
1128 if (stat == null){
1129
1130 throw KeeperException.create(KeeperException.Code.SYSTEMERROR,
1131 "ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode);
1132 }
1133 return stat.getVersion();
1134 } catch (InterruptedException e) {
1135 zkw.interruptedException(e);
1136 return -1;
1137 }
1138 }
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155 public static void asyncCreate(ZooKeeperWatcher zkw,
1156 String znode, byte [] data, final AsyncCallback.StringCallback cb,
1157 final Object ctx) {
1158 zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data,
1159 createACL(zkw, znode), CreateMode.PERSISTENT, cb, ctx);
1160 }
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172 public static void createAndFailSilent(ZooKeeperWatcher zkw,
1173 String znode) throws KeeperException {
1174 createAndFailSilent(zkw, znode, new byte[0]);
1175 }
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188 public static void createAndFailSilent(ZooKeeperWatcher zkw,
1189 String znode, byte[] data)
1190 throws KeeperException {
1191 createAndFailSilent(zkw,
1192 (CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, data));
1193 }
1194
1195 private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
1196 throws KeeperException {
1197 CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
1198 String znode = create.getPath();
1199 try {
1200 RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
1201 if (zk.exists(znode, false) == null) {
1202 zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
1203 }
1204 } catch(KeeperException.NodeExistsException nee) {
1205 } catch(KeeperException.NoAuthException nee){
1206 try {
1207 if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
1208
1209 throw(nee);
1210 }
1211 } catch (InterruptedException ie) {
1212 zkw.interruptedException(ie);
1213 }
1214
1215 } catch(InterruptedException ie) {
1216 zkw.interruptedException(ie);
1217 }
1218 }
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231 public static void createWithParents(ZooKeeperWatcher zkw, String znode)
1232 throws KeeperException {
1233 createWithParents(zkw, znode, new byte[0]);
1234 }
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249 public static void createWithParents(ZooKeeperWatcher zkw, String znode, byte[] data)
1250 throws KeeperException {
1251 try {
1252 if(znode == null) {
1253 return;
1254 }
1255 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1256 CreateMode.PERSISTENT);
1257 } catch(KeeperException.NodeExistsException nee) {
1258 return;
1259 } catch(KeeperException.NoNodeException nne) {
1260 createWithParents(zkw, getParent(znode));
1261 createWithParents(zkw, znode, data);
1262 } catch(InterruptedException ie) {
1263 zkw.interruptedException(ie);
1264 }
1265 }
1266
1267
1268
1269
1270
1271
1272
1273
1274 public static void deleteNode(ZooKeeperWatcher zkw, String node)
1275 throws KeeperException {
1276 deleteNode(zkw, node, -1);
1277 }
1278
1279
1280
1281
1282
1283 public static boolean deleteNode(ZooKeeperWatcher zkw, String node,
1284 int version)
1285 throws KeeperException {
1286 try {
1287 zkw.getRecoverableZooKeeper().delete(node, version);
1288 return true;
1289 } catch(KeeperException.BadVersionException bve) {
1290 return false;
1291 } catch(InterruptedException ie) {
1292 zkw.interruptedException(ie);
1293 return false;
1294 }
1295 }
1296
1297
1298
1299
1300
1301
1302
1303 public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
1304 throws KeeperException {
1305 deleteNodeFailSilent(zkw,
1306 (DeleteNodeFailSilent)ZKUtilOp.deleteNodeFailSilent(node));
1307 }
1308
1309 private static void deleteNodeFailSilent(ZooKeeperWatcher zkw,
1310 DeleteNodeFailSilent dnfs) throws KeeperException {
1311 DeleteRequest delete = (DeleteRequest)toZooKeeperOp(zkw, dnfs).toRequestRecord();
1312 try {
1313 zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion());
1314 } catch(KeeperException.NoNodeException nne) {
1315 } catch(InterruptedException ie) {
1316 zkw.interruptedException(ie);
1317 }
1318 }
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329 public static void deleteNodeRecursively(ZooKeeperWatcher zkw, String node)
1330 throws KeeperException {
1331 try {
1332 List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
1333
1334 if (children == null) return;
1335
1336 if(!children.isEmpty()) {
1337 for(String child : children) {
1338 deleteNodeRecursively(zkw, joinZNode(node, child));
1339 }
1340 }
1341
1342
1343 if (zkw.getRecoverableZooKeeper().exists(node, zkw) != null){
1344 zkw.getRecoverableZooKeeper().delete(node, -1);
1345 }
1346 } catch(InterruptedException ie) {
1347 zkw.interruptedException(ie);
1348 }
1349 }
1350
1351
1352
1353
1354
1355
1356
1357 public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
1358 throws KeeperException {
1359 List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
1360 if (children == null || children.isEmpty()) return;
1361 for(String child : children) {
1362 deleteNodeRecursively(zkw, joinZNode(node, child));
1363 }
1364 }
1365
1366
1367
1368
1369
1370
1371 public abstract static class ZKUtilOp {
1372 private String path;
1373
1374 private ZKUtilOp(String path) {
1375 this.path = path;
1376 }
1377
1378
1379
1380
1381 public static ZKUtilOp createAndFailSilent(String path, byte[] data) {
1382 return new CreateAndFailSilent(path, data);
1383 }
1384
1385
1386
1387
1388 public static ZKUtilOp deleteNodeFailSilent(String path) {
1389 return new DeleteNodeFailSilent(path);
1390 }
1391
1392
1393
1394
1395 public static ZKUtilOp setData(String path, byte [] data) {
1396 return new SetData(path, data);
1397 }
1398
1399
1400
1401
1402 public String getPath() {
1403 return path;
1404 }
1405
1406
1407
1408
1409
1410 public static class CreateAndFailSilent extends ZKUtilOp {
1411 private byte [] data;
1412
1413 private CreateAndFailSilent(String path, byte [] data) {
1414 super(path);
1415 this.data = data;
1416 }
1417
1418 public byte[] getData() {
1419 return data;
1420 }
1421
1422 @Override
1423 public boolean equals(Object o) {
1424 if (this == o) return true;
1425 if (!(o instanceof CreateAndFailSilent)) return false;
1426
1427 CreateAndFailSilent op = (CreateAndFailSilent) o;
1428 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
1429 }
1430
1431 @Override
1432 public int hashCode() {
1433 int ret = 17 + getPath().hashCode() * 31;
1434 return ret * 31 + Bytes.hashCode(data);
1435 }
1436 }
1437
1438
1439
1440
1441
1442 public static class DeleteNodeFailSilent extends ZKUtilOp {
1443 private DeleteNodeFailSilent(String path) {
1444 super(path);
1445 }
1446
1447 @Override
1448 public boolean equals(Object o) {
1449 if (this == o) return true;
1450 if (!(o instanceof DeleteNodeFailSilent)) return false;
1451
1452 return super.equals(o);
1453 }
1454
1455 @Override
1456 public int hashCode() {
1457 return getPath().hashCode();
1458 }
1459 }
1460
1461
1462
1463
1464 public static class SetData extends ZKUtilOp {
1465 private byte [] data;
1466
1467 private SetData(String path, byte [] data) {
1468 super(path);
1469 this.data = data;
1470 }
1471
1472 public byte[] getData() {
1473 return data;
1474 }
1475
1476 @Override
1477 public boolean equals(Object o) {
1478 if (this == o) return true;
1479 if (!(o instanceof SetData)) return false;
1480
1481 SetData op = (SetData) o;
1482 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
1483 }
1484
1485 @Override
1486 public int hashCode() {
1487 int ret = getPath().hashCode();
1488 return ret * 31 + Bytes.hashCode(data);
1489 }
1490 }
1491 }
1492
1493
1494
1495
1496 private static Op toZooKeeperOp(ZooKeeperWatcher zkw, ZKUtilOp op)
1497 throws UnsupportedOperationException {
1498 if(op == null) return null;
1499
1500 if (op instanceof CreateAndFailSilent) {
1501 CreateAndFailSilent cafs = (CreateAndFailSilent)op;
1502 return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
1503 CreateMode.PERSISTENT);
1504 } else if (op instanceof DeleteNodeFailSilent) {
1505 DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
1506 return Op.delete(dnfs.getPath(), -1);
1507 } else if (op instanceof SetData) {
1508 SetData sd = (SetData)op;
1509 return Op.setData(sd.getPath(), sd.getData(), -1);
1510 } else {
1511 throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
1512 + op.getClass().getName());
1513 }
1514 }
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537 public static void multiOrSequential(ZooKeeperWatcher zkw, List<ZKUtilOp> ops,
1538 boolean runSequentialOnMultiFailure) throws KeeperException {
1539 if (ops == null) return;
1540 boolean useMulti = zkw.getConfiguration().getBoolean(HConstants.ZOOKEEPER_USEMULTI, false);
1541
1542 if (useMulti) {
1543 List<Op> zkOps = new LinkedList<Op>();
1544 for (ZKUtilOp op : ops) {
1545 zkOps.add(toZooKeeperOp(zkw, op));
1546 }
1547 try {
1548 zkw.getRecoverableZooKeeper().multi(zkOps);
1549 } catch (KeeperException ke) {
1550 switch (ke.code()) {
1551 case NODEEXISTS:
1552 case NONODE:
1553 case BADVERSION:
1554 case NOAUTH:
1555
1556
1557 if (runSequentialOnMultiFailure) {
1558 LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
1559 + " Attempting to run operations sequentially because"
1560 + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
1561 processSequentially(zkw, ops);
1562 break;
1563 }
1564 default:
1565 throw ke;
1566 }
1567 } catch (InterruptedException ie) {
1568 zkw.interruptedException(ie);
1569 }
1570 } else {
1571
1572 processSequentially(zkw, ops);
1573 }
1574
1575 }
1576
1577 private static void processSequentially(ZooKeeperWatcher zkw, List<ZKUtilOp> ops)
1578 throws KeeperException, NoNodeException {
1579 for (ZKUtilOp op : ops) {
1580 if (op instanceof CreateAndFailSilent) {
1581 createAndFailSilent(zkw, (CreateAndFailSilent) op);
1582 } else if (op instanceof DeleteNodeFailSilent) {
1583 deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) op);
1584 } else if (op instanceof SetData) {
1585 setData(zkw, (SetData) op);
1586 } else {
1587 throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
1588 + op.getClass().getName());
1589 }
1590 }
1591 }
1592
1593
1594
1595
1596
1597
1598 public static String dump(ZooKeeperWatcher zkw) {
1599 StringBuilder sb = new StringBuilder();
1600 try {
1601 sb.append("HBase is rooted at ").append(zkw.baseZNode);
1602 sb.append("\nActive master address: ");
1603 try {
1604 sb.append(MasterAddressTracker.getMasterAddress(zkw));
1605 } catch (IOException e) {
1606 sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>");
1607 }
1608 sb.append("\nBackup master addresses:");
1609 for (String child : listChildrenNoWatch(zkw,
1610 zkw.backupMasterAddressesZNode)) {
1611 sb.append("\n ").append(child);
1612 }
1613 sb.append("\nRegion server holding hbase:meta: " + MetaRegionTracker.getMetaRegionLocation(zkw));
1614 sb.append("\nRegion servers:");
1615 for (String child : listChildrenNoWatch(zkw, zkw.rsZNode)) {
1616 sb.append("\n ").append(child);
1617 }
1618 try {
1619 getReplicationZnodesDump(zkw, sb);
1620 } catch (KeeperException ke) {
1621 LOG.warn("Couldn't get the replication znode dump", ke);
1622 }
1623 sb.append("\nQuorum Server Statistics:");
1624 String[] servers = zkw.getQuorum().split(",");
1625 for (String server : servers) {
1626 sb.append("\n ").append(server);
1627 try {
1628 String[] stat = getServerStats(server, ZKUtil.zkDumpConnectionTimeOut);
1629
1630 if (stat == null) {
1631 sb.append("[Error] invalid quorum server: " + server);
1632 break;
1633 }
1634
1635 for (String s : stat) {
1636 sb.append("\n ").append(s);
1637 }
1638 } catch (Exception e) {
1639 sb.append("\n ERROR: ").append(e.getMessage());
1640 }
1641 }
1642 } catch (KeeperException ke) {
1643 sb.append("\nFATAL ZooKeeper Exception!\n");
1644 sb.append("\n" + ke.getMessage());
1645 }
1646 return sb.toString();
1647 }
1648
1649
1650
1651
1652
1653
1654
1655 private static void getReplicationZnodesDump(ZooKeeperWatcher zkw, StringBuilder sb)
1656 throws KeeperException {
1657 String replicationZNodeName = zkw.getConfiguration().get("zookeeper.znode.replication",
1658 "replication");
1659 String replicationZnode = joinZNode(zkw.baseZNode, replicationZNodeName);
1660 if (ZKUtil.checkExists(zkw, replicationZnode) == -1) return;
1661
1662 sb.append("\n").append(replicationZnode).append(": ");
1663 List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
1664 for (String child : children) {
1665 String znode = joinZNode(replicationZnode, child);
1666 if (child.equals(zkw.getConfiguration().get("zookeeper.znode.replication.peers", "peers"))) {
1667 appendPeersZnodes(zkw, znode, sb);
1668 } else if (child.equals(zkw.getConfiguration().
1669 get("zookeeper.znode.replication.rs", "rs"))) {
1670 appendRSZnodes(zkw, znode, sb);
1671 }
1672 }
1673 }
1674
1675 private static void appendRSZnodes(ZooKeeperWatcher zkw, String znode, StringBuilder sb)
1676 throws KeeperException {
1677 List<String> stack = new LinkedList<String>();
1678 stack.add(znode);
1679 do {
1680 String znodeToProcess = stack.remove(stack.size() - 1);
1681 sb.append("\n").append(znodeToProcess).append(": ");
1682 byte[] data = ZKUtil.getData(zkw, znodeToProcess);
1683 if (data != null && data.length > 0) {
1684 long position = 0;
1685 try {
1686 position = ZKUtil.parseHLogPositionFrom(ZKUtil.getData(zkw, znodeToProcess));
1687 sb.append(position);
1688 } catch (Exception e) {
1689 }
1690 }
1691 for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
1692 stack.add(ZKUtil.joinZNode(znodeToProcess, zNodeChild));
1693 }
1694 } while (stack.size() > 0);
1695 }
1696
1697 private static void appendPeersZnodes(ZooKeeperWatcher zkw, String peersZnode,
1698 StringBuilder sb) throws KeeperException {
1699 int pblen = ProtobufUtil.lengthOfPBMagic();
1700 sb.append("\n").append(peersZnode).append(": ");
1701 for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, peersZnode)) {
1702 String znodeToProcess = ZKUtil.joinZNode(peersZnode, peerIdZnode);
1703 byte[] data = ZKUtil.getData(zkw, znodeToProcess);
1704
1705 try {
1706 String clusterKey = ZooKeeperProtos.ReplicationPeer.newBuilder().
1707 mergeFrom(data, pblen, data.length - pblen).getClusterkey();
1708 sb.append("\n").append(znodeToProcess).append(": ").append(clusterKey);
1709
1710 appendPeerState(zkw, znodeToProcess, sb);
1711 } catch (InvalidProtocolBufferException ipbe) {
1712 LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe);
1713 }
1714 }
1715 }
1716
1717 private static void appendPeerState(ZooKeeperWatcher zkw, String znodeToProcess,
1718 StringBuilder sb) throws KeeperException, InvalidProtocolBufferException {
1719 String peerState = zkw.getConfiguration().get("zookeeper.znode.replication.peers.state",
1720 "peer-state");
1721 int pblen = ProtobufUtil.lengthOfPBMagic();
1722 for (String child : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
1723 if (!child.equals(peerState)) continue;
1724 String peerStateZnode = ZKUtil.joinZNode(znodeToProcess, child);
1725 sb.append("\n").append(peerStateZnode).append(": ");
1726 byte[] peerStateData = ZKUtil.getData(zkw, peerStateZnode);
1727 sb.append(ZooKeeperProtos.ReplicationState.newBuilder()
1728 .mergeFrom(peerStateData, pblen, peerStateData.length - pblen).getState().name());
1729 }
1730 }
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740 public static String[] getServerStats(String server, int timeout)
1741 throws IOException {
1742 String[] sp = server.split(":");
1743 if (sp == null || sp.length == 0) {
1744 return null;
1745 }
1746
1747 String host = sp[0];
1748 int port = sp.length > 1 ? Integer.parseInt(sp[1])
1749 : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
1750
1751 Socket socket = new Socket();
1752 InetSocketAddress sockAddr = new InetSocketAddress(host, port);
1753 socket.connect(sockAddr, timeout);
1754
1755 socket.setSoTimeout(timeout);
1756 PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
1757 BufferedReader in = new BufferedReader(new InputStreamReader(
1758 socket.getInputStream()));
1759 out.println("stat");
1760 out.flush();
1761 ArrayList<String> res = new ArrayList<String>();
1762 while (true) {
1763 String line = in.readLine();
1764 if (line != null) {
1765 res.add(line);
1766 } else {
1767 break;
1768 }
1769 }
1770 socket.close();
1771 return res.toArray(new String[res.size()]);
1772 }
1773
1774 private static void logRetrievedMsg(final ZooKeeperWatcher zkw,
1775 final String znode, final byte [] data, final boolean watcherSet) {
1776 if (!LOG.isTraceEnabled()) return;
1777 LOG.trace(zkw.prefix("Retrieved " + ((data == null)? 0: data.length) +
1778 " byte(s) of data from znode " + znode +
1779 (watcherSet? " and set watcher; ": "; data=") +
1780 (data == null? "null": data.length == 0? "empty": (
1781 znode.startsWith(zkw.assignmentZNode)?
1782 ZKAssign.toString(data):
1783 znode.startsWith(zkw.metaServerZNode)?
1784 getServerNameOrEmptyString(data):
1785 znode.startsWith(zkw.backupMasterAddressesZNode)?
1786 getServerNameOrEmptyString(data):
1787 StringUtils.abbreviate(Bytes.toStringBinary(data), 32)))));
1788 }
1789
1790 private static String getServerNameOrEmptyString(final byte [] data) {
1791 try {
1792 return ServerName.parseFrom(data).toString();
1793 } catch (DeserializationException e) {
1794 return "";
1795 }
1796 }
1797
1798
1799
1800
1801
1802 public static void waitForBaseZNode(Configuration conf) throws IOException {
1803 LOG.info("Waiting until the base znode is available");
1804 String parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
1805 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1806 ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf),
1807 conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1808 HConstants.DEFAULT_ZK_SESSION_TIMEOUT), EmptyWatcher.instance);
1809
1810 final int maxTimeMs = 10000;
1811 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
1812
1813 KeeperException keeperEx = null;
1814 try {
1815 try {
1816 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
1817 try {
1818 if (zk.exists(parentZNode, false) != null) {
1819 LOG.info("Parent znode exists: " + parentZNode);
1820 keeperEx = null;
1821 break;
1822 }
1823 } catch (KeeperException e) {
1824 keeperEx = e;
1825 }
1826 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
1827 }
1828 } finally {
1829 zk.close();
1830 }
1831 } catch (InterruptedException ex) {
1832 Thread.currentThread().interrupt();
1833 }
1834
1835 if (keeperEx != null) {
1836 throw new IOException(keeperEx);
1837 }
1838 }
1839
1840
1841 public static byte[] blockUntilAvailable(
1842 final ZooKeeperWatcher zkw, final String znode, final long timeout)
1843 throws InterruptedException {
1844 if (timeout < 0) throw new IllegalArgumentException();
1845 if (zkw == null) throw new IllegalArgumentException();
1846 if (znode == null) throw new IllegalArgumentException();
1847
1848 byte[] data = null;
1849 boolean finished = false;
1850 final long endTime = System.currentTimeMillis() + timeout;
1851 while (!finished) {
1852 try {
1853 data = ZKUtil.getData(zkw, znode);
1854 } catch(KeeperException e) {
1855 if (e instanceof KeeperException.SessionExpiredException
1856 || e instanceof KeeperException.AuthFailedException) {
1857
1858 throw new InterruptedException("interrupted due to " + e);
1859 }
1860 LOG.warn("Unexpected exception handling blockUntilAvailable", e);
1861 }
1862
1863 if (data == null && (System.currentTimeMillis() +
1864 HConstants.SOCKET_RETRY_WAIT_MS < endTime)) {
1865 Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
1866 } else {
1867 finished = true;
1868 }
1869 }
1870
1871 return data;
1872 }
1873
1874
1875
1876
1877
1878
1879
1880
1881 public static KeeperException convert(final DeserializationException e) {
1882 KeeperException ke = new KeeperException.DataInconsistencyException();
1883 ke.initCause(e);
1884 return ke;
1885 }
1886
1887
1888
1889
1890
1891
1892 public static void logZKTree(ZooKeeperWatcher zkw, String root) {
1893 if (!LOG.isDebugEnabled()) return;
1894 LOG.debug("Current zk system:");
1895 String prefix = "|-";
1896 LOG.debug(prefix + root);
1897 try {
1898 logZKTree(zkw, root, prefix);
1899 } catch (KeeperException e) {
1900 throw new RuntimeException(e);
1901 }
1902 }
1903
1904
1905
1906
1907
1908
1909 protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) throws KeeperException {
1910 List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
1911 if (children == null) return;
1912 for (String child : children) {
1913 LOG.debug(prefix + child);
1914 String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
1915 logZKTree(zkw, node, prefix + "---");
1916 }
1917 }
1918
1919
1920
1921
1922
1923
1924 public static byte[] positionToByteArray(final long position) {
1925 byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
1926 .build().toByteArray();
1927 return ProtobufUtil.prependPBMagic(bytes);
1928 }
1929
1930
1931
1932
1933
1934
1935 public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
1936 if (bytes == null) {
1937 throw new DeserializationException("Unable to parse null HLog position.");
1938 }
1939 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
1940 int pblen = ProtobufUtil.lengthOfPBMagic();
1941 ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
1942 ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
1943 ZooKeeperProtos.ReplicationHLogPosition position;
1944 try {
1945 position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
1946 } catch (InvalidProtocolBufferException e) {
1947 throw new DeserializationException(e);
1948 }
1949 return position.getPosition();
1950 } else {
1951 if (bytes.length > 0) {
1952 return Bytes.toLong(bytes);
1953 }
1954 return 0;
1955 }
1956 }
1957
1958
1959
1960
1961
1962
1963
1964
1965 public static byte[] regionSequenceIdsToByteArray(final Long regionLastFlushedSequenceId,
1966 final Map<byte[], Long> storeSequenceIds) {
1967 ZooKeeperProtos.RegionStoreSequenceIds.Builder regionSequenceIdsBuilder =
1968 ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();
1969 ZooKeeperProtos.StoreSequenceId.Builder storeSequenceIdBuilder =
1970 ZooKeeperProtos.StoreSequenceId.newBuilder();
1971 if (storeSequenceIds != null) {
1972 for (Map.Entry<byte[], Long> e : storeSequenceIds.entrySet()){
1973 byte[] columnFamilyName = e.getKey();
1974 Long curSeqId = e.getValue();
1975 storeSequenceIdBuilder.setFamilyName(ByteStringer.wrap(columnFamilyName));
1976 storeSequenceIdBuilder.setSequenceId(curSeqId);
1977 regionSequenceIdsBuilder.addStoreSequenceId(storeSequenceIdBuilder.build());
1978 storeSequenceIdBuilder.clear();
1979 }
1980 }
1981 regionSequenceIdsBuilder.setLastFlushedSequenceId(regionLastFlushedSequenceId);
1982 byte[] result = regionSequenceIdsBuilder.build().toByteArray();
1983 return ProtobufUtil.prependPBMagic(result);
1984 }
1985
1986
1987
1988
1989
1990
1991 public static RegionStoreSequenceIds parseRegionStoreSequenceIds(final byte[] bytes)
1992 throws DeserializationException {
1993 if (bytes == null || !ProtobufUtil.isPBMagicPrefix(bytes)) {
1994 throw new DeserializationException("Unable to parse RegionStoreSequenceIds.");
1995 }
1996 RegionStoreSequenceIds.Builder regionSequenceIdsBuilder =
1997 ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();
1998 int pblen = ProtobufUtil.lengthOfPBMagic();
1999 RegionStoreSequenceIds storeIds = null;
2000 try {
2001 storeIds = regionSequenceIdsBuilder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
2002 } catch (InvalidProtocolBufferException e) {
2003 throw new DeserializationException(e);
2004 }
2005 return storeIds;
2006 }
2007 }