/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
18 package org.apache.hadoop.hbase.master;
19
20 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.hbase.classification.InterfaceAudience;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.PathFilter;
48 import org.apache.hadoop.hbase.Chore;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.ServerName;
52 import org.apache.hadoop.hbase.SplitLogCounters;
53 import org.apache.hadoop.hbase.SplitLogTask;
54 import org.apache.hadoop.hbase.Stoppable;
55 import org.apache.hadoop.hbase.exceptions.DeserializationException;
56 import org.apache.hadoop.hbase.io.hfile.HFile;
57 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
58 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
61 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
63 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
64 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
65 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66 import org.apache.hadoop.hbase.util.FSUtils;
67 import org.apache.hadoop.hbase.util.Pair;
68 import org.apache.hadoop.hbase.util.Threads;
69 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
72 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
73 import org.apache.hadoop.util.StringUtils;
74 import org.apache.zookeeper.AsyncCallback;
75 import org.apache.zookeeper.CreateMode;
76 import org.apache.zookeeper.KeeperException;
77 import org.apache.zookeeper.KeeperException.NoNodeException;
78 import org.apache.zookeeper.ZooDefs.Ids;
79 import org.apache.zookeeper.data.Stat;
80
81 import com.google.common.annotations.VisibleForTesting;
82
/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via ZooKeeper. Each available region server running a
 * {@link SplitLogWorker} coordinates by pulling a task out of the task queue kept under the
 * split log znode.
 *
 * <p>SplitLogManager monitors the task znodes that it creates using the timeoutMonitor thread.
 * If a task's progress is slow, the manager takes the task away from the owning
 * {@link SplitLogWorker} and puts it up for grabs again. When a task is done its znode is
 * deleted by the SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's log files.
 * The caller thread blocks in that method until all the log files have been split.
 *
 * <p>All the ZooKeeper calls made by this class are asynchronous, so that the master is never
 * blocked waiting on an external event.
 *
 * <p>There is a race in this design between the SplitLogManager and the SplitLogWorker: the
 * manager might re-queue a task that has in reality already been completed by a worker. We rely
 * on the idempotency of the log splitting task for correctness.
 */
111 @InterfaceAudience.Private
112 public class SplitLogManager extends ZooKeeperListener {
113 private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
114
115 public static final int DEFAULT_TIMEOUT = 120000;
116 public static final int DEFAULT_ZK_RETRIES = 3;
117 public static final int DEFAULT_MAX_RESUBMIT = 3;
118 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000);
119
120 private final Stoppable stopper;
121 private final MasterServices master;
122 private final ServerName serverName;
123 private final TaskFinisher taskFinisher;
124 private FileSystem fs;
125 private Configuration conf;
126
127 private long zkretries;
128 private long resubmit_threshold;
129 private long timeout;
130 private long unassignedTimeout;
131 private long lastTaskCreateTime = Long.MAX_VALUE;
132 public boolean ignoreZKDeleteForTesting = false;
133 private volatile long lastRecoveringNodeCreationTime = 0;
  // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
  // whether to GC stale recovering znodes
  private long checkRecoveringTimeThreshold = 15000; // 15 seconds
137 private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
138 .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
139
  /**
   * In distributedLogReplay mode we need to touch both the splitlog and the recovering-regions
   * znodes in one operation, so this lock is used to guard such cases.
   */
  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
145
146 private volatile RecoveryMode recoveryMode;
147 private volatile boolean isDrainingDone = false;
148
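  // In-memory state of all outstanding split tasks, keyed by the task's znode path.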
149 private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
150 private TimeoutMonitor timeoutMonitor;
151
152 private volatile Set<ServerName> deadWorkers = null;
153 private final Object deadWorkersLock = new Object();
154
155 private Set<String> failedDeletions = null;
  /**
   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, Stoppable,
   * MasterServices, ServerName, boolean, TaskFinisher)} that provides a {@link TaskFinisher}
   * which finishes the split of a single log file via HLogSplitter.finishSplitLogFile().
   * The task finisher has to be robust because it can be arbitrarily restarted or called
   * multiple times.
   *
   * @param zkw the ZK watcher
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything is wrong
   * @param master the master services
   * @param serverName the master server name
   * @param masterRecovery an indication if the master is in recovery
   * @throws InterruptedIOException
   * @throws KeeperException
   */
174 public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
175 Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery)
176 throws InterruptedIOException, KeeperException {
177 this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
178 @Override
179 public Status finish(ServerName workerName, String logfile) {
180 try {
181 HLogSplitter.finishSplitLogFile(logfile, conf);
182 } catch (IOException e) {
183 LOG.warn("Could not finish splitting of log file " + logfile, e);
184 return Status.ERR;
185 }
186 return Status.DONE;
187 }
188 });
189 }
  /**
   * It is OK to construct this object even when region servers are not online. It does look up
   * the orphan tasks in ZK, but it doesn't block waiting for them to be done.
   *
   * @param zkw the ZK watcher
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything is wrong
   * @param master the master services
   * @param serverName the master server name
   * @param masterRecovery an indication if the master is in recovery
   * @param tf task finisher
   * @throws InterruptedIOException
   * @throws KeeperException
   */
206 public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
207 Stoppable stopper, MasterServices master,
208 ServerName serverName, boolean masterRecovery, TaskFinisher tf)
209 throws InterruptedIOException, KeeperException {
210 super(zkw);
211 this.taskFinisher = tf;
212 this.conf = conf;
213 this.stopper = stopper;
214 this.master = master;
215 this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
216 this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
217 this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
218 this.unassignedTimeout =
219 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
220
221
222 setRecoveryMode(true);
223
224 LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
225 ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
226
227 this.serverName = serverName;
228 this.timeoutMonitor = new TimeoutMonitor(
229 conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
230
231 this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
232
233 if (!masterRecovery) {
234 Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
235 + ".splitLogManagerTimeoutMonitor");
236 }
237
238 if (this.watcher != null) {
239 this.watcher.registerListener(this);
240 lookForOrphans();
241 }
242 }
243
244 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
245 List<FileStatus> fileStatus = new ArrayList<FileStatus>();
246 for (Path hLogDir : logDirs) {
247 this.fs = hLogDir.getFileSystem(conf);
248 if (!fs.exists(hLogDir)) {
249 LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
250 continue;
251 }
252 FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
253 if (logfiles == null || logfiles.length == 0) {
254 LOG.info(hLogDir + " is empty dir, no logs to split");
255 } else {
256 Collections.addAll(fileStatus, logfiles);
257 }
258 }
259 FileStatus[] a = new FileStatus[fileStatus.size()];
260 return fileStatus.toArray(a);
261 }
262
  /**
   * @param logDir one region server's HLog dir path under the root log directory
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
271 public long splitLogDistributed(final Path logDir) throws IOException {
272 List<Path> logDirs = new ArrayList<Path>();
273 logDirs.add(logDir);
274 return splitLogDistributed(logDirs);
275 }
276
  /**
   * The caller will block until all the log files of the given region servers have been
   * processed - successfully split or an error is encountered - by an available worker region
   * server. This method must only be called after the region servers have been brought online.
   *
   * @param logDirs List of log dirs to split
   * @return cumulative size of the logfiles split
   * @throws IOException If there was an error while splitting any log file
   */
287 public long splitLogDistributed(final List<Path> logDirs) throws IOException {
288 if (logDirs.isEmpty()) {
289 return 0;
290 }
291 Set<ServerName> serverNames = new HashSet<ServerName>();
292 for (Path logDir : logDirs) {
293 try {
294 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
295 if (serverName != null) {
296 serverNames.add(serverName);
297 }
298 } catch (IllegalArgumentException e) {
        // invalid log directory name format; just warn and skip it
300 LOG.warn("Cannot parse server name from " + logDir);
301 }
302 }
303 return splitLogDistributed(serverNames, logDirs, null);
304 }
305
  /**
   * The caller will block until all the given log files have been processed - successfully split
   * or an error is encountered - by an available worker region server. This method must only be
   * called after the region servers have been brought online.
   *
   * @param serverNames region servers whose logs are being split
   * @param logDirs List of log dirs to split
   * @param filter the Path filter to select specific files for considering, or null for all files
   * @return cumulative size of the logfiles split
   * @throws IOException If there was an error while splitting any log file
   */
317 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
318 PathFilter filter) throws IOException {
319 MonitoredTask status = TaskMonitor.get().createStatus(
320 "Doing distributed log split in " + logDirs);
321 FileStatus[] logfiles = getFileList(logDirs, filter);
322 status.setStatus("Checking directory contents...");
323 LOG.debug("Scheduling batch of logs to split");
324 SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
325 LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
326 long t = EnvironmentEdgeManager.currentTimeMillis();
327 long totalSize = 0;
328 TaskBatch batch = new TaskBatch();
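    // isMetaRecovery: null means no filter was given, so the batch may contain both meta and
    // user region logs; otherwise it starts as false and is flipped to true below when the META
    // filter was used, so only the matching recovering-region znodes are cleaned up afterwards.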
329 Boolean isMetaRecovery = (filter == null) ? null : false;
330 for (FileStatus lf : logfiles) {
      // If the log file is still being written to - which is most likely the case for the last
      // log file - then its length will show up here as zero. The size of such a file can only
      // be retrieved after recover-lease is done, so totalSize (and the metrics it drives) will
      // be under-reported in most cases.
      totalSize += lf.getLen();
337 String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
338 if (!enqueueSplitTask(pathToLog, batch)) {
339 throw new IOException("duplicate log split scheduled for " + lf.getPath());
340 }
341 }
342 waitForSplittingCompletion(batch, status);
    // remove recovering regions from ZK
    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
      // we split meta regions and user regions separately, therefore logfiles are either all for
      // meta or all for user regions (we could have mixed situations in tests)
      isMetaRecovery = true;
    }
349 this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
350
351 if (batch.done != batch.installed) {
352 batch.isDead = true;
353 SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
354 LOG.warn("error while splitting logs in " + logDirs +
355 " installed = " + batch.installed + " but only " + batch.done + " done");
356 String msg = "error or interrupted while splitting logs in "
357 + logDirs + " Task = " + batch;
358 status.abort(msg);
359 throw new IOException(msg);
360 }
361 for(Path logDir: logDirs){
362 status.setStatus("Cleaning up log directory...");
363 try {
364 if (fs.exists(logDir) && !fs.delete(logDir, false)) {
365 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
366 }
367 } catch (IOException ioe) {
368 FileStatus[] files = fs.listStatus(logDir);
369 if (files != null && files.length > 0) {
370 LOG.warn("returning success without actually splitting and " +
371 "deleting all the log files in path " + logDir);
372 } else {
373 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
374 }
375 }
376 SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
377 }
378 String msg = "finished splitting (more than or equal to) " + totalSize +
379 " bytes in " + batch.installed + " log files in " + logDirs + " in " +
380 (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
381 status.markComplete(msg);
382 LOG.info(msg);
383 return totalSize;
384 }
385
  /**
   * Add a task entry to the splitlog znode if it is not already there.
   *
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
393 boolean enqueueSplitTask(String taskname, TaskBatch batch) {
394 SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
    // the task znode path is the splitlog dir plus an url-encoding of the log to split
    String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
398 lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
399 Task oldtask = createTaskIfAbsent(path, batch);
400 if (oldtask == null) {
      // publish the task in zk by creating its znode
402 createNode(path, zkretries);
403 return true;
404 }
405 return false;
406 }
407
408 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
409 synchronized (batch) {
410 while ((batch.done + batch.error) != batch.installed) {
411 try {
412 status.setStatus("Waiting for distributed tasks to finish. "
413 + " scheduled=" + batch.installed
414 + " done=" + batch.done
415 + " error=" + batch.error);
416 int remaining = batch.installed - (batch.done + batch.error);
417 int actual = activeTasks(batch);
418 if (remaining != actual) {
419 LOG.warn("Expected " + remaining
420 + " active tasks, but actually there are " + actual);
421 }
422 int remainingInZK = remainingTasksInZK();
423 if (remainingInZK >= 0 && actual > remainingInZK) {
            LOG.warn("Expected at least " + actual
425 + " tasks in ZK, but actually there are " + remainingInZK);
426 }
427 if (remainingInZK == 0 || actual == 0) {
428 LOG.warn("No more task remaining (ZK or task map), splitting "
429 + "should have completed. Remaining tasks in ZK " + remainingInZK
430 + ", active tasks in map " + actual);
431 if (remainingInZK == 0 && actual == 0) {
432 return;
433 }
434 }
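          // poll the batch monitor every 100ms; setDone() notifies it whenever a task finishes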
435 batch.wait(100);
436 if (stopper.isStopped()) {
437 LOG.warn("Stopped while waiting for log splits to be completed");
438 return;
439 }
440 } catch (InterruptedException e) {
441 LOG.warn("Interrupted while waiting for log splits to be completed");
442 Thread.currentThread().interrupt();
443 return;
444 }
445 }
446 }
447 }
448
449 @VisibleForTesting
450 ConcurrentMap<String, Task> getTasks() {
451 return tasks;
452 }
453
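  // Count the tasks of the given batch that are still IN_PROGRESS in the in-memory task map.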
454 private int activeTasks(final TaskBatch batch) {
455 int count = 0;
456 for (Task t: tasks.values()) {
457 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
458 count++;
459 }
460 }
461 return count;
462 }
463
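  // Count the task znodes still present under the splitlog znode (RESCAN nodes excluded);
  // returns -1 if ZooKeeper could not be queried.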
464 private int remainingTasksInZK() {
465 int count = 0;
466 try {
467 List<String> tasks =
468 ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
469 if (tasks != null) {
470 for (String t: tasks) {
471 if (!ZKSplitLog.isRescanNode(t)) {
472 count++;
473 }
474 }
475 }
476 } catch (KeeperException ke) {
477 LOG.warn("Failed to check remaining tasks", ke);
478 count = -1;
479 }
480 return count;
481 }
482
  /**
   * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that
   * the region server hosting the region can allow reads to the recovered region.
   * @param serverNames servers which are just recovered
   * @param isMetaRecovery whether the current recovery is for the meta region on
   *          <code>serverNames</code>; null means both meta and user regions were recovered
   */
490 private void
491 removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
492 if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
      // recovering-region znodes are only created in LOG_REPLAY mode; nothing to clean up
494 return;
495 }
496
497 final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
498 int count = 0;
499 Set<String> recoveredServerNameSet = new HashSet<String>();
500 if (serverNames != null) {
501 for (ServerName tmpServerName : serverNames) {
502 recoveredServerNameSet.add(tmpServerName.getServerName());
503 }
504 }
505
506 try {
507 this.recoveringRegionLock.lock();
508
509 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
510 if (tasks != null) {
511 for (String t : tasks) {
512 if (!ZKSplitLog.isRescanNode(t)) {
513 count++;
514 }
515 }
516 }
517 if (count == 0 && this.master.isInitialized()
518 && !this.master.getServerManager().areDeadServersInProgress()) {
        // no splitting work items left
        deleteRecoveringRegionZNodes(watcher, null);
        // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
        // this point
        lastRecoveringNodeCreationTime = Long.MAX_VALUE;
      } else if (!recoveredServerNameSet.isEmpty()) {
        // remove recovering-region znodes whose failed servers have all been recovered
        List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
527 if (regions != null) {
528 for (String region : regions) {
529 if(isMetaRecovery != null) {
530 if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
531 || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
              // when recovering meta, skip user regions; when recovering user regions, skip meta
              continue;
535 }
536 }
537 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
538 List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
539 if (failedServers == null || failedServers.isEmpty()) {
540 ZKUtil.deleteNode(watcher, nodePath);
541 continue;
542 }
543 if (recoveredServerNameSet.containsAll(failedServers)) {
544 ZKUtil.deleteNodeRecursively(watcher, nodePath);
545 } else {
546 for (String failedServer : failedServers) {
547 if (recoveredServerNameSet.contains(failedServer)) {
548 String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
549 ZKUtil.deleteNode(watcher, tmpPath);
550 }
551 }
552 }
553 }
554 }
555 }
556 } catch (KeeperException ke) {
557 LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
558 if (serverNames != null && !serverNames.isEmpty()) {
559 this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
560 isMetaRecovery));
561 }
562 } finally {
563 this.recoveringRegionLock.unlock();
564 }
565 }
566
  /**
   * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
   * during the master initialization phase.
   * @param failedServers A set of known failed servers
   * @throws KeeperException
   */
573 void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
574 throws KeeperException {
575
576 Set<String> knownFailedServers = new HashSet<String>();
577 if (failedServers != null) {
578 for (ServerName tmpServerName : failedServers) {
579 knownFailedServers.add(tmpServerName.getServerName());
580 }
581 }
582
583 this.recoveringRegionLock.lock();
584 try {
585 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
586 if (tasks != null) {
587 for (String t : tasks) {
588 byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
589 if (data != null) {
590 SplitLogTask slt = null;
591 try {
592 slt = SplitLogTask.parseFrom(data);
593 } catch (DeserializationException e) {
594 LOG.warn("Failed parse data for znode " + t, e);
595 }
596 if (slt != null && slt.isDone()) {
597 continue;
598 }
599 }
600
601 t = ZKSplitLog.getFileName(t);
602 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
603 if (serverName != null) {
604 knownFailedServers.add(serverName.getServerName());
605 } else {
606 LOG.warn("Found invalid WAL log file name:" + t);
607 }
608 }
609 }
610
611
612 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
613 if (regions != null) {
614 for (String region : regions) {
615 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
616 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
617 if (regionFailedServers == null || regionFailedServers.isEmpty()) {
618 ZKUtil.deleteNode(watcher, nodePath);
619 continue;
620 }
621 boolean needMoreRecovery = false;
622 for (String tmpFailedServer : regionFailedServers) {
623 if (knownFailedServers.contains(tmpFailedServer)) {
624 needMoreRecovery = true;
625 break;
626 }
627 }
628 if (!needMoreRecovery) {
629 ZKUtil.deleteNodeRecursively(watcher, nodePath);
630 }
631 }
632 }
633 } finally {
634 this.recoveringRegionLock.unlock();
635 }
636 }
637
638 public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
639 try {
640 if (regions == null) {
641
642 LOG.info("Garbage collecting all recovering regions.");
643 ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
644 } else {
645 for (String curRegion : regions) {
646 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
647 ZKUtil.deleteNodeRecursively(watcher, nodePath);
648 }
649 }
650 } catch (KeeperException e) {
651 LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
652 }
653 }
654
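  // Record the terminal status of a task and queue the async delete of its znode. If the task
  // belongs to a batch, the batch counters are updated and waiters are notified.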
655 private void setDone(String path, TerminationStatus status) {
656 Task task = tasks.get(path);
657 if (task == null) {
658 if (!ZKSplitLog.isRescanNode(watcher, path)) {
659 SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
660 LOG.debug("unacquired orphan task is done " + path);
661 }
662 } else {
663 synchronized (task) {
664 if (task.status == IN_PROGRESS) {
665 if (status == SUCCESS) {
666 SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
667 LOG.info("Done splitting " + path);
668 } else {
669 SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
670 LOG.warn("Error splitting " + path);
671 }
672 task.status = status;
673 if (task.batch != null) {
674 synchronized (task.batch) {
675 if (status == SUCCESS) {
676 task.batch.done++;
677 } else {
678 task.batch.error++;
679 }
680 task.batch.notify();
681 }
682 }
683 }
684 }
685 }
    // delete the task node in zk. It is an async call and no one is blocked waiting for this
    // node to be deleted. All task names are unique (log.<timestamp>) so there is no risk of
    // deleting a future task. If a deletion fails, the TimeoutMonitor will retry it later.
    deleteNode(path, zkretries);
692 return;
693 }
694
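  // Asynchronously create an UNASSIGNED task znode; the result is handled by CreateAsyncCallback.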
695 private void createNode(String path, Long retry_count) {
696 SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
697 ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
698 SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
699 return;
700 }
701
702 private void createNodeSuccess(String path) {
703 LOG.debug("put up splitlog task at znode " + path);
704 getDataSetWatch(path, zkretries);
705 }
706
707 private void createNodeFailure(String path) {
    // give up on the task; create retries have been exhausted (or the ZK session expired)
    LOG.warn("failed to create task node " + path);
710 setDone(path, FAILURE);
711 }
712
713
714 private void getDataSetWatch(String path, Long retry_count) {
715 this.watcher.getRecoverableZooKeeper().getZooKeeper().
716 getData(path, this.watcher,
717 new GetDataAsyncCallback(true), retry_count);
718 SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
719 }
720
  private void tryGetDataSetWatch(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher.getRecoverableZooKeeper().getZooKeeper().
        getData(path, this.watcher,
        new GetDataAsyncCallback(false), Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
  }
728
729 private void getDataSetWatchSuccess(String path, byte[] data, int version)
730 throws DeserializationException {
731 if (data == null) {
732 if (version == Integer.MIN_VALUE) {
        // assume all done; the task znode has disappeared, which means the task was completed
734 setDone(path, SUCCESS);
735 return;
736 }
737 SplitLogCounters.tot_mgr_null_data.incrementAndGet();
738 LOG.fatal("logic error - got null data " + path);
739 setDone(path, FAILURE);
740 return;
741 }
742 data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
743 SplitLogTask slt = SplitLogTask.parseFrom(data);
744 if (slt.isUnassigned()) {
745 LOG.debug("task not yet acquired " + path + " ver = " + version);
746 handleUnassignedTask(path);
747 } else if (slt.isOwned()) {
748 heartbeat(path, version, slt.getServerName());
749 } else if (slt.isResigned()) {
750 LOG.info("task " + path + " entered state: " + slt.toString());
751 resubmitOrFail(path, FORCE);
752 } else if (slt.isDone()) {
753 LOG.info("task " + path + " entered state: " + slt.toString());
754 if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
755 if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
756 setDone(path, SUCCESS);
757 } else {
758 resubmitOrFail(path, CHECK);
759 }
760 } else {
761 setDone(path, SUCCESS);
762 }
763 } else if (slt.isErr()) {
764 LOG.info("task " + path + " entered state: " + slt.toString());
765 resubmitOrFail(path, CHECK);
766 } else {
767 LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
768 setDone(path, FAILURE);
769 }
770 }
771
772 private void getDataSetWatchFailure(String path) {
773 LOG.warn("failed to set data watch " + path);
774 setDone(path, FAILURE);
775 }
776
  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely - say the
   * SplitLogManager wants to resubmit a task: it forces the task back to UNASSIGNED state but
   * dies before it can create the RESCAN task node to signal the SplitLogWorkers to pick up the
   * task. To prevent this scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks
   * at startup.
   *
   * @param path the task znode path
   */
786 private void handleUnassignedTask(String path) {
787 if (ZKSplitLog.isRescanNode(watcher, path)) {
788 return;
789 }
790 Task task = findOrCreateOrphanTask(path);
791 if (task.isOrphan() && (task.incarnation == 0)) {
792 LOG.info("resubmitting unassigned orphan task " + path);
      // ignore failure to resubmit. The timeout monitor will handle it later,
      // albeit in a more crude fashion
      resubmit(path, task, FORCE);
796 }
797 }
798
  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions.
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when retries should be abandoned, otherwise false
   */
805 private boolean needAbandonRetries(int statusCode, String action) {
806 if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
807 LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
808 + "action=" + action);
809 return true;
810 }
811 return false;
812 }
813
814 private void heartbeat(String path, int new_version, ServerName workerName) {
815 Task task = findOrCreateOrphanTask(path);
816 if (new_version != task.last_version) {
817 if (task.isUnassigned()) {
818 LOG.info("task " + path + " acquired by " + workerName);
819 }
820 task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
821 SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
    } else {
      // the znode version has not changed since the last heartbeat; nothing to update
    }
828 return;
829 }
830
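  // Try to hand a task back to the pool of workers by forcing its znode back to the UNASSIGNED
  // state. With CHECK, the resubmit only happens when the owning worker is dead or has timed out
  // and the resubmit threshold has not been reached; with FORCE it happens unconditionally.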
831 private boolean resubmit(String path, Task task, ResubmitDirective directive) {
832
833 if (task.status != IN_PROGRESS) {
834 return false;
835 }
836 int version;
837 if (directive != FORCE) {
      // We're going to resubmit:
      //  1) immediately if the worker server is now marked as dead
      //  2) after a timeout if the server is not marked as dead but has still not finished the
      //     task. This allows to continue if the worker cannot actually handle it, for any reason.
843 final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
844 final boolean alive = master.getServerManager() != null ?
845 master.getServerManager().isServerOnline(task.cur_worker_name) : true;
846 if (alive && time < timeout) {
847 LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " +
848 task.cur_worker_name + " is not marked as dead, we waited for " + time +
849 " while the timeout is " + timeout);
850 return false;
851 }
852 if (task.unforcedResubmits.get() >= resubmit_threshold) {
853 if (!task.resubmitThresholdReached) {
854 task.resubmitThresholdReached = true;
855 SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
856 LOG.info("Skipping resubmissions of task " + path +
857 " because threshold " + resubmit_threshold + " reached");
858 }
859 return false;
860 }
861
862 version = task.last_version;
863 } else {
864 SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
865 version = -1;
866 }
867 LOG.info("resubmitting task " + path);
868 task.incarnation++;
869 try {
870
871 SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
872 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
873 LOG.debug("failed to resubmit task " + path +
874 " version changed");
875 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
876 return false;
877 }
878 } catch (NoNodeException e) {
879 LOG.warn("failed to resubmit because znode doesn't exist " + path +
880 " task done (or forced done by removing the znode)");
881 try {
882 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
883 } catch (DeserializationException e1) {
884 LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
885 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
886 return false;
887 }
888 return false;
889 } catch (KeeperException.BadVersionException e) {
890 LOG.debug("failed to resubmit task " + path + " version changed");
891 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
892 return false;
893 } catch (KeeperException e) {
894 SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
895 LOG.warn("failed to resubmit " + path, e);
896 return false;
897 }
898
899 if (directive != FORCE) {
900 task.unforcedResubmits.incrementAndGet();
901 }
902 task.setUnassigned();
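    // nudge the workers: creating a RESCAN node generates a watch event that makes them look
    // for tasks again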
903 createRescanNode(Long.MAX_VALUE);
904 SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
905 return true;
906 }
907
908 private void resubmitOrFail(String path, ResubmitDirective directive) {
909 if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
910 setDone(path, FAILURE);
911 }
912 }
913
914 private void deleteNode(String path, Long retries) {
915 SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
    // Once a task znode is ready for delete (it is in the DONE state) no one should be writing
    // to it anymore, so an unconditional delete (version -1) is safe here.
919 this.watcher.getRecoverableZooKeeper().getZooKeeper().
920 delete(path, -1, new DeleteAsyncCallback(),
921 retries);
922 }
923
924 private void deleteNodeSuccess(String path) {
925 if (ignoreZKDeleteForTesting) {
926 return;
927 }
928 Task task;
929 task = tasks.remove(path);
930 if (task == null) {
931 if (ZKSplitLog.isRescanNode(watcher, path)) {
932 SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
933 }
934 SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
935 LOG.debug("deleted task without in memory state " + path);
936 return;
937 }
938 synchronized (task) {
939 task.status = DELETED;
940 task.notify();
941 }
942 SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
943 }
944
945 private void deleteNodeFailure(String path) {
946 LOG.info("Failed to delete node " + path + " and will retry soon.");
947 return;
948 }
949
  /**
   * Signal the {@link SplitLogWorker}s that a task was resubmitted by creating the RESCAN node.
   */
  private void createRescanNode(long retries) {
    // The RESCAN node will be deleted almost immediately by the SplitLogManager as soon as it is
    // created because it is created in the DONE state. This behavior prevents a buildup of RESCAN
    // nodes. But there is also a chance that a SplitLogWorker might miss the watch event that the
    // RESCAN node creation generates. In that case, if a task stays unassigned for a long time,
    // the TimeoutMonitor will cause another rescan.
963 lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
964 SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
965 this.watcher.getRecoverableZooKeeper().getZooKeeper().
966 create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
967 Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
968 new CreateRescanAsyncCallback(), Long.valueOf(retries));
969 }
970
971 private void createRescanSuccess(String path) {
972 SplitLogCounters.tot_mgr_rescan.incrementAndGet();
973 getDataSetWatch(path, zkretries);
974 }
975
976 private void createRescanFailure() {
977 LOG.fatal("logic failure, rescan failure must not happen");
978 }
979
  /**
   * @param path the task znode path
   * @param batch the batch this task belongs to
   * @return null on success, the existing task on error
   */
985 private Task createTaskIfAbsent(String path, TaskBatch batch) {
986 Task oldtask;
    // batch.installed is only changed via this function and a single thread touches
    // batch.installed
989 Task newtask = new Task();
990 newtask.batch = batch;
991 oldtask = tasks.putIfAbsent(path, newtask);
992 if (oldtask == null) {
993 batch.installed++;
994 return null;
995 }
996
997 synchronized (oldtask) {
998 if (oldtask.isOrphan()) {
999 if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this task because it might
          // be too late for setDone() to update batch.done. There is no need for the batch
          // creator to wait for this task to complete.
          return (null);
1005 }
1006 if (oldtask.status == IN_PROGRESS) {
1007 oldtask.batch = batch;
1008 batch.installed++;
1009 LOG.debug("Previously orphan task " + path + " is now being waited upon");
1010 return null;
1011 }
1012 while (oldtask.status == FAILURE) {
1013 LOG.debug("wait for status of task " + path + " to change to DELETED");
1014 SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
1015 try {
1016 oldtask.wait();
1017 } catch (InterruptedException e) {
1018 Thread.currentThread().interrupt();
1019 LOG.warn("Interrupted when waiting for znode delete callback");
1020
1021 break;
1022 }
1023 }
1024 if (oldtask.status != DELETED) {
1025 LOG.warn("Failure because previously failed task" +
1026 " state still present. Waiting for znode delete callback" +
1027 " path=" + path);
1028 return oldtask;
1029 }
1030
1031 Task t = tasks.putIfAbsent(path, newtask);
1032 if (t == null) {
1033 batch.installed++;
1034 return null;
1035 }
1036 LOG.fatal("Logic error. Deleted task still present in tasks map");
1037 assert false : "Deleted task still present in tasks map";
1038 return t;
1039 }
1040 LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1041 return oldtask;
1042 }
1043 }
1044
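  /**
   * Returns the in-memory task for the given path, creating an "orphan" task (one with no
   * associated batch) if none exists yet.
   */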
1045 Task findOrCreateOrphanTask(String path) {
1046 Task orphanTask = new Task();
1047 Task task;
1048 task = tasks.putIfAbsent(path, orphanTask);
1049 if (task == null) {
1050 LOG.info("creating orphan task " + path);
1051 SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1052 task = orphanTask;
1053 }
1054 return task;
1055 }
1056
1057 @Override
1058 public void nodeDataChanged(String path) {
1059 Task task;
1060 task = tasks.get(path);
1061 if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1062 if (task != null) {
1063 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1064 }
1065 getDataSetWatch(path, zkretries);
1066 }
1067 }
1068
1069 public void stop() {
1070 if (timeoutMonitor != null) {
1071 timeoutMonitor.interrupt();
1072 }
1073 }
1074
1075 private void lookForOrphans() {
1076 List<String> orphans;
1077 try {
1078 orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1079 this.watcher.splitLogZNode);
1080 if (orphans == null) {
1081 LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1082 return;
1083 }
1084 } catch (KeeperException e) {
1085 LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1086 " " + StringUtils.stringifyException(e));
1087 return;
1088 }
1089 int rescan_nodes = 0;
1090 for (String path : orphans) {
1091 String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1092 if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1093 rescan_nodes++;
1094 LOG.debug("found orphan rescan node " + path);
1095 } else {
1096 LOG.info("found orphan task " + path);
1097 }
1098 getDataSetWatch(nodepath, zkretries);
1099 }
1100 LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1101 rescan_nodes + " rescan nodes");
1102 }
1103
  /**
   * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names...] for
   * all regions of the passed in region server.
   * @param serverName the name of the failed region server
   * @param userRegions user regions assigned on the region server
   */
1110 void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1111 throws KeeperException {
1112 if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
1113 return;
1114 }
1115
1116 try {
1117 this.recoveringRegionLock.lock();
      // mark that we're creating recovering-region znodes
1119 this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1120
1121 for (HRegionInfo region : userRegions) {
1122 String regionEncodeName = region.getEncodedName();
1123 long retries = this.zkretries;
1124
1125 do {
1126 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1127 long lastRecordedFlushedSequenceId = -1;
1128 try {
1129 long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1130 regionEncodeName.getBytes());
            /*
             * znode layout: .../region_encoded_name [data: last flushed sequence id of the
             * region] /failed_server_name [data: last flushed sequence id recorded for that
             * server]
             */
1136 byte[] data = ZKUtil.getData(this.watcher, nodePath);
1137 if (data == null) {
1138 ZKUtil.createSetData(this.watcher, nodePath,
1139 ZKUtil.positionToByteArray(lastSequenceId));
1140 } else {
1141 lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1142 if (lastRecordedFlushedSequenceId < lastSequenceId) {
1143
1144 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
1145 }
1146 }
            // go one level deeper with the failed server name
            nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
            if (lastSequenceId <= lastRecordedFlushedSequenceId) {
              // the newly assigned RS failed even before any flush to the region
              lastSequenceId = lastRecordedFlushedSequenceId;
            }
1153 ZKUtil.createSetData(this.watcher, nodePath,
1154 ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
1155 LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1156 + serverName);
1157
1158
1159 break;
1160 } catch (KeeperException e) {
            // retry a few times on ZooKeeper errors before giving up
            if (retries <= 1) {
              throw e;
            }
            // wait a little bit before retrying
            try {
              Thread.sleep(20);
            } catch (Exception ignoreE) {
              // ignore and retry
            }
1171 }
1172 } while ((--retries) > 0 && (!this.stopper.isStopped()));
1173 }
1174 } finally {
1175 this.recoveringRegionLock.unlock();
1176 }
1177 }
1178
  /**
   * @param bytes content of a failed region server or recovering region znode
   * @return the last flushed sequence id parsed from the znode content, or -1 if it cannot be
   *         parsed
   */
1183 public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1184 long lastRecordedFlushedSequenceId = -1l;
1185 try {
1186 lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1187 } catch (DeserializationException e) {
1188 lastRecordedFlushedSequenceId = -1l;
1189 LOG.warn("Can't parse last flushed sequence Id", e);
1190 }
1191 return lastRecordedFlushedSequenceId;
1192 }
1193
  /**
   * Check whether /hbase/recovering-regions/[region encoded name] exists. Returns true if it
   * exists and also sets a watch on it.
   * @param zkw the ZK watcher
   * @param regionEncodedName region encoded name
   * @return true when /hbase/recovering-regions/[region encoded name] exists
   * @throws KeeperException
   */
1202 public static boolean
1203 isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
1204 throws KeeperException {
1205 boolean result = false;
1206 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
1207
1208 byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
1209 if (node != null) {
1210 result = true;
1211 }
1212 return result;
1213 }
1214
  /**
   * This function is used in distributedLogReplay to fetch the last flushed sequence id from ZK.
   * @param zkw the ZK watcher
   * @param serverName the failed region server
   * @param encodedRegionName the region encoded name
   * @return the last flushed sequence ids recorded in ZK for the region on
   *         <code>serverName</code>, or null if no such znode exists
   * @throws IOException
   */
1223 public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
1224 String serverName, String encodedRegionName) throws IOException {
    // When a SplitLogWorker recovers a region by directly replaying unflushed WAL edits, the
    // last flushed sequence id changes when the newly assigned RS flushes writes to the region.
    // If that newly assigned RS fails again (a chained RS failure scenario), the last flushed
    // sequence id is only valid for that particular RS instance. Therefore, in this mode we keep
    // a per-failed-server history of last flushed sequence ids in ZK and fetch them from there.
1232 RegionStoreSequenceIds result = null;
1233 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1234 nodePath = ZKUtil.joinZNode(nodePath, serverName);
1235 try {
1236 byte[] data = ZKUtil.getData(zkw, nodePath);
1237 if (data != null) {
1238 result = ZKUtil.parseRegionStoreSequenceIds(data);
1239 }
1240 } catch (KeeperException e) {
1241 throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1242 + serverName + "; region=" + encodedRegionName, e);
1243 } catch (DeserializationException e) {
1244 LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
1245 }
1246 return result;
1247 }
1248
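  // List the real split tasks currently under the splitlog znode, filtering out RESCAN nodes.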
1249 private List<String> listSplitLogTasks() throws KeeperException {
1250 List<String> taskOrRescanList = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
1251 if (taskOrRescanList == null || taskOrRescanList.isEmpty()) {
1252 return Collections.<String> emptyList();
1253 }
1254 List<String> taskList = new ArrayList<String>();
1255 for (String taskOrRescan : taskOrRescanList) {
1256
1257 if (!ZKSplitLog.isRescanNode(taskOrRescan)) {
1258 taskList.add(taskOrRescan);
1259 }
1260 }
1261 return taskList;
1262 }
1263
  /**
   * Sets the recovery mode, reconciling outstanding split log tasks from before with the current
   * configuration setting.
   * @param isForInitialization true when called during master initialization
   * @throws KeeperException
   */
1271 public void setRecoveryMode(boolean isForInitialization) throws KeeperException {
    if (this.isDrainingDone) {
      // the recovery mode has already been settled (no outstanding tasks from a previous mode
      // remain), so there is nothing left to re-evaluate
      return;
    }
    if (this.watcher == null) {
      // when the watcher is null (testing code path) the recovery mode can only be LOG_SPLITTING
      this.isDrainingDone = true;
      this.recoveryMode = RecoveryMode.LOG_SPLITTING;
      return;
    }
1283 boolean hasSplitLogTask = false;
1284 boolean hasRecoveringRegions = false;
1285 RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
1286 RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
1287 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;

    // first check whether there are outstanding recovering regions from a previous LOG_REPLAY run
1290 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
1291 if (regions != null && !regions.isEmpty()) {
1292 hasRecoveringRegions = true;
1293 previousRecoveryMode = RecoveryMode.LOG_REPLAY;
1294 }
1295 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1296
1297 List<String> tasks = listSplitLogTasks();
1298 if (!tasks.isEmpty()) {
1299 hasSplitLogTask = true;
1300 if (isForInitialization) {
1301
1302 for (String task : tasks) {
1303 try {
1304 byte[] data = ZKUtil.getData(this.watcher,
1305 ZKUtil.joinZNode(watcher.splitLogZNode, task));
1306 if (data == null) continue;
1307 SplitLogTask slt = SplitLogTask.parseFrom(data);
1308 previousRecoveryMode = slt.getMode();
1309 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
                // the task was created by an older code base that did not record a recovery mode
                // in the splitlogtask; such tasks can only have been created for log splitting
1313 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
1314 }
1315 break;
1316 } catch (DeserializationException e) {
1317 LOG.warn("Failed parse data for znode " + task, e);
1318 }
1319 }
1320 }
1321 }
1322 }
1323
1324 synchronized(this) {
1325 if(this.isDrainingDone) {
1326 return;
1327 }
1328 if (!hasSplitLogTask && !hasRecoveringRegions) {
1329 this.isDrainingDone = true;
1330 this.recoveryMode = recoveryModeInConfig;
1331 return;
1332 } else if (!isForInitialization) {
        // outstanding tasks from the previous mode have not drained yet; keep the current mode
1334 return;
1335 }
1336
1337 if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
1338 this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
1339 this.recoveryMode = previousRecoveryMode;
1340 } else {
1341 this.recoveryMode = recoveryModeInConfig;
1342 }
1343 }
1344 }
1345
1346 public RecoveryMode getRecoveryMode() {
1347 return this.recoveryMode;
1348 }
1349
  /**
   * Returns whether distributed log replay is turned on, which also requires HFile format
   * version 3 or above.
   * @param conf the HBase configuration
   * @return true when distributed log replay is enabled
   */
1355 private boolean isDistributedLogReplay(Configuration conf) {
1356 boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
1357 HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
1358 int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
1359 if (LOG.isDebugEnabled()) {
1360 LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
1361 }
    // distributed log replay requires HFile format version 3 or above
1363 return dlr && (version >= 3);
1364 }
1365
  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
1372 static class TaskBatch {
1373 int installed = 0;
1374 int done = 0;
1375 int error = 0;
1376 volatile boolean isDead = false;
1377
1378 @Override
1379 public String toString() {
1380 return ("installed = " + installed + " done = " + done + " error = " + error);
1381 }
1382 }
1383
  /**
   * In-memory state of an active task.
   */
1387 static class Task {
1388 volatile long last_update;
1389 volatile int last_version;
1390 volatile ServerName cur_worker_name;
1391 volatile TaskBatch batch;
1392 volatile TerminationStatus status;
1393 volatile int incarnation;
1394 final AtomicInteger unforcedResubmits = new AtomicInteger();
1395 volatile boolean resubmitThresholdReached;
1396
1397 @Override
1398 public String toString() {
1399 return ("last_update = " + last_update +
1400 " last_version = " + last_version +
1401 " cur_worker_name = " + cur_worker_name +
1402 " status = " + status +
1403 " incarnation = " + incarnation +
1404 " resubmits = " + unforcedResubmits.get() +
1405 " batch = " + batch);
1406 }
1407
1408 Task() {
1409 incarnation = 0;
1410 last_version = -1;
1411 status = IN_PROGRESS;
1412 setUnassigned();
1413 }
1414
1415 public boolean isOrphan() {
1416 return (batch == null || batch.isDead);
1417 }
1418
1419 public boolean isUnassigned() {
1420 return (cur_worker_name == null);
1421 }
1422
1423 public void heartbeatNoDetails(long time) {
1424 last_update = time;
1425 }
1426
1427 public void heartbeat(long time, int version, ServerName worker) {
1428 last_version = version;
1429 last_update = time;
1430 cur_worker_name = worker;
1431 }
1432
1433 public void setUnassigned() {
1434 cur_worker_name = null;
1435 last_update = -1;
1436 }
1437 }
1438
1439 void handleDeadWorker(ServerName workerName) {
    // only record the dead worker here; the tasks are resubmitted on the TimeoutMonitor thread,
    // which makes it easier to reason about concurrency and to retry
1442 synchronized (deadWorkersLock) {
1443 if (deadWorkers == null) {
1444 deadWorkers = new HashSet<ServerName>(100);
1445 }
1446 deadWorkers.add(workerName);
1447 }
1448 LOG.info("dead splitlog worker " + workerName);
1449 }
1450
1451 void handleDeadWorkers(Set<ServerName> serverNames) {
1452 synchronized (deadWorkersLock) {
1453 if (deadWorkers == null) {
1454 deadWorkers = new HashSet<ServerName>(100);
1455 }
1456 deadWorkers.addAll(serverNames);
1457 }
1458 LOG.info("dead splitlog workers " + serverNames);
1459 }
1460
  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
1465 private class TimeoutMonitor extends Chore {
1466 private long lastLog = 0;
1467
1468 public TimeoutMonitor(final int period, Stoppable stopper) {
1469 super("SplitLogManager Timeout Monitor", period, stopper);
1470 }
1471
1472 @Override
1473 protected void chore() {
1474 int resubmitted = 0;
1475 int unassigned = 0;
1476 int tot = 0;
1477 boolean found_assigned_task = false;
1478 Set<ServerName> localDeadWorkers;
1479
1480 synchronized (deadWorkersLock) {
1481 localDeadWorkers = deadWorkers;
1482 deadWorkers = null;
1483 }
1484
1485 for (Map.Entry<String, Task> e : tasks.entrySet()) {
1486 String path = e.getKey();
1487 Task task = e.getValue();
1488 ServerName cur_worker = task.cur_worker_name;
1489 tot++;
        // don't easily resubmit a task which hasn't been picked up yet; it may be a while before
        // a SplitLogWorker is free to pick it up. Unassigned tasks are handled separately below,
        // after the unassignedTimeout has passed.
1495 if (task.isUnassigned()) {
1496 unassigned++;
1497 continue;
1498 }
1499 found_assigned_task = true;
1500 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1501 SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1502 if (resubmit(path, task, FORCE)) {
1503 resubmitted++;
1504 } else {
1505 handleDeadWorker(cur_worker);
1506 LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1507 cur_worker + ", will retry.");
1508 }
1509 } else if (resubmit(path, task, CHECK)) {
1510 resubmitted++;
1511 }
1512 }
1513 if (tot > 0) {
1514 long now = EnvironmentEdgeManager.currentTimeMillis();
1515 if (now > lastLog + 5000) {
1516 lastLog = now;
1517 LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
1518 }
1519 }
1520 if (resubmitted > 0) {
1521 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1522 }
      // If there are pending tasks and all of them have been unassigned for some time then put
      // up a RESCAN node to ping the workers. The unassignedTimeout is of the order of minutes
      // because it is very unlikely that every worker had a transient error when trying to grab
      // the task. If there are no workers at all, the tasks stay unassigned indefinitely and the
      // manager keeps creating RESCAN nodes on every timeout.
1532 if (tot > 0 && !found_assigned_task &&
1533 ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
1534 unassignedTimeout)) {
1535 for (Map.Entry<String, Task> e : tasks.entrySet()) {
1536 String path = e.getKey();
1537 Task task = e.getValue();
          // we have to do the task.isUnassigned() check again because tasks might have been
          // asynchronously assigned in the meantime. There is no locking required for these
          // checks; it is OK even if tryGetDataSetWatch() is called unnecessarily for a task.
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // we just touch the znode to make sure it is still there
            tryGetDataSetWatch(path);
1545 }
1546 }
1547 createRescanNode(Long.MAX_VALUE);
1548 SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1549 LOG.debug("resubmitting unassigned task(s) after timeout");
1550 }
1551
      // retry previously failed deletes
1553 if (failedDeletions.size() > 0) {
1554 List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1555 for (String tmpPath : tmpPaths) {
1556
1557 deleteNode(tmpPath, zkretries);
1558 }
1559 failedDeletions.removeAll(tmpPaths);
1560 }
1561
1562
1563 long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1564 - lastRecoveringNodeCreationTime;
1565 if (!failedRecoveringRegionDeletions.isEmpty()
1566 || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1567
1568 if (!failedRecoveringRegionDeletions.isEmpty()) {
1569 List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
1570 new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
1571 failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
1572 for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
1573 removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
1574 }
1575 } else {
1576 removeRecoveringRegionsFromZK(null, null);
1577 }
1578 }
1579 }
1580 }
1581
  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
1586 class CreateAsyncCallback implements AsyncCallback.StringCallback {
1587 private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1588
1589 @Override
1590 public void processResult(int rc, String path, Object ctx, String name) {
1591 SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1592 if (rc != 0) {
1593 if (needAbandonRetries(rc, "Create znode " + path)) {
1594 createNodeFailure(path);
1595 return;
1596 }
1597 if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // a pre-existing znode for this task (e.g. left over from a previous master) is
          // treated the same way as a newly created one: fall through and set a data watch on it
1604 LOG.debug("found pre-existing znode " + path);
1605 SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1606 } else {
1607 Long retry_count = (Long)ctx;
1608 LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1609 path + " remaining retries=" + retry_count);
1610 if (retry_count == 0) {
1611 SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1612 createNodeFailure(path);
1613 } else {
1614 SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1615 createNode(path, retry_count - 1);
1616 }
1617 return;
1618 }
1619 }
1620 createNodeSuccess(path);
1621 }
1622 }
1623
  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
1628 class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1629 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1630 private boolean completeTaskOnNoNode;
1631
    /**
     * @param completeTaskOnNoNode whether to complete the task when the znode cannot be found.
     *        In-memory task creation and znode creation are not atomic, so a read issued while
     *        the znode does not exist yet must not complete the task; such callers pass false.
     */
1638 public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
1639 this.completeTaskOnNoNode = completeTaskOnNoNode;
1640 }
1641
1642 @Override
1643 public void processResult(int rc, String path, Object ctx, byte[] data,
1644 Stat stat) {
1645 SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1646 if (rc != 0) {
1647 if (needAbandonRetries(rc, "GetData from znode " + path)) {
1648 return;
1649 }
1650 if (rc == KeeperException.Code.NONODE.intValue()) {
1651 SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1652 LOG.warn("task znode " + path + " vanished.");
1653 if (completeTaskOnNoNode) {
            // the task znode is gone, most likely because the task has already been completed
            // and its znode deleted; report it as done
1657 try {
1658 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1659 } catch (DeserializationException e) {
1660 LOG.warn("Deserialization problem", e);
1661 }
1662 }
1663 return;
1664 }
1665 Long retry_count = (Long) ctx;
          // a negative retry count (see tryGetDataSetWatch) means: do not retry, ignore errors
1667 if (retry_count < 0) {
1668 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1669 path + ". Ignoring error. No error handling. No retrying.");
1670 return;
1671 }
1672 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1673 path + " remaining retries=" + retry_count);
1674 if (retry_count == 0) {
1675 SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1676 getDataSetWatchFailure(path);
1677 } else {
1678 SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1679 getDataSetWatch(path, retry_count - 1);
1680 }
1681 return;
1682 }
1683 try {
1684 getDataSetWatchSuccess(path, data, stat.getVersion());
1685 } catch (DeserializationException e) {
1686 LOG.warn("Deserialization problem", e);
1687 }
1688 return;
1689 }
1690 }
1691
  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
1696 class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1697 private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1698
1699 @Override
1700 public void processResult(int rc, String path, Object ctx) {
1701 SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1702 if (rc != 0) {
1703 if (needAbandonRetries(rc, "Delete znode " + path)) {
1704 failedDeletions.add(path);
1705 return;
1706 }
1707 if (rc != KeeperException.Code.NONODE.intValue()) {
1708 SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1709 Long retry_count = (Long) ctx;
1710 LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1711 path + " remaining retries=" + retry_count);
1712 if (retry_count == 0) {
1713 LOG.warn("delete failed " + path);
1714 failedDeletions.add(path);
1715 deleteNodeFailure(path);
1716 } else {
1717 deleteNode(path, retry_count - 1);
1718 }
1719 return;
1720 } else {
1721 LOG.info(path +
1722 " does not exist. Either was created but deleted behind our" +
1723 " back by another pending delete OR was deleted" +
1724 " in earlier retry rounds. zkretries = " + (Long) ctx);
1725 }
1726 } else {
1727 LOG.debug("deleted " + path);
1728 }
1729 deleteNodeSuccess(path);
1730 }
1731 }
1732
  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is a signal for all the {@link SplitLogWorker}s to rescan for new tasks.
   */
1740 class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1741 private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1742
1743 @Override
1744 public void processResult(int rc, String path, Object ctx, String name) {
1745 if (rc != 0) {
1746 if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1747 return;
1748 }
1749 Long retry_count = (Long)ctx;
1750 LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1751 " remaining retries=" + retry_count);
1752 if (retry_count == 0) {
1753 createRescanFailure();
1754 } else {
1755 createRescanNode(retry_count - 1);
1756 }
1757 return;
1758 }
      // 'path' is the requested prefix; 'name' is the actual sequential node that was created
1760 createRescanSuccess(name);
1761 }
1762 }
1763
  /**
   * {@link SplitLogManager} can use objects implementing this interface to finish off a
   * partially done task by a {@link SplitLogWorker}. This provides a serialization point at the
   * end of the task processing. Implementations must be restartable and idempotent.
   */
1770 public interface TaskFinisher {
    /**
     * status that can be returned by finish()
     */
    enum Status {
      /**
       * task completed successfully
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
    }
1784
    /**
     * Finish the partially done task. The worker name provides a clue to where the partial
     * results of the partially done task are present; the task name is the name of the task
     * that was put up in ZooKeeper.
     *
     * @param workerName the worker that last owned the task
     * @param taskname the name of the task that was put up in ZooKeeper
     * @return DONE if the task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
1794 }
1795
1796 enum ResubmitDirective {
1797 CHECK(),
1798 FORCE();
1799 }
1800
1801 enum TerminationStatus {
1802 IN_PROGRESS("in_progress"),
1803 SUCCESS("success"),
1804 FAILURE("failure"),
1805 DELETED("deleted");
1806
1807 String statusMsg;
1808 TerminationStatus(String msg) {
1809 statusMsg = msg;
1810 }
1811
1812 @Override
1813 public String toString() {
1814 return statusMsg;
1815 }
1816 }
1817 }