1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.SortedMap;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.TreeSet;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Abortable;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.exceptions.DeserializationException;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.zookeeper.KeeperException;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
66
67
68 private String myQueuesZnode;
69
70 private final static String RS_LOCK_ZNODE = "lock";
71
72 private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
73
/**
 * Creates a ZooKeeper-backed replication queue store.
 * All three arguments are handed to the {@link ReplicationStateZKBase} constructor unchanged.
 *
 * @param zk ZooKeeper watcher used for every znode operation in this class
 * @param conf configuration, consulted later to decide whether ZooKeeper multi is used
 * @param abortable invoked when an unrecoverable ZooKeeper error is encountered
 */
public ReplicationQueuesZKImpl(
    final ZooKeeperWatcher zk,
    Configuration conf,
    Abortable abortable) {
  super(zk, conf, abortable);
}
78
79 @Override
80 public void init(String serverName) throws ReplicationException {
81 this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
82 try {
83 ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
84 } catch (KeeperException e) {
85 throw new ReplicationException("Could not initialize replication queues.", e);
86 }
87 }
88
89 @Override
90 public void removeQueue(String queueId) {
91 try {
92 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
93 } catch (KeeperException e) {
94 this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
95 }
96 }
97
98 @Override
99 public void addLog(String queueId, String filename) throws ReplicationException {
100 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
101 znode = ZKUtil.joinZNode(znode, filename);
102 try {
103 ZKUtil.createWithParents(this.zookeeper, znode);
104 } catch (KeeperException e) {
105 throw new ReplicationException(
106 "Could not add log because znode could not be created. queueId=" + queueId
107 + ", filename=" + filename);
108 }
109 }
110
111 @Override
112 public void removeLog(String queueId, String filename) {
113 try {
114 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
115 znode = ZKUtil.joinZNode(znode, filename);
116 ZKUtil.deleteNode(this.zookeeper, znode);
117 } catch (KeeperException e) {
118 this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename="
119 + filename + ")", e);
120 }
121 }
122
123 @Override
124 public void setLogPosition(String queueId, String filename, long position) {
125 try {
126 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
127 znode = ZKUtil.joinZNode(znode, filename);
128
129 ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
130 } catch (KeeperException e) {
131 this.abortable.abort("Failed to write replication hlog position (filename=" + filename
132 + ", position=" + position + ")", e);
133 }
134 }
135
136 @Override
137 public long getLogPosition(String queueId, String filename) throws ReplicationException {
138 String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
139 String znode = ZKUtil.joinZNode(clusterZnode, filename);
140 byte[] bytes = null;
141 try {
142 bytes = ZKUtil.getData(this.zookeeper, znode);
143 } catch (KeeperException e) {
144 throw new ReplicationException("Internal Error: could not get position in log for queueId="
145 + queueId + ", filename=" + filename, e);
146 }
147 try {
148 return ZKUtil.parseHLogPositionFrom(bytes);
149 } catch (DeserializationException de) {
150 LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
151 + "znode content, continuing.");
152 }
153
154
155 return 0;
156 }
157
158 @Override
159 public boolean isThisOurZnode(String znode) {
160 return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
161 }
162
163 @Override
164 public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
165 SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
166
167 if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
168 LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue");
169 newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
170 } else {
171 LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue");
172 if (!lockOtherRS(regionserverZnode)) {
173 return newQueues;
174 }
175 newQueues = copyQueuesFromRS(regionserverZnode);
176 deleteAnotherRSQueues(regionserverZnode);
177 }
178 return newQueues;
179 }
180
181 @Override
182 public void removeAllQueues() {
183 try {
184 ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
185 } catch (KeeperException e) {
186
187 if (e instanceof KeeperException.SessionExpiredException) {
188 return;
189 }
190 this.abortable.abort("Failed to delete replication queues for region server: "
191 + this.myQueuesZnode, e);
192 }
193 }
194
195 @Override
196 public List<String> getLogsInQueue(String queueId) {
197 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
198 List<String> result = null;
199 try {
200 result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
201 } catch (KeeperException e) {
202 this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e);
203 }
204 return result;
205 }
206
207 @Override
208 public List<String> getAllQueues() {
209 List<String> listOfQueues = null;
210 try {
211 listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
212 } catch (KeeperException e) {
213 this.abortable.abort("Failed to get a list of queues for region server: "
214 + this.myQueuesZnode, e);
215 }
216 return listOfQueues;
217 }
218
219
220
221
222
223
224 private boolean lockOtherRS(String znode) {
225 try {
226 String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
227 if (parent.equals(this.myQueuesZnode)) {
228 LOG.warn("Won't lock because this is us, we're dead!");
229 return false;
230 }
231 String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
232 ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
233 } catch (KeeperException e) {
234
235
236
237
238
239 if (e instanceof KeeperException.NoNodeException
240 || e instanceof KeeperException.NodeExistsException) {
241 LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
242 + e.getMessage());
243 } else {
244 LOG.info("Failed lock other rs", e);
245 }
246 return false;
247 }
248 return true;
249 }
250
251
252
253
254
255 private void deleteAnotherRSQueues(String regionserverZnode) {
256 String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
257 try {
258 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
259 for (String cluster : clusters) {
260
261 if (cluster.equals(RS_LOCK_ZNODE)) {
262 continue;
263 }
264 String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
265 ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
266 }
267
268 ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
269 } catch (KeeperException e) {
270 if (e instanceof KeeperException.NoNodeException
271 || e instanceof KeeperException.NotEmptyException) {
272
273
274
275 if (e.getPath().equals(fullpath)) {
276 return;
277 }
278 }
279 this.abortable.abort("Failed to delete replication queues for region server: "
280 + regionserverZnode, e);
281 }
282 }
283
284
285
286
287
288
289
290 private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
291 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
292
293 String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
294 List<String> peerIdsToProcess = null;
295 List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
296 try {
297 peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
298 if (peerIdsToProcess == null) return queues;
299 for (String peerId : peerIdsToProcess) {
300 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
301 if (!peerExists(replicationQueueInfo.getPeerId())) {
302 LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
303
304 continue;
305 }
306 String newPeerId = peerId + "-" + znode;
307 String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
308
309 String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
310 List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
311 if (hlogs == null || hlogs.size() == 0) {
312 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
313 continue;
314 }
315
316 SortedSet<String> logQueue = new TreeSet<String>();
317 queues.put(newPeerId, logQueue);
318 ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
319 listOfOps.add(op);
320
321 for (String hlog : hlogs) {
322 String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
323 byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
324 LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
325 String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
326 listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
327
328 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
329 logQueue.add(hlog);
330 }
331
332 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
333 }
334
335 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
336 LOG.debug(" The multi list size is: " + listOfOps.size());
337 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
338 LOG.info("Atomically moved the dead regionserver logs. ");
339 } catch (KeeperException e) {
340
341 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
342 queues.clear();
343 }
344 return queues;
345 }
346
347
348
349
350
351
352
353 private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
354
355
356 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
357 try {
358 String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
359 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
360
361 if (clusters == null || clusters.size() <= 1) {
362 return queues;
363 }
364
365 clusters.remove(RS_LOCK_ZNODE);
366 for (String cluster : clusters) {
367 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
368 if (!peerExists(replicationQueueInfo.getPeerId())) {
369 LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
370
371 continue;
372 }
373
374
375
376 String newCluster = cluster + "-" + znode;
377 String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
378 String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
379 List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
380
381 if (hlogs == null || hlogs.size() == 0) {
382 continue;
383 }
384 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
385 HConstants.EMPTY_BYTE_ARRAY);
386 SortedSet<String> logQueue = new TreeSet<String>();
387 queues.put(newCluster, logQueue);
388 for (String hlog : hlogs) {
389 String z = ZKUtil.joinZNode(clusterPath, hlog);
390 byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
391 long position = 0;
392 try {
393 position = ZKUtil.parseHLogPositionFrom(positionBytes);
394 } catch (DeserializationException e) {
395 LOG.warn("Failed parse of hlog position from the following znode: " + z
396 + ", Exception: " + e);
397 }
398 LOG.debug("Creating " + hlog + " with data " + position);
399 String child = ZKUtil.joinZNode(newClusterZnode, hlog);
400
401
402 ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
403 logQueue.add(hlog);
404 }
405 }
406 } catch (KeeperException e) {
407 this.abortable.abort("Copy queues from rs", e);
408 }
409 return queues;
410 }
411
412
413
414
415
416
417 static byte[] lockToByteArray(final String lockOwner) {
418 byte[] bytes =
419 ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
420 return ProtobufUtil.prependPBMagic(bytes);
421 }
422 }