
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.SplitLogCounters;
53  import org.apache.hadoop.hbase.SplitLogTask;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.exceptions.DeserializationException;
56  import org.apache.hadoop.hbase.io.hfile.HFile;
57  import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
58  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
61  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
63  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
64  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
65  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66  import org.apache.hadoop.hbase.util.FSUtils;
67  import org.apache.hadoop.hbase.util.Pair;
68  import org.apache.hadoop.hbase.util.Threads;
69  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
72  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
73  import org.apache.hadoop.util.StringUtils;
74  import org.apache.zookeeper.AsyncCallback;
75  import org.apache.zookeeper.CreateMode;
76  import org.apache.zookeeper.KeeperException;
77  import org.apache.zookeeper.KeeperException.NoNodeException;
78  import org.apache.zookeeper.ZooDefs.Ids;
79  import org.apache.zookeeper.data.Stat;
80  
81  import com.google.common.annotations.VisibleForTesting;
82  
83  /**
84   * Distributes the task of log splitting to the available region servers.
85   * Coordination happens via zookeeper. For every log file that has to be split a
86   * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
87   *
88   * <p>SplitLogManager monitors the task znodes that it creates using the
89   * timeoutMonitor thread. If a task's progress is slow then
90   * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
91   * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
92   * task's znode is deleted by SplitLogManager.
93   *
94   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
95   * log files. The caller thread waits in this method until all the log files
96   * have been split.
97   *
98   * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
99   * to help reduce the response time seen by the callers.
100  *
101  * <p>There is a race in this design between the SplitLogManager and the
102  * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
103  * already been completed by a SplitLogWorker. We rely on the idempotency of
104  * the log splitting task for correctness.
105  *
106  * <p>It is also assumed that every log splitting task is unique and once
107  * completed (either with success or with error) it will not be submitted
108  * again. If a task is resubmitted then there is a risk that an old "delete task"
109  * can delete the re-submission.
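     *
     * <p>A minimal usage sketch (illustrative only; the watcher, configuration, stoppable,
     * master services and server name are assumed to be supplied by the surrounding master):
     * <pre>
     *   SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, masterServices,
     *       serverName, false);
     *   // blocks until every log file under logDir has been split by some worker
     *   long bytesSplit = slm.splitLogDistributed(logDir);
     * </pre>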
110  */
111 @InterfaceAudience.Private
112 public class SplitLogManager extends ZooKeeperListener {
113   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
114 
115   public static final int DEFAULT_TIMEOUT = 120000;
116   public static final int DEFAULT_ZK_RETRIES = 3;
117   public static final int DEFAULT_MAX_RESUBMIT = 3;
118   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
119 
120   private final Stoppable stopper;
121   private final MasterServices master;
122   private final ServerName serverName;
123   private final TaskFinisher taskFinisher;
124   private FileSystem fs;
125   private Configuration conf;
126 
127   private long zkretries;
128   private long resubmit_threshold;
129   private long timeout;
130   private long unassignedTimeout;
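      // time the most recent splitlog task or rescan znode was created
      // (see enqueueSplitTask() and createRescanNode())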
131   private long lastTaskCreateTime = Long.MAX_VALUE;
132   public boolean ignoreZKDeleteForTesting = false;
133   private volatile long lastRecoveringNodeCreationTime = 0;
134   // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
135   // whether to GC stale recovering znodes
136   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
137   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
138       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
139 
140   /**
141    * In distributedLogReplay mode, we need to touch both splitlog and recovering-regions znodes in one
142    * operation. So the lock is used to guard such cases.
143    */
144   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
145 
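      // The recovery mode (LOG_SPLITTING or LOG_REPLAY) currently in effect. isDrainingDone turns
      // true once no outstanding split tasks or recovering regions from a previous mode remain,
      // i.e. once the configured mode is fully in effect (see setRecoveryMode()).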
146   private volatile RecoveryMode recoveryMode;
147   private volatile boolean isDrainingDone = false;
148 
149   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
150   private TimeoutMonitor timeoutMonitor;
151 
152   private volatile Set<ServerName> deadWorkers = null;
153   private final Object deadWorkersLock = new Object();
154 
155   private Set<String> failedDeletions = null;
156 
157   /**
158    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
159    *   Stoppable stopper, MasterServices master, ServerName serverName,
160    *   boolean masterRecovery, TaskFinisher tf)}
161    * that provides a task finisher for copying recovered edits to their final destination.
162    * The task finisher has to be robust because it can be arbitrarily restarted or called
163    * multiple times.
164    *
165    * @param zkw the ZK watcher
166    * @param conf the HBase configuration
167    * @param stopper the stoppable in case anything is wrong
168    * @param master the master services
169    * @param serverName the master server name
170    * @param masterRecovery an indication if the master is in recovery
171    * @throws KeeperException
172    * @throws InterruptedIOException
173    */
174   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
175       Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery)
176       throws InterruptedIOException, KeeperException {
177     this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
178       @Override
179       public Status finish(ServerName workerName, String logfile) {
180         try {
181           HLogSplitter.finishSplitLogFile(logfile, conf);
182         } catch (IOException e) {
183           LOG.warn("Could not finish splitting of log file " + logfile, e);
184           return Status.ERR;
185         }
186         return Status.DONE;
187       }
188     });
189   }
190 
191   /**
192   * It's OK to construct this object even when region servers are not online. It
193   * does look up the orphan tasks in ZK but it doesn't block waiting for them
194    * to be done.
195    *
196    * @param zkw the ZK watcher
197    * @param conf the HBase configuration
198    * @param stopper the stoppable in case anything is wrong
199    * @param master the master services
200    * @param serverName the master server name
201    * @param masterRecovery an indication if the master is in recovery
202    * @param tf task finisher
203    * @throws KeeperException
204    * @throws InterruptedIOException
205    */
206   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
207         Stoppable stopper, MasterServices master,
208         ServerName serverName, boolean masterRecovery, TaskFinisher tf)
209       throws InterruptedIOException, KeeperException {
210     super(zkw);
211     this.taskFinisher = tf;
212     this.conf = conf;
213     this.stopper = stopper;
214     this.master = master;
215     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
216     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
217     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
218     this.unassignedTimeout =
219       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
220 
221     // Determine recovery mode
222     setRecoveryMode(true);
223 
224     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
225       ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
226 
227     this.serverName = serverName;
228     this.timeoutMonitor = new TimeoutMonitor(
229       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
230 
231     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
232 
233     if (!masterRecovery) {
234       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
235           + ".splitLogManagerTimeoutMonitor");
236     }
237     // Watcher can be null during tests with Mock'd servers.
238     if (this.watcher != null) {
239       this.watcher.registerListener(this);
240       lookForOrphans();
241     }
242   }
243 
244   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
245     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
246     for (Path hLogDir : logDirs) {
247       this.fs = hLogDir.getFileSystem(conf);
248       if (!fs.exists(hLogDir)) {
249         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
250         continue;
251       }
252       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
253       if (logfiles == null || logfiles.length == 0) {
254         LOG.info(hLogDir + " is empty dir, no logs to split");
255       } else {
256         Collections.addAll(fileStatus, logfiles);
257       }
258     }
259     FileStatus[] a = new FileStatus[fileStatus.size()];
260     return fileStatus.toArray(a);
261   }
262 
263   /**
264    * @param logDir
265    *            one region server hlog dir path in .logs
266    * @throws IOException
267    *             if there was an error while splitting any log file
268    * @return cumulative size of the logfiles split
270    */
271   public long splitLogDistributed(final Path logDir) throws IOException {
272     List<Path> logDirs = new ArrayList<Path>();
273     logDirs.add(logDir);
274     return splitLogDistributed(logDirs);
275   }
276 
277   /**
278    * The caller will block until all the log files of the given region server
279    * have been processed - successfully split or an error is encountered - by an
280    * available worker region server. This method must only be called after the
281    * region servers have been brought online.
282    *
283    * @param logDirs List of log dirs to split
284    * @throws IOException If there was an error while splitting any log file
285    * @return cumulative size of the logfiles split
286    */
287   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
288     if (logDirs.isEmpty()) {
289       return 0;
290     }
291     Set<ServerName> serverNames = new HashSet<ServerName>();
292     for (Path logDir : logDirs) {
293       try {
294         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
295         if (serverName != null) {
296           serverNames.add(serverName);
297         }
298       } catch (IllegalArgumentException e) {
299         // ignore invalid format error.
300         LOG.warn("Cannot parse server name from " + logDir);
301       }
302     }
303     return splitLogDistributed(serverNames, logDirs, null);
304   }
305 
306   /**
307    * The caller will block until all the log files of the given region servers
308    * have been processed - successfully split or an error is encountered - by an
309    * available worker region server. This method must only be called after the
310    * region servers have been brought online.
311    *
312    * @param logDirs List of log dirs to split
313    * @param filter the Path filter to select specific log files for consideration
314    * @throws IOException If there was an error while splitting any log file
315    * @return cumulative size of the logfiles split
316    */
317   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
318       PathFilter filter) throws IOException {
319     MonitoredTask status = TaskMonitor.get().createStatus(
320           "Doing distributed log split in " + logDirs);
321     FileStatus[] logfiles = getFileList(logDirs, filter);
322     status.setStatus("Checking directory contents...");
323     LOG.debug("Scheduling batch of logs to split");
324     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
325     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
326     long t = EnvironmentEdgeManager.currentTimeMillis();
327     long totalSize = 0;
328     TaskBatch batch = new TaskBatch();
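         // tri-state: null when no filter was given (logs may be for both meta and user regions),
         // false for user-region-only recovery; flipped to true below when the meta filter was used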
329     Boolean isMetaRecovery = (filter == null) ? null : false;
330     for (FileStatus lf : logfiles) {
331       // TODO If the log file is still being written to - which is most likely
332       // the case for the last log file - then its length will show up here
333       // as zero. The size of such a file can only be retrieved after
334       // recover-lease is done. totalSize will be an underestimate in most cases and the
335       // metrics that it drives will also be under-reported.
336       totalSize += lf.getLen();
337       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
338       if (!enqueueSplitTask(pathToLog, batch)) {
339         throw new IOException("duplicate log split scheduled for " + lf.getPath());
340       }
341     }
342     waitForSplittingCompletion(batch, status);
343     // remove recovering regions from ZK
344     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
345       // we split meta regions and user regions separately, therefore log files are either all for
346       // meta or all for user regions, never both (we could have mixed situations in tests)
347       isMetaRecovery = true;
348     }
349     this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
350 
351     if (batch.done != batch.installed) {
352       batch.isDead = true;
353       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
354       LOG.warn("error while splitting logs in " + logDirs +
355       " installed = " + batch.installed + " but only " + batch.done + " done");
356       String msg = "error or interrupted while splitting logs in "
357         + logDirs + " Task = " + batch;
358       status.abort(msg);
359       throw new IOException(msg);
360     }
361     for (Path logDir : logDirs) {
362       status.setStatus("Cleaning up log directory...");
363       try {
364         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
365           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
366         }
367       } catch (IOException ioe) {
368         FileStatus[] files = fs.listStatus(logDir);
369         if (files != null && files.length > 0) {
370           LOG.warn("returning success without actually splitting and " +
371               "deleting all the log files in path " + logDir);
372         } else {
373           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
374         }
375       }
376       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
377     }
378     String msg = "finished splitting (more than or equal to) " + totalSize +
379         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
380         (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
381     status.markComplete(msg);
382     LOG.info(msg);
383     return totalSize;
384   }
385 
386   /**
387    * Add a task entry to splitlog znode if it is not already there.
388    *
389    * @param taskname the path of the log to be split
390    * @param batch the batch this task belongs to
391    * @return true if a new entry is created, false if it is already there.
392    */
393   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
394     SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
395     // This is a znode path under the splitlog dir with the rest of the path made up of an
396     // url encoding of the passed in log to split.
397     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
398     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
399     Task oldtask = createTaskIfAbsent(path, batch);
400     if (oldtask == null) {
401       // publish the task in zk
402       createNode(path, zkretries);
403       return true;
404     }
405     return false;
406   }
407 
408   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
409     synchronized (batch) {
410       while ((batch.done + batch.error) != batch.installed) {
411         try {
412           status.setStatus("Waiting for distributed tasks to finish. "
413               + " scheduled=" + batch.installed
414               + " done=" + batch.done
415               + " error=" + batch.error);
416           int remaining = batch.installed - (batch.done + batch.error);
417           int actual = activeTasks(batch);
418           if (remaining != actual) {
419             LOG.warn("Expected " + remaining
420               + " active tasks, but actually there are " + actual);
421           }
422           int remainingInZK = remainingTasksInZK();
423           if (remainingInZK >= 0 && actual > remainingInZK) {
424             LOG.warn("Expected at least " + actual
425               + " tasks in ZK, but actually there are " + remainingInZK);
426           }
427           if (remainingInZK == 0 || actual == 0) {
428             LOG.warn("No more tasks remaining (ZK or task map), splitting "
429               + "should have completed. Remaining tasks in ZK " + remainingInZK
430               + ", active tasks in map " + actual);
431             if (remainingInZK == 0 && actual == 0) {
432               return;
433             }
434           }
435           batch.wait(100);
436           if (stopper.isStopped()) {
437             LOG.warn("Stopped while waiting for log splits to be completed");
438             return;
439           }
440         } catch (InterruptedException e) {
441           LOG.warn("Interrupted while waiting for log splits to be completed");
442           Thread.currentThread().interrupt();
443           return;
444         }
445       }
446     }
447   }
448 
449   @VisibleForTesting
450   ConcurrentMap<String, Task> getTasks() {
451     return tasks;
452   }
453 
454   private int activeTasks(final TaskBatch batch) {
455     int count = 0;
456     for (Task t: tasks.values()) {
457       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
458         count++;
459       }
460     }
461     return count;
462   }
463 
464   private int remainingTasksInZK() {
465     int count = 0;
466     try {
467       List<String> tasks =
468         ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
469       if (tasks != null) {
470         for (String t: tasks) {
471           if (!ZKSplitLog.isRescanNode(t)) {
472             count++;
473           }
474         }
475       }
476     } catch (KeeperException ke) {
477       LOG.warn("Failed to check remaining tasks", ke);
478       count = -1;
479     }
480     return count;
481   }
482 
483   /**
484    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
485    * region server hosting the region can allow reads to the recovered region
486    * @param serverNames servers which have just been recovered
487    * @param isMetaRecovery whether the current recovery is for the meta region on
488    *          <code>serverNames</code>
489    */
490   private void
491       removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
492     if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
493       // the function is only used in WALEdit direct replay mode
494       return;
495     }
496 
497     final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
498     int count = 0;
499     Set<String> recoveredServerNameSet = new HashSet<String>();
500     if (serverNames != null) {
501       for (ServerName tmpServerName : serverNames) {
502         recoveredServerNameSet.add(tmpServerName.getServerName());
503       }
504     }
505 
506     try {
507       this.recoveringRegionLock.lock();
508 
509       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
510       if (tasks != null) {
511         for (String t : tasks) {
512           if (!ZKSplitLog.isRescanNode(t)) {
513             count++;
514           }
515         }
516       }
517       if (count == 0 && this.master.isInitialized()
518           && !this.master.getServerManager().areDeadServersInProgress()) {
519         // no splitting work items left
520         deleteRecoveringRegionZNodes(watcher, null);
521         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
522         // this point.
523         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
524       } else if (!recoveredServerNameSet.isEmpty()) {
525         // remove recovering regions which don't have any RS associated with them
526         List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
527         if (regions != null) {
528           for (String region : regions) {
529             if(isMetaRecovery != null) {
530               if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
531                   || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
532                 // skip non-meta regions when recovering the meta region or
533                 // skip the meta region when recovering user regions
534                 continue;
535               }
536             }
537             String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
538             List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
539             if (failedServers == null || failedServers.isEmpty()) {
540               ZKUtil.deleteNode(watcher, nodePath);
541               continue;
542             }
543             if (recoveredServerNameSet.containsAll(failedServers)) {
544               ZKUtil.deleteNodeRecursively(watcher, nodePath);
545             } else {
546               for (String failedServer : failedServers) {
547                 if (recoveredServerNameSet.contains(failedServer)) {
548                   String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
549                   ZKUtil.deleteNode(watcher, tmpPath);
550                 }
551               }
552             }
553           }
554         }
555       }
556     } catch (KeeperException ke) {
557       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
558       if (serverNames != null && !serverNames.isEmpty()) {
559         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
560             isMetaRecovery));
561       }
562     } finally {
563       this.recoveringRegionLock.unlock();
564     }
565   }
566 
567   /**
568    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
569    * during master initialization phase.
570    * @param failedServers A set of known failed servers
571    * @throws KeeperException
572    */
573   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
574       throws KeeperException {
575 
576     Set<String> knownFailedServers = new HashSet<String>();
577     if (failedServers != null) {
578       for (ServerName tmpServerName : failedServers) {
579         knownFailedServers.add(tmpServerName.getServerName());
580       }
581     }
582 
583     this.recoveringRegionLock.lock();
584     try {
585       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
586       if (tasks != null) {
587         for (String t : tasks) {
588           byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
589           if (data != null) {
590             SplitLogTask slt = null;
591             try {
592               slt = SplitLogTask.parseFrom(data);
593             } catch (DeserializationException e) {
594               LOG.warn("Failed parse data for znode " + t, e);
595             }
596             if (slt != null && slt.isDone()) {
597               continue;
598             }
599           }
600           // decode the file name
601           t = ZKSplitLog.getFileName(t);
602           ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
603           if (serverName != null) {
604             knownFailedServers.add(serverName.getServerName());
605           } else {
606             LOG.warn("Found invalid WAL log file name: " + t);
607           }
608         }
609       }
610 
611       // remove recovering regions which don't have any RS associated with them
612       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
613       if (regions != null) {
614         for (String region : regions) {
615           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
616           List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
617           if (regionFailedServers == null || regionFailedServers.isEmpty()) {
618             ZKUtil.deleteNode(watcher, nodePath);
619             continue;
620           }
621           boolean needMoreRecovery = false;
622           for (String tmpFailedServer : regionFailedServers) {
623             if (knownFailedServers.contains(tmpFailedServer)) {
624               needMoreRecovery = true;
625               break;
626             }
627           }
628           if (!needMoreRecovery) {
629             ZKUtil.deleteNodeRecursively(watcher, nodePath);
630           }
631         }
632       }
633     } finally {
634       this.recoveringRegionLock.unlock();
635     }
636   }
637 
638   public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
639     try {
640       if (regions == null) {
641         // remove all children under /hbase/recovering-regions
642         LOG.info("Garbage collecting all recovering regions.");
643         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
644       } else {
645         for (String curRegion : regions) {
646           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
647           ZKUtil.deleteNodeRecursively(watcher, nodePath);
648         }
649       }
650     } catch (KeeperException e) {
651       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
652     }
653   }
654 
655   private void setDone(String path, TerminationStatus status) {
656     Task task = tasks.get(path);
657     if (task == null) {
658       if (!ZKSplitLog.isRescanNode(watcher, path)) {
659         SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
660         LOG.debug("unacquired orphan task is done " + path);
661       }
662     } else {
663       synchronized (task) {
664         if (task.status == IN_PROGRESS) {
665           if (status == SUCCESS) {
666             SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
667             LOG.info("Done splitting " + path);
668           } else {
669             SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
670             LOG.warn("Error splitting " + path);
671           }
672           task.status = status;
673           if (task.batch != null) {
674             synchronized (task.batch) {
675               if (status == SUCCESS) {
676                 task.batch.done++;
677               } else {
678                 task.batch.error++;
679               }
680               task.batch.notify();
681             }
682           }
683         }
684       }
685     }
686     // delete the task node in zk. It's an async
687     // call and no one is blocked waiting for this node to be deleted. All
688     // task names are unique (log.<timestamp>) so there is no risk of deleting
689     // a future task.
690     // if a deletion fails, TimeoutMonitor will retry the same deletion later
691     deleteNode(path, zkretries);
692     return;
693   }
694 
695   private void createNode(String path, Long retry_count) {
696     SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
697     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
698     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
699     return;
700   }
701 
702   private void createNodeSuccess(String path) {
703     LOG.debug("put up splitlog task at znode " + path);
704     getDataSetWatch(path, zkretries);
705   }
706 
707   private void createNodeFailure(String path) {
708     // TODO the Manager should split the log locally instead of giving up
709     LOG.warn("failed to create task node " + path);
710     setDone(path, FAILURE);
711   }
712 
713 
714   private void getDataSetWatch(String path, Long retry_count) {
715     this.watcher.getRecoverableZooKeeper().getZooKeeper().
716         getData(path, this.watcher,
717         new GetDataAsyncCallback(true), retry_count);
718     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
719   }
720 
721   private void tryGetDataSetWatch(String path) {
722     // A negative retry count will lead to ignoring all error processing.
723     this.watcher.getRecoverableZooKeeper().getZooKeeper().
724         getData(path, this.watcher,
725         new GetDataAsyncCallback(false), Long.valueOf(-1) /* retry count */);
726     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
727   }
728 
729   private void getDataSetWatchSuccess(String path, byte[] data, int version)
730   throws DeserializationException {
731     if (data == null) {
732       if (version == Integer.MIN_VALUE) {
733         // assume all done. The task znode suddenly disappeared.
734         setDone(path, SUCCESS);
735         return;
736       }
737       SplitLogCounters.tot_mgr_null_data.incrementAndGet();
738       LOG.fatal("logic error - got null data " + path);
739       setDone(path, FAILURE);
740       return;
741     }
742     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
743     SplitLogTask slt = SplitLogTask.parseFrom(data);
744     if (slt.isUnassigned()) {
745       LOG.debug("task not yet acquired " + path + " ver = " + version);
746       handleUnassignedTask(path);
747     } else if (slt.isOwned()) {
748       heartbeat(path, version, slt.getServerName());
749     } else if (slt.isResigned()) {
750       LOG.info("task " + path + " entered state: " + slt.toString());
751       resubmitOrFail(path, FORCE);
752     } else if (slt.isDone()) {
753       LOG.info("task " + path + " entered state: " + slt.toString());
754       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
755         if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
756           setDone(path, SUCCESS);
757         } else {
758           resubmitOrFail(path, CHECK);
759         }
760       } else {
761         setDone(path, SUCCESS);
762       }
763     } else if (slt.isErr()) {
764       LOG.info("task " + path + " entered state: " + slt.toString());
765       resubmitOrFail(path, CHECK);
766     } else {
767       LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
768       setDone(path, FAILURE);
769     }
770   }
771 
772   private void getDataSetWatchFailure(String path) {
773     LOG.warn("failed to set data watch " + path);
774     setDone(path, FAILURE);
775   }
776 
777   /**
778    * It is possible for a task to stay in UNASSIGNED state indefinitely - say
779    * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED
780    * state but it dies before it could create the RESCAN task node to signal
781    * the SplitLogWorkers to pick up the task. To prevent this scenario the
782    * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
783    *
784    * @param path
785    */
786   private void handleUnassignedTask(String path) {
787     if (ZKSplitLog.isRescanNode(watcher, path)) {
788       return;
789     }
790     Task task = findOrCreateOrphanTask(path);
791     if (task.isOrphan() && (task.incarnation == 0)) {
792       LOG.info("resubmitting unassigned orphan task " + path);
793       // ignore failure to resubmit. The timeout-monitor will handle it later
794       // albeit in a more crude fashion
795       resubmit(path, task, FORCE);
796     }
797   }
798 
799   /**
800    * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
801    * @param statusCode integer value of a ZooKeeper exception code
802    * @param action description message about the retried action
803    * @return true when retries should be abandoned, otherwise false
804    */
805   private boolean needAbandonRetries(int statusCode, String action) {
806     if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
807       LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
808           + "action=" + action);
809       return true;
810     }
811     return false;
812   }
813 
814   private void heartbeat(String path, int new_version, ServerName workerName) {
815     Task task = findOrCreateOrphanTask(path);
816     if (new_version != task.last_version) {
817       if (task.isUnassigned()) {
818         LOG.info("task " + path + " acquired by " + workerName);
819       }
820       task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
821       SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
822     } else {
823       // duplicate heartbeats - heartbeats w/o zk node version
824       // changing - are possible. The timeout thread does
825       // getDataSetWatch() just to check whether a node still
826       // exists or not
827     }
828     return;
829   }
830 
831   private boolean resubmit(String path, Task task, ResubmitDirective directive) {
832     // it's OK if this thread misses the update to task.deleted. It will fail later
833     if (task.status != IN_PROGRESS) {
834       return false;
835     }
836     int version;
837     if (directive != FORCE) {
838       // We're going to resubmit:
839       //  1) immediately if the worker server is now marked as dead
840       //  2) after a configurable timeout if the server is not marked as dead but has still not
841       //       finished the task. This allows us to continue if the worker cannot actually handle it,
842       //       for any reason.
843       final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
844       final boolean alive = master.getServerManager() != null ?
845           master.getServerManager().isServerOnline(task.cur_worker_name) : true;
846       if (alive && time < timeout) {
847         LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server " +
848             task.cur_worker_name + " is not marked as dead, we waited for " + time +
849             " while the timeout is " + timeout);
850         return false;
851       }
852       if (task.unforcedResubmits.get() >= resubmit_threshold) {
853         if (!task.resubmitThresholdReached) {
854           task.resubmitThresholdReached = true;
855           SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
856           LOG.info("Skipping resubmissions of task " + path +
857               " because threshold " + resubmit_threshold + " reached");
858         }
859         return false;
860       }
861       // race with heartbeat() that might be changing last_version
862       version = task.last_version;
863     } else {
864       SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
865       version = -1;
866     }
867     LOG.info("resubmitting task " + path);
868     task.incarnation++;
869     try {
870       // blocking zk call but this is done from the timeout thread
871       SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
872       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
873         LOG.debug("failed to resubmit task " + path +
874             " version changed");
875         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
876         return false;
877       }
878     } catch (NoNodeException e) {
879       LOG.warn("failed to resubmit because znode doesn't exist " + path +
880           " task done (or forced done by removing the znode)");
881       try {
882         getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
883       } catch (DeserializationException e1) {
884         LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
885         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
886         return false;
887       }
888       return false;
889     } catch (KeeperException.BadVersionException e) {
890       LOG.debug("failed to resubmit task " + path + " version changed");
891       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
892       return false;
893     } catch (KeeperException e) {
894       SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
895       LOG.warn("failed to resubmit " + path, e);
896       return false;
897     }
898     // don't count forced resubmits
899     if (directive != FORCE) {
900       task.unforcedResubmits.incrementAndGet();
901     }
902     task.setUnassigned();
903     createRescanNode(Long.MAX_VALUE);
904     SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
905     return true;
906   }
907 
908   private void resubmitOrFail(String path, ResubmitDirective directive) {
909     if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
910       setDone(path, FAILURE);
911     }
912   }
913 
914   private void deleteNode(String path, Long retries) {
915     SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
916     // Once a task znode is ready for delete, that is, it is in the TASK_DONE
917     // state, then no one should be writing to it anymore. That is no one
918     // will be updating the znode version any more.
919     this.watcher.getRecoverableZooKeeper().getZooKeeper().
920       delete(path, -1, new DeleteAsyncCallback(),
921         retries);
922   }
923 
924   private void deleteNodeSuccess(String path) {
925     if (ignoreZKDeleteForTesting) {
926       return;
927     }
928     Task task;
929     task = tasks.remove(path);
930     if (task == null) {
931       if (ZKSplitLog.isRescanNode(watcher, path)) {
932         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
933       }
934       SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
935       LOG.debug("deleted task without in memory state " + path);
936       return;
937     }
938     synchronized (task) {
939       task.status = DELETED;
940       task.notify();
941     }
942     SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
943   }
944 
945   private void deleteNodeFailure(String path) {
946     LOG.info("Failed to delete node " + path + " and will retry soon.");
947     return;
948   }
949 
950   /**
951    * Signal the workers that a task was resubmitted by creating the
952    * RESCAN node.
954    */
955   private void createRescanNode(long retries) {
956     // The RESCAN node will be deleted almost immediately by the
957     // SplitLogManager as soon as it is created because it is being
958     // created in the DONE state. This behavior prevents a buildup
959     // of RESCAN nodes. But there is also a chance that a SplitLogWorker
960     // might miss the watch-trigger that creation of RESCAN node provides.
961     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
962     // therefore this behavior is safe.
963     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
964     SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
965     this.watcher.getRecoverableZooKeeper().getZooKeeper().
966       create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
967         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
968         new CreateRescanAsyncCallback(), Long.valueOf(retries));
969   }
970 
971   private void createRescanSuccess(String path) {
972     SplitLogCounters.tot_mgr_rescan.incrementAndGet();
973     getDataSetWatch(path, zkretries);
974   }
975 
976   private void createRescanFailure() {
977     LOG.fatal("logic failure, rescan failure must not happen");
978   }
979 
980   /**
981    * @param path znode path of the split task
982    * @param batch the batch that the newly created task should be added to
983    * @return null on success, existing task on error
984    */
985   private Task createTaskIfAbsent(String path, TaskBatch batch) {
986     Task oldtask;
987     // batch.installed is only changed via this function and
988     // a single thread touches batch.installed.
989     Task newtask = new Task();
990     newtask.batch = batch;
991     oldtask = tasks.putIfAbsent(path, newtask);
992     if (oldtask == null) {
993       batch.installed++;
994       return  null;
995     }
996     // new task was not used.
997     synchronized (oldtask) {
998       if (oldtask.isOrphan()) {
999         if (oldtask.status == SUCCESS) {
1000           // The task is already done. Do not install the batch for this
1001           // task because it might be too late for setDone() to update
1002           // batch.done. There is no need for the batch creator to wait for
1003           // this task to complete.
1004           return (null);
1005         }
1006         if (oldtask.status == IN_PROGRESS) {
1007           oldtask.batch = batch;
1008           batch.installed++;
1009           LOG.debug("Previously orphan task " + path + " is now being waited upon");
1010           return null;
1011         }
1012         while (oldtask.status == FAILURE) {
1013           LOG.debug("wait for status of task " + path + " to change to DELETED");
1014           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
1015           try {
1016             oldtask.wait();
1017           } catch (InterruptedException e) {
1018             Thread.currentThread().interrupt();
1019             LOG.warn("Interrupted when waiting for znode delete callback");
1020             // fall through to return failure
1021             break;
1022           }
1023         }
1024         if (oldtask.status != DELETED) {
1025           LOG.warn("Failure because previously failed task" +
1026               " state still present. Waiting for znode delete callback" +
1027               " path=" + path);
1028           return oldtask;
1029         }
1030         // reinsert the newTask and it must succeed this time
1031         Task t = tasks.putIfAbsent(path, newtask);
1032         if (t == null) {
1033           batch.installed++;
1034           return  null;
1035         }
1036         LOG.fatal("Logic error. Deleted task still present in tasks map");
1037         assert false : "Deleted task still present in tasks map";
1038         return t;
1039       }
1040       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1041       return oldtask;
1042     }
1043   }
1044 
1045   Task findOrCreateOrphanTask(String path) {
1046     Task orphanTask = new Task();
1047     Task task;
1048     task = tasks.putIfAbsent(path, orphanTask);
1049     if (task == null) {
1050       LOG.info("creating orphan task " + path);
1051       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1052       task = orphanTask;
1053     }
1054     return task;
1055   }
1056 
1057   @Override
1058   public void nodeDataChanged(String path) {
1059     Task task;
1060     task = tasks.get(path);
1061     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1062       if (task != null) {
1063         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1064       }
1065       getDataSetWatch(path, zkretries);
1066     }
1067   }
1068 
1069   public void stop() {
1070     if (timeoutMonitor != null) {
1071       timeoutMonitor.interrupt();
1072     }
1073   }
1074 
1075   private void lookForOrphans() {
1076     List<String> orphans;
1077     try {
1078        orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1079           this.watcher.splitLogZNode);
1080       if (orphans == null) {
1081         LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1082         return;
1083       }
1084     } catch (KeeperException e) {
1085       LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1086           " " + StringUtils.stringifyException(e));
1087       return;
1088     }
1089     int rescan_nodes = 0;
1090     for (String path : orphans) {
1091       String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1092       if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1093         rescan_nodes++;
1094         LOG.debug("found orphan rescan node " + path);
1095       } else {
1096         LOG.info("found orphan task " + path);
1097       }
1098       getDataSetWatch(nodepath, zkretries);
1099     }
1100     LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1101         rescan_nodes + " rescan nodes");
1102   }
1103 
1104   /**
1105    * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
1106   * all user regions of the passed in region server
1107   * @param serverName the name of a region server
1108   * @param userRegions user regions assigned on the region server
1109    */
1110   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1111       throws KeeperException {
1112     if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
1113       return;
1114     }
1115 
1116     try {
1117       this.recoveringRegionLock.lock();
1118       // mark that we're creating recovering znodes
1119       this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1120 
1121       for (HRegionInfo region : userRegions) {
1122         String regionEncodeName = region.getEncodedName();
1123         long retries = this.zkretries;
1124 
1125         do {
1126           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1127           long lastRecordedFlushedSequenceId = -1;
1128           try {
1129             long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1130               regionEncodeName.getBytes());
1131 
1132             /*
1133              * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
1134              * flushed sequence id for the server]
1135              */
1136             byte[] data = ZKUtil.getData(this.watcher, nodePath);
1137             if (data == null) {
1138               ZKUtil.createSetData(this.watcher, nodePath,
1139                 ZKUtil.positionToByteArray(lastSequenceId));
1140             } else {
1141               lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1142               if (lastRecordedFlushedSequenceId < lastSequenceId) {
1143                 // update last flushed sequence id in the region level
1144                 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
1145               }
1146             }
1147             // go one level deeper with server name
1148             nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1149             if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1150               // the newly assigned RS failed even before any flush to the region
1151               lastSequenceId = lastRecordedFlushedSequenceId;
1152             }
1153             ZKUtil.createSetData(this.watcher, nodePath,
1154               ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
1155             LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1156                 + serverName);
1157 
1158             // break retry loop
1159             break;
1160           } catch (KeeperException e) {
1161             // ignore ZooKeeper exceptions inside retry loop
1162             if (retries <= 1) {
1163               throw e;
1164             }
1165             // wait a little bit for retry
1166             try {
1167               Thread.sleep(20);
1168             } catch (Exception ignoreE) {
1169               // ignore
1170             }
1171           }
1172         } while ((--retries) > 0 && (!this.stopper.isStopped()));
1173       }
1174     } finally {
1175       this.recoveringRegionLock.unlock();
1176     }
1177   }
1178 
1179   /**
1180    * @param bytes - Content of a failed region server or recovering region znode.
1181    * @return long - The last flushed sequence Id for the region server
1182    */
1183   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1184     long lastRecordedFlushedSequenceId = -1L;
1185     try {
1186       lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1187     } catch (DeserializationException e) {
1188       lastRecordedFlushedSequenceId = -1L;
1189       LOG.warn("Can't parse last flushed sequence Id", e);
1190     }
1191     return lastRecordedFlushedSequenceId;
1192   }
1193 
1194   /**
1195   * Check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if it exists
1196   * and sets a watch on it as well.
1197    * @param zkw
1198    * @param regionEncodedName region encode name
1199    * @return true when /hbase/recovering-regions/<current region encoded name> exists
1200    * @throws KeeperException
1201    */
1202   public static boolean
1203       isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
1204           throws KeeperException {
1205     boolean result = false;
1206     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
1207 
1208     byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
1209     if (node != null) {
1210       result = true;
1211     }
1212     return result;
1213   }
1214 
1215   /**
1216    * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
1217    * @param zkw
1218    * @param serverName
1219    * @param encodedRegionName
1220   * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName</code>
1221    * @throws IOException
1222    */
1223   public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
1224       String serverName, String encodedRegionName) throws IOException {
1225     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
1226     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
1227     // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
1228     // sequence Id name space (sequence Id only valid for a particular RS instance), changes
1229     // when different newly assigned RS flushes the region.
1230     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
1231     // last flushed sequence Id for each failed RS instance.
1232     RegionStoreSequenceIds result = null;
1233     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1234     nodePath = ZKUtil.joinZNode(nodePath, serverName);
1235     try {
1236       byte[] data = ZKUtil.getData(zkw, nodePath);
1237       if (data != null) {
1238         result = ZKUtil.parseRegionStoreSequenceIds(data);
1239       }
1240     } catch (KeeperException e) {
1241       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1242           + serverName + "; region=" + encodedRegionName, e);
1243     } catch (DeserializationException e) {
1244       LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
1245     }
1246     return result;
1247   }
1248 
1249   private List<String> listSplitLogTasks() throws KeeperException {
1250     List<String> taskOrRescanList = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
1251     if (taskOrRescanList == null || taskOrRescanList.isEmpty()) {
1252       return Collections.<String> emptyList();
1253     }
1254     List<String> taskList = new ArrayList<String>();
1255     for (String taskOrRescan : taskOrRescanList) {
1256       // Remove rescan nodes
1257       if (!ZKSplitLog.isRescanNode(taskOrRescan)) {
1258         taskList.add(taskOrRescan);
1259       }
1260     }
1261     return taskList;
1262   }
1263 
1264   /**
1265   * Sets the recovery mode based on outstanding split log tasks from a previous run, or on the
1266   * current configuration setting when none are outstanding.
1267   * @param isForInitialization true when called during master initialization
1268    * @throws KeeperException
1269    * @throws InterruptedIOException
1270    */
1271   public void setRecoveryMode(boolean isForInitialization) throws KeeperException {
1272     if (this.isDrainingDone) {
1273       // when there are no outstanding split log tasks after master startup, we already have an
1274       // up-to-date recovery mode
1275       return;
1276     }
1277     if (this.watcher == null) {
1278       // when the watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
1279       this.isDrainingDone = true;
1280       this.recoveryMode = RecoveryMode.LOG_SPLITTING;
1281       return;
1282     }
1283     boolean hasSplitLogTask = false;
1284     boolean hasRecoveringRegions = false;
1285     RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
1286     RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
1287       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
1288 
1289     // First, check if there are outstanding recovering regions
1290     List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
1291     if (regions != null && !regions.isEmpty()) {
1292       hasRecoveringRegions = true;
1293       previousRecoveryMode = RecoveryMode.LOG_REPLAY;
1294     }
1295     if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1296       // Second, check if there are outstanding split log tasks
1297       List<String> tasks = listSplitLogTasks();
1298       if (!tasks.isEmpty()) {
1299         hasSplitLogTask = true;
1300         if (isForInitialization) {
1301           // during initialization, try to get recovery mode from splitlogtask
1302           for (String task : tasks) {
1303             try {
1304               byte[] data = ZKUtil.getData(this.watcher,
1305                 ZKUtil.joinZNode(watcher.splitLogZNode, task));
1306               if (data == null) continue;
1307               SplitLogTask slt = SplitLogTask.parseFrom(data);
1308               previousRecoveryMode = slt.getMode();
1309               if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1310                 // created by an old code base which didn't set the recovery mode in the splitlogtask;
1311                 // we can safely set it to LOG_SPLITTING because we're in master initialization code
1312                 // before SSH is enabled and there are no outstanding recovering regions
1313                 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
1314               }
1315               break;
1316             } catch (DeserializationException e) {
1317               LOG.warn("Failed to parse data for znode " + task, e);
1318             }
1319           }
1320         }
1321       }
1322     }
1323 
1324     synchronized(this) {
1325       if(this.isDrainingDone) {
1326         return;
1327       }
1328       if (!hasSplitLogTask && !hasRecoveringRegions) {
1329         this.isDrainingDone = true;
1330         this.recoveryMode = recoveryModeInConfig;
1331         return;
1332       } else if (!isForInitialization) {
1333         // split log tasks haven't drained yet; keep the existing recovery mode
1334         return;
1335       }
1336 
1337       if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
1338         this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
1339         this.recoveryMode = previousRecoveryMode;
1340       } else {
1341         this.recoveryMode = recoveryModeInConfig;
1342       }
1343     }
1344   }
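
  // A minimal call-pattern sketch (illustrative, not taken from this file): the master side
  // would typically call setRecoveryMode(true) once during initialization to derive the mode
  // from any leftover tasks, and setRecoveryMode(false) again later so the configured mode can
  // take effect once outstanding tasks and recovering regions have drained.
  //
  //   splitLogManager.setRecoveryMode(true);    // during master initialization
  //   ...
  //   splitLogManager.setRecoveryMode(false);   // re-check after outstanding work drains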
1345 
1346   public RecoveryMode getRecoveryMode() {
1347     return this.recoveryMode;
1348   }
1349 
1350   /**
1351    * Returns whether distributed log replay is turned on
1352    * @param conf the configuration to check
1353    * @return true when distributed log replay is turned on
1354    */
1355   private boolean isDistributedLogReplay(Configuration conf) {
1356     boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
1357       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
1358     int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
1359     if (LOG.isDebugEnabled()) {
1360       LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
1361     }
1362     // For distributed log replay, the hfile version must be at least 3; we need tag support.
1363     return dlr && (version >= 3);
1364   }
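
  // A minimal configuration sketch (values illustrative, not a recommendation): both conditions
  // checked above must hold before log replay is used.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
  //   conf.setInt(HFile.FORMAT_VERSION_KEY, 3);   // tags require HFile version 3 or later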
1365 
1366   /**
1367    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
1368    * Client threads use this object to wait for all their tasks to be done.
1369    * <p>
1370    * All access is synchronized.
1371    */
1372   static class TaskBatch {
1373     int installed = 0;
1374     int done = 0;
1375     int error = 0;
1376     volatile boolean isDead = false;
1377 
1378     @Override
1379     public String toString() {
1380       return ("installed = " + installed + " done = " + done + " error = " + error);
1381     }
1382   }
1383 
1384   /**
1385    * In-memory state of an active task.
1386    */
1387   static class Task {
1388     volatile long last_update;
1389     volatile int last_version;
1390     volatile ServerName cur_worker_name;
1391     volatile TaskBatch batch;
1392     volatile TerminationStatus status;
1393     volatile int incarnation;
1394     final AtomicInteger unforcedResubmits = new AtomicInteger();
1395     volatile boolean resubmitThresholdReached;
1396 
1397     @Override
1398     public String toString() {
1399       return ("last_update = " + last_update +
1400           " last_version = " + last_version +
1401           " cur_worker_name = " + cur_worker_name +
1402           " status = " + status +
1403           " incarnation = " + incarnation +
1404           " resubmits = " + unforcedResubmits.get() +
1405           " batch = " + batch);
1406     }
1407 
1408     Task() {
1409       incarnation = 0;
1410       last_version = -1;
1411       status = IN_PROGRESS;
1412       setUnassigned();
1413     }
1414 
1415     public boolean isOrphan() {
1416       return (batch == null || batch.isDead);
1417     }
1418 
1419     public boolean isUnassigned() {
1420       return (cur_worker_name == null);
1421     }
1422 
1423     public void heartbeatNoDetails(long time) {
1424       last_update = time;
1425     }
1426 
1427     public void heartbeat(long time, int version, ServerName worker) {
1428       last_version = version;
1429       last_update = time;
1430       cur_worker_name = worker;
1431     }
1432 
1433     public void setUnassigned() {
1434       cur_worker_name = null;
1435       last_update = -1;
1436     }
1437   }
1438 
1439   void handleDeadWorker(ServerName workerName) {
1440     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
1441     // to reason about concurrency. Makes it easier to retry.
1442     synchronized (deadWorkersLock) {
1443       if (deadWorkers == null) {
1444         deadWorkers = new HashSet<ServerName>(100);
1445       }
1446       deadWorkers.add(workerName);
1447     }
1448     LOG.info("dead splitlog worker " + workerName);
1449   }
1450 
1451   void handleDeadWorkers(Set<ServerName> serverNames) {
1452     synchronized (deadWorkersLock) {
1453       if (deadWorkers == null) {
1454         deadWorkers = new HashSet<ServerName>(100);
1455       }
1456       deadWorkers.addAll(serverNames);
1457     }
1458     LOG.info("dead splitlog workers " + serverNames);
1459   }
1460 
1461   /**
1462    * Periodically checks all active tasks and resubmits the ones that have timed
1463    * out
1464    */
1465   private class TimeoutMonitor extends Chore {
1466     private long lastLog = 0;
1467 
1468     public TimeoutMonitor(final int period, Stoppable stopper) {
1469       super("SplitLogManager Timeout Monitor", period, stopper);
1470     }
1471 
1472     @Override
1473     protected void chore() {
1474       int resubmitted = 0;
1475       int unassigned = 0;
1476       int tot = 0;
1477       boolean found_assigned_task = false;
1478       Set<ServerName> localDeadWorkers;
1479 
1480       synchronized (deadWorkersLock) {
1481         localDeadWorkers = deadWorkers;
1482         deadWorkers = null;
1483       }
1484 
1485       for (Map.Entry<String, Task> e : tasks.entrySet()) {
1486         String path = e.getKey();
1487         Task task = e.getValue();
1488         ServerName cur_worker = task.cur_worker_name;
1489         tot++;
1490         // don't easily resubmit a task which hasn't been picked up yet. It
1491         // might be a long while before a SplitLogWorker is free to pick up a
1492         // task. This is because a SplitLogWorker picks up a task one at a
1493         // time. If we want progress when there are no region servers then we
1494         // will have to run a SplitLogWorker thread in the Master.
1495         if (task.isUnassigned()) {
1496           unassigned++;
1497           continue;
1498         }
1499         found_assigned_task = true;
1500         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1501           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1502           if (resubmit(path, task, FORCE)) {
1503             resubmitted++;
1504           } else {
1505             handleDeadWorker(cur_worker);
1506             LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1507                 cur_worker + ", will retry.");
1508           }
1509         } else if (resubmit(path, task, CHECK)) {
1510           resubmitted++;
1511         }
1512       }
1513       if (tot > 0) {
1514         long now = EnvironmentEdgeManager.currentTimeMillis();
1515         if (now > lastLog + 5000) {
1516           lastLog = now;
1517           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
1518         }
1519       }
1520       if (resubmitted > 0) {
1521         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1522       }
1523       // If there are pending tasks and all of them have been unassigned for
1524       // some time then put up a RESCAN node to ping the workers.
1525       // ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT is on the order of minutes
1526       // because (a) it is very unlikely that every worker had a
1527       // transient error when trying to grab the task and (b) if there are no
1528       // workers then all tasks will stay unassigned indefinitely and the
1529       // manager will keep creating RESCAN nodes indefinitely. TODO: maybe the
1530       // master should spawn both a manager and a worker thread to guarantee
1531       // that there is always at least one worker in the system
1532       if (tot > 0 && !found_assigned_task &&
1533           ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
1534           unassignedTimeout)) {
1535         for (Map.Entry<String, Task> e : tasks.entrySet()) {
1536           String path = e.getKey();
1537           Task task = e.getValue();
1538           // we have to do task.isUnassigned() check again because tasks might
1539           // have been asynchronously assigned. There is no locking required
1540           // for these checks ... it is OK even if tryGetDataSetWatch() is
1541           // called unnecessarily for a task
1542           if (task.isUnassigned() && (task.status != FAILURE)) {
1543             // We just touch the znode to make sure its still there
1544             tryGetDataSetWatch(path);
1545           }
1546         }
1547         createRescanNode(Long.MAX_VALUE);
1548         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1549         LOG.debug("resubmitting unassigned task(s) after timeout");
1550       }
1551 
1552       // Retry previously failed deletes
1553       if (failedDeletions.size() > 0) {
1554         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1555         for (String tmpPath : tmpPaths) {
1556           // deleteNode is an async call
1557           deleteNode(tmpPath, zkretries);
1558         }
1559         failedDeletions.removeAll(tmpPaths);
1560       }
1561 
1562       // Garbage collect left-over /hbase/recovering-regions/... znode
1563       long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1564           - lastRecoveringNodeCreationTime;
1565       if (!failedRecoveringRegionDeletions.isEmpty()
1566           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1567         // the function below performs more checks before garbage collecting anything
1568         if (!failedRecoveringRegionDeletions.isEmpty()) {
1569           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
1570               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
1571           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
1572           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
1573             removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
1574           }
1575         } else {
1576           removeRecoveringRegionsFromZK(null, null);
1577         }
1578       }
1579     }
1580   }
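
  // A minimal construction sketch (the configuration key and variable names are illustrative
  // assumptions, not taken from this file): the manager would typically create the monitor once
  // and run it as a daemon chore thread.
  //
  //   TimeoutMonitor timeoutMonitor = new TimeoutMonitor(
  //       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
  //   Threads.setDaemonThreadRunning(timeoutMonitor.getThread(),
  //       serverName + ".splitLogManagerTimeoutMonitor");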
1581 
1582   /**
1583    * Asynchronous handler for zk create node results.
1584    * Retries on failures.
1585    */
1586   class CreateAsyncCallback implements AsyncCallback.StringCallback {
1587     private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1588 
1589     @Override
1590     public void processResult(int rc, String path, Object ctx, String name) {
1591       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1592       if (rc != 0) {
1593         if (needAbandonRetries(rc, "Create znode " + path)) {
1594           createNodeFailure(path);
1595           return;
1596         }
1597         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1598           // What if there is a delete pending against this pre-existing
1599           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
1600           // state. The only operations that will be carried out on this node by
1601           // this manager are get-znode-data, task-finisher and delete-znode.
1602           // And all code pieces correctly handle the case of suddenly
1603           // disappearing task-znode.
1604           LOG.debug("found pre-existing znode " + path);
1605           SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1606         } else {
1607           Long retry_count = (Long)ctx;
1608           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1609               path + " remaining retries=" + retry_count);
1610           if (retry_count == 0) {
1611             SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1612             createNodeFailure(path);
1613           } else {
1614             SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1615             createNode(path, retry_count - 1);
1616           }
1617           return;
1618         }
1619       }
1620       createNodeSuccess(path);
1621     }
1622   }
1623 
1624   /**
1625    * Asynchronous handler for zk get-data-set-watch on node results.
1626    * Retries on failures.
1627    */
1628   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1629     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1630     private boolean completeTaskOnNoNode;
1631 
1632     /**
1633      * @param completeTaskOnNoNode Complete the task if the znode cannot be found.
1634      * Since in-memory task creation and znode creation are not atomic, there might be
1635      * a race where there is a task in memory but the znode is not created yet (TimeoutMonitor).
1636      * In this case completeTaskOnNoNode should be set to false. See HBASE-11217.
1637      */
1638     public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
1639       this.completeTaskOnNoNode = completeTaskOnNoNode;
1640     }
1641 
1642     @Override
1643     public void processResult(int rc, String path, Object ctx, byte[] data,
1644         Stat stat) {
1645       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1646       if (rc != 0) {
1647         if (needAbandonRetries(rc, "GetData from znode " + path)) {
1648           return;
1649         }
1650         if (rc == KeeperException.Code.NONODE.intValue()) {
1651           SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1652           LOG.warn("task znode " + path + " vanished.");
1653           if (completeTaskOnNoNode) {
1654             // The task znode has been deleted. Must be some pending delete
1655             // that deleted the task. Assume success because a task-znode is
1656             // only deleted after TaskFinisher is successful.
1657             try {
1658               getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1659             } catch (DeserializationException e) {
1660               LOG.warn("Deserialization problem", e);
1661             }
1662           }
1663           return;
1664         }
1665         Long retry_count = (Long) ctx;
1666 
1667         if (retry_count < 0) {
1668           LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1669               path + ". Ignoring error. No error handling. No retrying.");
1670           return;
1671         }
1672         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1673             path + " remaining retries=" + retry_count);
1674         if (retry_count == 0) {
1675           SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1676           getDataSetWatchFailure(path);
1677         } else {
1678           SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1679           getDataSetWatch(path, retry_count - 1);
1680         }
1681         return;
1682       }
1683       try {
1684         getDataSetWatchSuccess(path, data, stat.getVersion());
1685       } catch (DeserializationException e) {
1686         LOG.warn("Deserialization problem", e);
1687       }
1688       return;
1689     }
1690   }
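
  // A minimal invocation sketch (an illustrative assumption of how such a callback is wired up,
  // not copied from this file): an async get-data-and-set-watch on a task znode passes the
  // remaining retry count through the ZooKeeper callback context.
  //
  //   watcher.getRecoverableZooKeeper().getZooKeeper().getData(path, watcher,
  //       new GetDataAsyncCallback(true), Long.valueOf(retries));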
1691 
1692   /**
1693    * Asynchronous handler for zk delete node results.
1694    * Retries on failures.
1695    */
1696   class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1697     private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1698 
1699     @Override
1700     public void processResult(int rc, String path, Object ctx) {
1701       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1702       if (rc != 0) {
1703         if (needAbandonRetries(rc, "Delete znode " + path)) {
1704           failedDeletions.add(path);
1705           return;
1706         }
1707         if (rc != KeeperException.Code.NONODE.intValue()) {
1708           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1709           Long retry_count = (Long) ctx;
1710           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1711               path + " remaining retries=" + retry_count);
1712           if (retry_count == 0) {
1713             LOG.warn("delete failed " + path);
1714             failedDeletions.add(path);
1715             deleteNodeFailure(path);
1716           } else {
1717             deleteNode(path, retry_count - 1);
1718           }
1719           return;
1720         } else {
1721           LOG.info(path +
1722             " does not exist. Either was created but deleted behind our" +
1723             " back by another pending delete OR was deleted" +
1724             " in earlier retry rounds. zkretries = " + (Long) ctx);
1725         }
1726       } else {
1727         LOG.debug("deleted " + path);
1728       }
1729       deleteNodeSuccess(path);
1730     }
1731   }
1732 
1733   /**
1734    * Asynchronous handler for zk create RESCAN-node results.
1735    * Retries on failures.
1736    * <p>
1737    * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal
1738    * for all the {@link SplitLogWorker}s to rescan for new tasks.
1739    */
1740   class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1741     private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1742 
1743     @Override
1744     public void processResult(int rc, String path, Object ctx, String name) {
1745       if (rc != 0) {
1746         if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1747           return;
1748         }
1749         Long retry_count = (Long)ctx;
1750         LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1751             " remaining retries=" + retry_count);
1752         if (retry_count == 0) {
1753           createRescanFailure();
1754         } else {
1755           createRescanNode(retry_count - 1);
1756         }
1757         return;
1758       }
1759       // path is the original arg, name is the actual name that was created
1760       createRescanSuccess(name);
1761     }
1762   }
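
  // A minimal sketch of how a RESCAN signal could be put up (raw ZooKeeper API; the znode name
  // and variables are illustrative assumptions): a PERSISTENT_SEQUENTIAL child of the split log
  // znode is created asynchronously, which wakes up workers watching that directory.
  //
  //   zk.create(ZKUtil.joinZNode(watcher.splitLogZNode, "RESCAN"), data,
  //       ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL,
  //       new CreateRescanAsyncCallback(), Long.valueOf(retries));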
1763 
1764   /**
1765    * {@link SplitLogManager} can use objects implementing this interface to
1766    * finish off a partially done task by {@link SplitLogWorker}. This provides
1767    * a serialization point at the end of the task processing. Must be
1768    * restartable and idempotent.
1769    */
1770   public interface TaskFinisher {
1771     /**
1772      * Status that can be returned by finish()
1773      */
1774     enum Status {
1775       /**
1776        * task completed successfully
1777        */
1778       DONE(),
1779       /**
1780        * task completed with error
1781        */
1782       ERR();
1783     }
1784     /**
1785      * Finish the partially done task. The worker name provides a clue to where the
1786      * partial results of the partially done task are present. The task name is the
1787      * name of the task that was put up in zookeeper.
1788      * <p>
1789      * @param workerName name of the worker that did the partial work
1790      * @param taskname name of the task znode
1791      * @return DONE if task completed successfully, ERR otherwise
1792      */
1793     Status finish(ServerName workerName, String taskname);
1794   }
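
  // A minimal TaskFinisher sketch (illustrative assumption; it presumes a completion helper such
  // as HLogSplitter.finishSplitLogFile(String, Configuration) and must be safe to run more than
  // once for the same task).
  //
  //   TaskFinisher finisher = new TaskFinisher() {
  //     @Override
  //     public Status finish(ServerName workerName, String logfile) {
  //       try {
  //         HLogSplitter.finishSplitLogFile(logfile, conf);
  //       } catch (IOException e) {
  //         LOG.warn("Could not finish splitting of log file " + logfile, e);
  //         return Status.ERR;
  //       }
  //       return Status.DONE;
  //     }
  //   };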
1795 
1796   enum ResubmitDirective {
1797     CHECK(),
1798     FORCE();
1799   }
1800 
1801   enum TerminationStatus {
1802     IN_PROGRESS("in_progress"),
1803     SUCCESS("success"),
1804     FAILURE("failure"),
1805     DELETED("deleted");
1806 
1807     String statusMsg;
1808     TerminationStatus(String msg) {
1809       statusMsg = msg;
1810     }
1811 
1812     @Override
1813     public String toString() {
1814       return statusMsg;
1815     }
1816   }
1817 }