1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.ConnectException;
24  import java.net.SocketTimeoutException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.commons.lang.math.RandomUtils;
31  import org.apache.commons.lang.mutable.MutableInt;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.NotServingRegionException;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.SplitLogCounters;
42  import org.apache.hadoop.hbase.SplitLogTask;
43  import org.apache.hadoop.hbase.client.HConnectionManager;
44  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
45  import org.apache.hadoop.hbase.exceptions.DeserializationException;
46  import org.apache.hadoop.hbase.executor.ExecutorService;
47  import org.apache.hadoop.hbase.master.SplitLogManager;
48  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
49  import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
50  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
51  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
52  import org.apache.hadoop.hbase.util.CancelableProgressable;
53  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54  import org.apache.hadoop.hbase.util.FSUtils;
55  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
56  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
57  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
58  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
59  import org.apache.hadoop.util.StringUtils;
60  import org.apache.zookeeper.AsyncCallback;
61  import org.apache.zookeeper.KeeperException;
62  import org.apache.zookeeper.data.Stat;
63  
64  /**
65   * This worker is spawned in every regionserver (should we also spawn one in
66   * the master?). The Worker waits for log splitting tasks to be put up by the
67   * {@link SplitLogManager} running in the master and races with other workers
68   * in other servers to acquire those tasks. The coordination is done via
69   * ZooKeeper. All the action takes place at the /hbase/splitlog znode.
70   * <p>
71   * If a worker has successfully moved the task from state UNASSIGNED to
72   * OWNED then it owns the task. It keeps heartbeating the manager by
73   * periodically re-asserting the OWNED state (moving the task from OWNED to OWNED). On success it
74   * moves the task to DONE. On an unrecoverable error it moves the task state to
75   * ERR. If it cannot continue but wants the master to retry the task then it
76   * moves the task state to RESIGNED.
77   * <p>
78   * The manager can take a task away from a worker by moving the task from
79   * OWNED to UNASSIGNED. In the absence of a global lock there is an
80   * unavoidable race here - a worker might have just finished its task when it
81   * is stripped of its ownership. Here we rely on the idempotency of the log
82   * splitting task for correctness.
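     * <p>
     * An illustrative lifecycle sketch (the variable names below are hypothetical; the worker is
     * normally created and started by the hosting region server):
     * <pre>
     * SplitLogWorker slw = new SplitLogWorker(watcher, conf, regionServerServices, sequenceIdChecker);
     * slw.start();  // spawns the "SplitLogWorker-&lt;servername&gt;" thread, which races for tasks
     * // ...
     * slw.stop();   // sets exitWorker and interrupts the worker thread
     * </pre>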
83   */
84  @InterfaceAudience.Private
85  public class SplitLogWorker extends ZooKeeperListener implements Runnable {
86    public static final int DEFAULT_MAX_SPLITTERS = 2;
87  
88    private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
89    private static final int checkInterval = 5000; // 5 seconds
90    private static final int FAILED_TO_OWN_TASK = -1;
91  
92    Thread worker;
93    private final ServerName serverName;
94    private final TaskExecutor splitTaskExecutor;
95    // thread pool which executes recovery work
96    private final ExecutorService executorService;
97  
98    private final Object taskReadyLock = new Object();
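      // bumped by nodeChildrenChanged() under taskReadyLock to wake up taskLoop() when tasks arrive or depart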
99    volatile int taskReadySeq = 0;
100   private volatile String currentTask = null;
101   private int currentVersion;
102   private volatile boolean exitWorker;
103   private final Object grabTaskLock = new Object();
104   private boolean workerInGrabTask = false;
105   private final int report_period;
106   private RegionServerServices server = null;
107   private Configuration conf = null;
108   protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
109   private int maxConcurrentTasks = 0;
110 
111   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
112       TaskExecutor splitTaskExecutor) {
113     super(watcher);
114     this.server = server;
115     this.serverName = server.getServerName();
116     this.splitTaskExecutor = splitTaskExecutor;
117     report_period = conf.getInt("hbase.splitlog.report.period",
118       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
119     this.conf = conf;
120     this.executorService = this.server.getExecutorService();
121     this.maxConcurrentTasks =
122         conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
123   }
124 
125   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
126       RegionServerServices server, final LastSequenceId sequenceIdChecker) {
127     this(watcher, conf, server, new TaskExecutor() {
128       @Override
129       public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
130         Path rootdir;
131         FileSystem fs;
132         try {
133           rootdir = FSUtils.getRootDir(conf);
134           fs = rootdir.getFileSystem(conf);
135         } catch (IOException e) {
136           LOG.warn("could not find root dir or fs", e);
137           return Status.RESIGNED;
138         }
139         // TODO have to correctly figure out when log splitting has been
140         // interrupted or has encountered a transient error and when it has
141         // encountered a bad non-retry-able persistent error.
142         try {
143           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
144             fs, conf, p, sequenceIdChecker, watcher, mode)) {
145             return Status.PREEMPTED;
146           }
147         } catch (InterruptedIOException iioe) {
148           LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
149           return Status.RESIGNED;
150         } catch (IOException e) {
151           Throwable cause = e.getCause();
152           if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException 
153                   || cause instanceof ConnectException 
154                   || cause instanceof SocketTimeoutException)) {
155             LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
156                 + "resigning", e);
157             return Status.RESIGNED;
158           } else if (cause instanceof InterruptedException) {
159             LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
160             return Status.RESIGNED;
161           } else if(cause instanceof KeeperException) {
162             LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
163             return Status.RESIGNED;
164           }
165           LOG.warn("log splitting of " + filename + " failed, returning error", e);
166           return Status.ERR;
167         }
168         return Status.DONE;
169       }
170     });
171   }
172 
173   @Override
174   public void run() {
175     try {
176       LOG.info("SplitLogWorker " + this.serverName + " starting");
177       this.watcher.registerListener(this);
178       // pre-initialize a new connection for splitlogworker configuration
179       HConnectionManager.getConnection(conf);
180 
181       // wait for master to create the splitLogZnode
182       int res = -1;
183       while (res == -1 && !exitWorker) {
184         try {
185           res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
186         } catch (KeeperException e) {
187           // ignore
188           LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
189         }
190         if (res == -1) {
191           try {
192             LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
193             Thread.sleep(1000);
194           } catch (InterruptedException e) {
195             LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
196                 + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
197                 "exiting anyway)"));
198             exitWorker = true;
199             break;
200           }
201         }
202       }
203 
204       if (!exitWorker) {
205         taskLoop();
206       }
207     } catch (Throwable t) {
208       // only a logical error can cause this. Printing it out
209       // to make debugging easier
210       LOG.error("unexpected error ", t);
211     } finally {
212       LOG.info("SplitLogWorker " + this.serverName + " exiting");
213     }
214   }
215 
216   /**
217    * Wait for tasks to become available at the /hbase/splitlog znode. Grab tasks
218    * one at a time. This policy puts an upper limit on the number of
219    * simultaneous log splitting tasks that could be happening in a cluster.
220    * <p>
221    * Synchronization using {@link #taskReadyLock} ensures that it will
222    * try to grab every task that has been put up.
223    */
224   private void taskLoop() {
225     while (!exitWorker) {
226       int seq_start = taskReadySeq;
227       List<String> paths = getTaskList();
228       if (paths == null) {
229         LOG.warn("Could not get tasks, did someone remove " +
230             this.watcher.splitLogZNode + " ... worker thread exiting.");
231         return;
232       }
233       // pick the meta WAL first, if there is one
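          // (recovering the meta WAL early helps bring hbase:meta back online before user regions)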
234       int offset = (int) (Math.random() * paths.size());
235       for(int i = 0; i < paths.size(); i ++){
236         if(HLogUtil.isMetaFile(paths.get(i))) {
237           offset = i;
238           break;
239         }
240       }
241       int numTasks = paths.size();
242       for (int i = 0; i < numTasks; i++) {
243         int idx = (i + offset) % paths.size();
244         // don't call ZKSplitLog.getNodeName() because that will lead to
245         // double encoding of the path name
246         if (this.calculateAvailableSplitters(numTasks) > 0) {
247           grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
248         } else {
249           LOG.debug("Current region server " + this.serverName + " has "
250               + this.tasksInProgress.get() + " tasks in progress and can't take more.");
251           break;
252         }
253         if (exitWorker) {
254           return;
255         }
256       }
257       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
258       synchronized (taskReadyLock) {
259         while (seq_start == taskReadySeq) {
260           try {
261             taskReadyLock.wait(checkInterval);
262             if (this.server != null) {
263               // check to see if we have stale recovering regions in our internal memory state
264               Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
265               if (!recoveringRegions.isEmpty()) {
266                 // Make a local copy to prevent ConcurrentModificationException when other threads
267                 // modify recoveringRegions
268                 List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
269                 for (String region : tmpCopy) {
270                   String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
271                   try {
272                     if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
273                       HRegion r = recoveringRegions.remove(region);
274                       if (r != null) {
275                         r.setRecovering(false);
276                       }
277                       LOG.debug("Mark recovering region:" + region + " up.");
278                     } else {
279                       // This check is a defensive (or redundant) mechanism to prevent us from
280                       // having stale recovering regions in our internal RS memory state while
281                       // ZooKeeper (the source of truth) says otherwise. We stop at the first region
282                       // that is still recovering because, in the normal case, there should not be any
283                       // stale entries, so checking the first one is good enough.
284                       break;
285                     }
286                   } catch (KeeperException e) {
287                     // ignore zookeeper error
288                     LOG.debug("Got a zookeeper when trying to open a recovering region", e);
289                     break;
290                   }
291                 }
292               }
293             }
294           } catch (InterruptedException e) {
295             LOG.info("SplitLogWorker interrupted while waiting for task," +
296                 " exiting: " + e.toString() + (exitWorker ? "" :
297                 " (ERROR: exitWorker is not set, exiting anyway)"));
298             exitWorker = true;
299             return;
300           }
301         }
302       }
303 
304     }
305   }
306 
307   /**
308    * Try to grab a 'lock' on the task znode to own and execute the task.
309    * <p>
310    * @param path the znode path of the task
311    */
312   private void grabTask(String path) {
313     Stat stat = new Stat();
314     byte[] data;
315     synchronized (grabTaskLock) {
316       currentTask = path;
317       workerInGrabTask = true;
318       if (Thread.interrupted()) {
319         return;
320       }
321     }
322     try {
323       try {
324         if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
325           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
326           return;
327         }
328       } catch (KeeperException e) {
329         LOG.warn("Failed to get data for znode " + path, e);
330         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
331         return;
332       }
333       SplitLogTask slt;
334       try {
335         slt = SplitLogTask.parseFrom(data);
336       } catch (DeserializationException e) {
337         LOG.warn("Failed parse data for znode " + path, e);
338         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
339         return;
340       }
341       if (!slt.isUnassigned()) {
342         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
343         return;
344       }
345 
346       currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(), 
347         stat.getVersion());
348       if (currentVersion < 0) {
349         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
350         return;
351       }
352 
353       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
354         HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
355           SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
356         return;
357       }
358 
359       LOG.info("worker " + serverName + " acquired task " + path);
360       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
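          // Set a data watch on the task znode so nodeDataChanged() fires if the manager later
          // preempts the task; getDataSetWatchSuccess() then calls stopTask().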
361       getDataSetWatchAsync();
362 
363       submitTask(path, slt.getMode(), currentVersion, this.report_period);
364 
365       // after a successful submit, sleep a little bit to allow other RSs to grab the remaining tasks
366       try {
367         int sleepTime = RandomUtils.nextInt(500) + 500;
368         Thread.sleep(sleepTime);
369       } catch (InterruptedException e) {
370         LOG.warn("Interrupted while yielding for other region servers", e);
371         Thread.currentThread().interrupt();
372       }
373     } finally {
374       synchronized (grabTaskLock) {
375         workerInGrabTask = false;
376         // clear the interrupt from stopTask() otherwise the next task will
377         // suffer
378         Thread.interrupted();
379       }
380     }
381   }
382 
383 
384   /**
385    * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
386    * <p>
387    * This method is also used to periodically heartbeat the task progress by transitioning the node
388    * from OWNED to OWNED.
389    * @param isFirstTime true when grabbing the task for the first time, false when heartbeating
390    * @param zkw the ZooKeeper watcher
391    * @param server name of the region server attempting to own the task
392    * @param task path of the task znode
393    * @param mode recovery mode of the task
394    * @param taskZKVersion expected ZK version of the task znode
395    * @return non-negative ZK version of the task znode when it is successfully owned, otherwise -1
396    */
397   protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
398       ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
399     int latestZKVersion = FAILED_TO_OWN_TASK;
400     try {
401       SplitLogTask slt = new SplitLogTask.Owned(server, mode);
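          // The versioned setData() below serves as both the ownership claim and the heartbeat:
          // each successful call bumps the znode version, which signals progress to the SplitLogManager.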
402       Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
403       if (stat == null) {
404         LOG.warn("zk.setData() returned null for path " + task);
405         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
406         return FAILED_TO_OWN_TASK;
407       }
408       latestZKVersion = stat.getVersion();
409       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
410       return latestZKVersion;
411     } catch (KeeperException e) {
412       if (!isFirstTime) {
413         if (e.code().equals(KeeperException.Code.NONODE)) {
414           LOG.warn("NONODE failed to assert ownership for " + task, e);
415         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
416           LOG.warn("BADVERSION failed to assert ownership for " + task, e);
417         } else {
418           LOG.warn("failed to assert ownership for " + task, e);
419         }
420       }
421     } catch (InterruptedException e1) {
422       LOG.warn("Interrupted while trying to assert ownership of " +
423           task + " " + StringUtils.stringifyException(e1));
424       Thread.currentThread().interrupt();
425     }
426     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
427     return FAILED_TO_OWN_TASK;
428   }
429 
430   /**
431    * Calculates how many more splitters this region server could create, based on the expected
432    * average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by configuration.
433    * At any given time, a RS may run at most MIN(expected tasks per RS, hard upper bound) splitters.
434    * @param numTasks current total number of available tasks
435    * @return number of additional splitters this region server can spawn; zero or negative when already at capacity
436    */
437   private int calculateAvailableSplitters(int numTasks) {
438     // at least one RS (itself) is available
439     int availableRSs = 1;
440     try {
441       List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
442       availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
443     } catch (KeeperException e) {
444       // do nothing
445       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
446     }
447 
448     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
449     expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least one
450     // calculate how many more splitters we could spawn
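        // e.g. 10 outstanding tasks across 4 live region servers gives expectedTasksPerRS = 3;
        // with maxConcurrentTasks = 2 and 1 task already in progress, this returns min(3, 2) - 1 = 1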
451     return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
452   }
453 
454   /**
455    * Submit a log split task to the executor service
456    * @param curTask path of the task znode
457    * @param curTaskZKVersion current ZK version of the task znode
458    */
459   void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion, 
460     final int reportPeriod) {
461     final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
462 
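        // The reporter below periodically re-asserts ownership of the task znode (a heartbeat);
        // if the heartbeat fails, progress() returns false so the ongoing split can be cancelled.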
463     CancelableProgressable reporter = new CancelableProgressable() {
464       private long last_report_at = 0;
465 
466       @Override
467       public boolean progress() {
468         long t = EnvironmentEdgeManager.currentTimeMillis();
469         if ((t - last_report_at) > reportPeriod) {
470           last_report_at = t;
471           int latestZKVersion =
472               attemptToOwnTask(false, watcher, serverName, curTask, mode, zkVersion.intValue());
473           if (latestZKVersion < 0) {
474             LOG.warn("Failed to heartbeat the task" + curTask);
475             return false;
476           }
477           zkVersion.setValue(latestZKVersion);
478         }
479         return true;
480       }
481     };
482     
483     HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, 
484       this.tasksInProgress, this.splitTaskExecutor, mode);
485     this.executorService.submit(hsh);
486   }
487 
488   void getDataSetWatchAsync() {
489     this.watcher.getRecoverableZooKeeper().getZooKeeper().
490       getData(currentTask, this.watcher,
491       new GetDataAsyncCallback(), null);
492     SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
493   }
494 
495   void getDataSetWatchSuccess(String path, byte[] data) {
496     SplitLogTask slt;
497     try {
498       slt = SplitLogTask.parseFrom(data);
499     } catch (DeserializationException e) {
500       LOG.warn("Failed parse", e);
501       return;
502     }
503     synchronized (grabTaskLock) {
504       if (workerInGrabTask) {
505         // currentTask can change but that's ok
506         String taskpath = currentTask;
507         if (taskpath != null && taskpath.equals(path)) {
508           // have to compare data. cannot compare version because then there
509           // will be race with attemptToOwnTask()
510           // cannot just check whether the node has been transitioned to
511           // UNASSIGNED because by the time this worker sets the data watch
512           // the node might have made two transitions - from owned by this
513           // worker to unassigned to owned by another worker
514           if (! slt.isOwned(this.serverName) &&
515               ! slt.isDone(this.serverName) &&
516               ! slt.isErr(this.serverName) &&
517               ! slt.isResigned(this.serverName)) {
518             LOG.info("task " + taskpath + " preempted from " +
519                 serverName + ", current task state and owner=" + slt.toString());
520             stopTask();
521           }
522         }
523       }
524     }
525   }
526 
527   void getDataSetWatchFailure(String path) {
528     synchronized (grabTaskLock) {
529       if (workerInGrabTask) {
530         // currentTask can change but that's ok
531         String taskpath = currentTask;
532         if (taskpath != null && taskpath.equals(path)) {
533           LOG.info("retrying data watch on " + path);
534           SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
535           getDataSetWatchAsync();
536         } else {
537           // no point setting a watch on the task which this worker is not
538           // working upon anymore
539         }
540       }
541     }
542   }
543 
544   @Override
545   public void nodeDataChanged(String path) {
546     // there will be a self-generated dataChanged event every time attemptToOwnTask()
547     // heartbeats the task znode by bumping its version
548     synchronized (grabTaskLock) {
549       if (workerInGrabTask) {
550         // currentTask can change
551         String taskpath = currentTask;
552         if (taskpath!= null && taskpath.equals(path)) {
553           getDataSetWatchAsync();
554         }
555       }
556     }
557   }
558 
559 
560   private List<String> getTaskList() {
561     List<String> childrenPaths = null;
562     long sleepTime = 1000;
563     // Keep looping until we get the list of children, or bail out
564     // if the worker thread has been asked to exit.
565     while (!exitWorker) {
566       try {
567         childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
568             this.watcher.splitLogZNode);
569         if (childrenPaths != null) {
570           return childrenPaths;
571         }
572       } catch (KeeperException e) {
573         LOG.warn("Could not get children of znode "
574             + this.watcher.splitLogZNode, e);
575       }
576       try {
577         LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
578             + " after sleep for " + sleepTime + "ms!");
579         Thread.sleep(sleepTime);
580       } catch (InterruptedException e1) {
581         LOG.warn("Interrupted while trying to get task list ...", e1);
582         Thread.currentThread().interrupt();
583       }
584     }
585     return childrenPaths;
586   }
587 
588   @Override
589   public void nodeChildrenChanged(String path) {
590     if(path.equals(watcher.splitLogZNode)) {
591       LOG.debug("tasks arrived or departed");
592       synchronized (taskReadyLock) {
593         taskReadySeq++;
594         taskReadyLock.notify();
595       }
596     }
597   }
598 
599   /**
600    * If the worker is currently doing a task, i.e. splitting a log file, then stop the task.
601    * It doesn't exit the worker thread.
602    */
603   void stopTask() {
604     LOG.info("Sending interrupt to stop the worker thread");
605     worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
606   }
607 
608 
609   /**
610    * start the SplitLogWorker thread
611    */
612   public void start() {
613     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
614     exitWorker = false;
615     worker.start();
616   }
617 
618   /**
619    * stop the SplitLogWorker thread
620    */
621   public void stop() {
622     exitWorker = true;
623     stopTask();
624   }
625 
626   /**
627    * Asynchronous handler for the results of ZooKeeper getData-and-set-watch calls on task nodes.
628    */
629   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
630     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
631 
632     @Override
633     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
634       SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
635       if (rc != 0) {
636         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
637         getDataSetWatchFailure(path);
638         return;
639       }
640       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
641       getDataSetWatchSuccess(path, data);
642     }
643   }
644 
645   /**
646    * Objects implementing this interface actually do the task that has been
647    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
648    * guarantee that two workers will not be executing the same task therefore it
649    * is better to have workers prepare the task and then have the
650    * {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher
651    */
652   public interface TaskExecutor {
653     enum Status {
654       DONE(),       // the task completed successfully
655       ERR(),        // unrecoverable error
656       RESIGNED(),   // cannot continue; ask the master to retry the task
657       PREEMPTED()   // the task was preempted (ownership lost or taken away)
658     }
659     Status exec(String name, RecoveryMode mode, CancelableProgressable p);
660   }
661 }