1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.UUID;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.Stoppable;
42  import org.apache.hadoop.hbase.regionserver.wal.HLog;
43  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
44  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
46  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
47  import org.apache.hadoop.hbase.replication.ReplicationException;
48  import org.apache.hadoop.hbase.replication.ReplicationPeers;
49  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
50  import org.apache.hadoop.hbase.replication.ReplicationQueues;
51  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
52  import org.apache.hadoop.hbase.replication.WALEntryFilter;
53  import org.apache.hadoop.hbase.util.Threads;
54  import com.google.common.collect.Lists;
55  import com.google.common.util.concurrent.ListenableFuture;
56  import com.google.common.util.concurrent.Service;
57  
58  /**
59   * Class that handles the source of a replication stream.
60   * Currently does not handle more than one slave cluster.
61   * For each slave cluster it selects a random number of peers
62   * using a replication ratio. For example, if the replication ratio is 0.1
63   * and the slave cluster has 100 region servers, 10 will be selected.
64   * <p/>
65   * A stream is considered down when we cannot contact a region server on the
66   * peer cluster for more than 55 seconds by default.
67   * <p/>
68   *
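     * A rough usage sketch (illustrative only; sources are normally created and wired up by
     * the {@link ReplicationSourceManager}, and the variable names below are placeholders):
     * <pre>
     * ReplicationSourceInterface src = new ReplicationSource();
     * src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper,
     *     peerClusterZnode, clusterId, replicationEndpoint, metrics);
     * src.enqueueLog(firstWalPath);
     * src.startup();
     * </pre>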
69   */
70  @InterfaceAudience.Private
71  public class ReplicationSource extends Thread
72      implements ReplicationSourceInterface {
73  
74    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
75    // Queue of logs to process
76    private PriorityBlockingQueue<Path> queue;
77    private ReplicationQueues replicationQueues;
78    private ReplicationPeers replicationPeers;
79  
80    private Configuration conf;
81    private ReplicationQueueInfo replicationQueueInfo;
82    // id of the peer cluster this source replicates to
83    private String peerId;
84    // The manager of all sources to which we ping back our progress
85    private ReplicationSourceManager manager;
86    // Should we stop everything?
87    private Stoppable stopper;
88    // How long should we sleep for each retry
89    private long sleepForRetries;
90    // Max size in bytes of entriesArray
91    private long replicationQueueSizeCapacity;
92    // Max number of entries in entriesArray
93    private int replicationQueueNbCapacity;
94    // Our reader for the current log
95    private HLog.Reader reader;
96    // Last position in the log that we sent to ZooKeeper
97    private long lastLoggedPosition = -1;
98    // Path of the current log
99    private volatile Path currentPath;
100   private FileSystem fs;
101   // id of this cluster
102   private UUID clusterId;
103   // id of the other cluster
104   private UUID peerClusterId;
105   // total number of edits we replicated
106   private long totalReplicatedEdits = 0;
107   // total number of operations (Put/Delete) we replicated
108   private long totalReplicatedOperations = 0;
109   // The znode we currently play with
110   private String peerClusterZnode;
111   // Maximum number of retries before taking bold actions
112   private int maxRetriesMultiplier;
113   // Current number of operations (Put/Delete) that we need to replicate
114   private int currentNbOperations = 0;
115   // Current size of data we need to replicate
116   private int currentSize = 0;
117   // Indicates if this particular source is running
118   private volatile boolean running = true;
119   // Metrics for this source
120   private MetricsSource metrics;
121   // Handle on the log reader helper
122   private ReplicationHLogReaderManager repLogReader;
123   //WARN threshold for the number of queued logs, defaults to 2
124   private int logQueueWarnThreshold;
125   // ReplicationEndpoint which will handle the actual replication
126   private ReplicationEndpoint replicationEndpoint;
127   // A filter (or a chain of filters) for the WAL entries.
128   private WALEntryFilter walEntryFilter;
129   // Context for ReplicationEndpoint#replicate()
130   private ReplicationEndpoint.ReplicateContext replicateContext;
131   // throttler
132   private ReplicationThrottler throttler;
133 
134   /**
135    * Instantiation method used by region servers
136    *
137    * @param conf configuration to use
138    * @param fs file system to use
139   * @param manager the replication manager to which this source reports its progress
140   * @param stopper the Stoppable used to check whether the region server is stopping
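      * @param replicationQueues interface to this region server's replication queues
      * @param replicationPeers interface to the replication peers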
141    * @param peerClusterZnode the name of our znode
142    * @param clusterId unique UUID for the cluster
143    * @param replicationEndpoint the replication endpoint implementation
144    * @param metrics metrics for replication source
145    * @throws IOException
146    */
147   @Override
148   public void init(final Configuration conf, final FileSystem fs,
149       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
150       final ReplicationPeers replicationPeers, final Stoppable stopper,
151       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
152       final MetricsSource metrics)
153           throws IOException {
154     this.stopper = stopper;
155     this.conf = conf;
156     decorateConf();
157     this.replicationQueueSizeCapacity =
158         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
159     this.replicationQueueNbCapacity =
160         this.conf.getInt("replication.source.nb.capacity", 25000);
161     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
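        // WALs are consumed oldest first; LogsComparator orders paths by the timestamp
        // suffix in their file names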
162     this.queue =
163         new PriorityBlockingQueue<Path>(
164             this.conf.getInt("hbase.regionserver.maxlogs", 32),
165             new LogsComparator());
166     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
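        // 0 disables throttling; otherwise the throttler enforces this per-peer-node
        // bandwidth cap (bytes per second) on shipped batches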
167     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
168     this.replicationQueues = replicationQueues;
169     this.replicationPeers = replicationPeers;
170     this.manager = manager;
171     this.sleepForRetries =
172         this.conf.getLong("replication.source.sleepforretries", 1000);
173     this.fs = fs;
174     this.metrics = metrics;
175     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
176     this.clusterId = clusterId;
177 
178     this.peerClusterZnode = peerClusterZnode;
179     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
180     // ReplicationQueueInfo parses the peerId out of the znode for us
181     this.peerId = this.replicationQueueInfo.getPeerId();
182     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
183     this.replicationEndpoint = replicationEndpoint;
184 
185     this.replicateContext = new ReplicationEndpoint.ReplicateContext();
186   }
187 
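      /**
       * If a replication-specific RPC codec is configured (e.g. one that preserves cell tags),
       * use it as the RPC codec in this source's configuration.
       */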
188   private void decorateConf() {
189     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
190     if (StringUtils.isNotEmpty(replicationCodec)) {
191       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
192     }
193   }
194 
195   @Override
196   public void enqueueLog(Path log) {
197     this.queue.put(log);
198     int queueSize = queue.size();
199     this.metrics.setSizeOfLogQueue(queueSize);
200     // This will log a warning for each new log that gets created above the warn threshold
201     if (queueSize > this.logQueueWarnThreshold) {
202       LOG.warn("Queue size: " + queueSize +
203         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
204     }
205   }
206 
207   private void uninitialize() {
208     LOG.debug("Source exiting " + this.peerId);
209     metrics.clear();
210     if (replicationEndpoint.state() == Service.State.STARTING
211         || replicationEndpoint.state() == Service.State.RUNNING) {
212       replicationEndpoint.stopAndWait();
213     }
214   }
215 
216   @Override
217   public void run() {
218     // We were stopped while looping to connect to sinks, just abort
219     if (!this.isActive()) {
220       uninitialize();
221       return;
222     }
223 
224     try {
225       // start the endpoint, connect to the cluster
226       Service.State state = replicationEndpoint.start().get();
227       if (state != Service.State.RUNNING) {
228         LOG.warn("ReplicationEndpoint was not started. Exiting");
229         uninitialize();
230         return;
231       }
232     } catch (Exception ex) {
233       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
234       throw new RuntimeException(ex);
235     }
236 
237     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
238     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
239       (WALEntryFilter)new SystemTableWALEntryFilter());
240     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
241     if (filterFromEndpoint != null) {
242       filters.add(filterFromEndpoint);
243     }
244     this.walEntryFilter = new ChainWALEntryFilter(filters);
245 
246     int sleepMultiplier = 1;
247     // delay this until we are in an asynchronous thread
248     while (this.isActive() && this.peerClusterId == null) {
249       this.peerClusterId = replicationEndpoint.getPeerUUID();
250       if (this.isActive() && this.peerClusterId == null) {
251         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
252           sleepMultiplier++;
253         }
254       }
255     }
256     // We were stopped while looping to contact peer's zk ensemble, just abort
257     if (!this.isActive()) {
258       uninitialize();
259       return;
260     }
261 
262     // resetting to 1 to reuse later
263     sleepMultiplier = 1;
264 
265     // In rare case, zookeeper setting may be messed up. That leads to the incorrect
266     // peerClusterId value, which is the same as the source clusterId
267     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
268       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
269           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
270           + replicationEndpoint.getClass().getName(), null, false);
271     }
272     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
273 
274     // If this is recovered, the queue is already full and the first log
275     // normally has a position (unless the RS failed between 2 logs)
276     if (this.replicationQueueInfo.isQueueRecovered()) {
277       try {
278         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
279           this.queue.peek().getName()));
280         if (LOG.isTraceEnabled()) {
281           LOG.trace("Recovered queue started with log " + this.queue.peek() +
282               " at position " + this.repLogReader.getPosition());
283         }
284       } catch (ReplicationException e) {
285         this.terminate("Couldn't get the position of this recovered queue " +
286             this.peerClusterZnode, e);
287       }
288     }
289     // Loop until we close down
290     while (isActive()) {
291       // Sleep until replication is enabled again
292       if (!isPeerEnabled()) {
293         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
294           sleepMultiplier++;
295         }
296         continue;
297       }
298       Path oldPath = getCurrentPath(); //note that in the current scenario,
299                                        //oldPath will be null when a log roll
300                                        //happens.
301       // Get a new path
302       boolean hasCurrentPath = getNextPath();
303       if (getCurrentPath() != null && oldPath == null) {
304         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
305       }
306       if (!hasCurrentPath) {
307         if (sleepForRetries("No log to process", sleepMultiplier)) {
308           sleepMultiplier++;
309         }
310         continue;
311       }
312       boolean currentWALisBeingWrittenTo = false;
313       //For WAL files we own (rather than recovered), take a snapshot of whether the
314       //current WAL file (this.currentPath) is in use (for writing) NOW!
315       //Since the new WAL paths are enqueued only after the prev WAL file
316       //is 'closed', presence of an element in the queue means that
317       //the previous WAL file was closed, else the file is in use (currentPath)
318       //We take the snapshot now so that we are protected against races
319       //where a new file gets enqueued while the current file is being processed
320       //(and where we just finished reading the current file).
321       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
322         currentWALisBeingWrittenTo = true;
323       }
324       // Open a reader on it
325       if (!openReader(sleepMultiplier)) {
326         // Reset the sleep multiplier, else it'd be reused for the next file
327         sleepMultiplier = 1;
328         continue;
329       }
330 
331       // If we got a null reader but didn't continue, then sleep and continue
332       if (this.reader == null) {
333         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
334           sleepMultiplier++;
335         }
336         continue;
337       }
338 
339       boolean gotIOE = false;
340       currentNbOperations = 0;
341       List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
342       currentSize = 0;
343       try {
344         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
345           continue;
346         }
347       } catch (IOException ioe) {
348         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
349         gotIOE = true;
350         if (ioe.getCause() instanceof EOFException) {
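              // An EOF on a recovered queue may just mean the last file is empty or truncated;
              // once we have retried long enough, consider moving on to the next file.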
351 
352           boolean considerDumping = false;
353           if (this.replicationQueueInfo.isQueueRecovered()) {
354             try {
355               FileStatus stat = this.fs.getFileStatus(this.currentPath);
356               if (stat.getLen() == 0) {
357                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
358               }
359               considerDumping = true;
360             } catch (IOException e) {
361               LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
362             }
363           }
364 
365           if (considerDumping &&
366               sleepMultiplier == this.maxRetriesMultiplier &&
367               processEndOfFile()) {
368             continue;
369           }
370         }
371       } finally {
372         try {
373           this.reader = null;
374           this.repLogReader.closeReader();
375         } catch (IOException e) {
376           gotIOE = true;
377           LOG.warn("Unable to finalize the tailing of a file", e);
378         }
379       }
380 
381       // If we didn't get anything to replicate, or if we hit a IOE,
382       // wait a bit and retry.
383       // But if we need to stop, don't bother sleeping
384       if (this.isActive() && (gotIOE || entries.isEmpty())) {
385         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
386           this.manager.logPositionAndCleanOldLogs(this.currentPath,
387               this.peerClusterZnode, this.repLogReader.getPosition(),
388               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
389           this.lastLoggedPosition = this.repLogReader.getPosition();
390         }
391         // Reset the sleep multiplier if nothing has actually gone wrong
392         if (!gotIOE) {
393           sleepMultiplier = 1;
394           // if there was nothing to ship and it's not an error
395           // set "ageOfLastShippedOp" to <now> to indicate that we're current
396           this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
397         }
398         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
399           sleepMultiplier++;
400         }
401         continue;
402       }
403       sleepMultiplier = 1;
404       shipEdits(currentWALisBeingWrittenTo, entries);
405     }
406     uninitialize();
407   }
408 
409   /**
410    * Read all the entries from the current log file and retain those
411    * that need to be replicated. Otherwise, process the end of the current file.
412    * @param currentWALisBeingWrittenTo is the current WAL being written to
413    * @param entries resulting entries to be replicated
414    * @return true if we got nothing and went to the next file, false if we got
415    * entries
416    * @throws IOException
417    */
418   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
419       List<HLog.Entry> entries) throws IOException{
420     long seenEntries = 0;
421     if (LOG.isTraceEnabled()) {
422       LOG.trace("Seeking in " + this.currentPath + " at position "
423           + this.repLogReader.getPosition());
424     }
425     this.repLogReader.seek();
426     long positionBeforeRead = this.repLogReader.getPosition();
427     HLog.Entry entry =
428         this.repLogReader.readNextAndSetPosition();
429     while (entry != null) {
430       this.metrics.incrLogEditsRead();
431       seenEntries++;
432 
433       // don't replicate if the log entries have already been consumed by the cluster
434       if (replicationEndpoint.canReplicateToSameCluster()
435           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
436         // Remove all KVs that should not be replicated
437         entry = walEntryFilter.filter(entry);
438         WALEdit edit = null;
439         HLogKey logKey = null;
440         if (entry != null) {
441           edit = entry.getEdit();
442           logKey = entry.getKey();
443         }
444 
445         if (edit != null && edit.size() != 0) {
446           //Mark that the current cluster has the change
447           logKey.addClusterId(clusterId);
448           currentNbOperations += countDistinctRowKeys(edit);
449           entries.add(entry);
450           currentSize += entry.getEdit().heapSize();
451         } else {
452           this.metrics.incrLogEditsFiltered();
453         }
454       }
455       // Stop if too many entries or too big
456       if (currentSize >= this.replicationQueueSizeCapacity ||
457           entries.size() >= this.replicationQueueNbCapacity) {
458         break;
459       }
460       try {
461         entry = this.repLogReader.readNextAndSetPosition();
462       } catch (IOException ie) {
463         LOG.debug("Break on IOE: " + ie.getMessage());
464         break;
465       }
466     }
467     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
468     if (currentWALisBeingWrittenTo) {
469       return false;
470     }
471     // If we didn't get anything and the queue has an object, it means we
472     // hit the end of the file for sure
473     return seenEntries == 0 && processEndOfFile();
474   }
475 
476   /**
477    * Poll for the next path
478    * @return true if a path was obtained, false if not
479    */
480   protected boolean getNextPath() {
481     try {
482       if (this.currentPath == null) {
483         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
484         this.metrics.setSizeOfLogQueue(queue.size());
485         if (this.currentPath != null) {
486           this.manager.cleanOldLogs(this.currentPath.getName(),
487               this.peerId,
488               this.replicationQueueInfo.isQueueRecovered());
489           if (LOG.isTraceEnabled()) {
490             LOG.trace("New log: " + this.currentPath);
491           }
492         }
493       }
494     } catch (InterruptedException e) {
495       LOG.warn("Interrupted while reading edits", e);
496     }
497     return this.currentPath != null;
498   }
499 
500   /**
501    * Open a reader on the current path
502    *
503    * @param sleepMultiplier by how many times the default sleeping time is augmented
504    * @return true if we should continue with that file, false if we are done with it
505    */
506   protected boolean openReader(int sleepMultiplier) {
507     try {
508       try {
509         if (LOG.isTraceEnabled()) {
510           LOG.trace("Opening log " + this.currentPath);
511         }
512         this.reader = repLogReader.openReader(this.currentPath);
513       } catch (FileNotFoundException fnfe) {
514         if (this.replicationQueueInfo.isQueueRecovered()) {
515           // We didn't find the log in the archive directory, look if it still
516           // exists in the dead RS folder (there could be a chain of failures
517           // to look at)
518           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
519           LOG.info("Number of dead region servers: " + deadRegionServers.size());
520           for (String curDeadServerName : deadRegionServers) {
521             Path deadRsDirectory =
522                 new Path(manager.getLogDir().getParent(), curDeadServerName);
523             Path[] locs = new Path[] {
524                 new Path(deadRsDirectory, currentPath.getName()),
525                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
526                                           currentPath.getName()),
527             };
528             for (Path possibleLogLocation : locs) {
529               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
530               if (this.manager.getFs().exists(possibleLogLocation)) {
531                 // We found the right new location
532                 LOG.info("Log " + this.currentPath + " still exists at " +
533                     possibleLogLocation);
534                 // Breaking here will make us sleep since reader is null
535                 return true;
536               }
537             }
538           }
539           // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
540           // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
541           if (stopper instanceof ReplicationSyncUp.DummyServer) {
542             FileStatus[] rss = fs.listStatus(manager.getLogDir());
543             for (FileStatus rs : rss) {
544               Path p = rs.getPath();
545               FileStatus[] logs = fs.listStatus(p);
546               for (FileStatus log : logs) {
547                 p = new Path(p, log.getPath().getName());
548                 if (p.getName().equals(currentPath.getName())) {
549                   currentPath = p;
550                   LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
551                   // Open the log at the new location
552                   this.openReader(sleepMultiplier);
553                   return true;
554                 }
555               }
556             }
557           }
558 
559           // TODO What happens if the log was missing from every single location?
560           // Although we need to check a couple of times as the log could have
561           // been moved by the master between the checks
562           // It can also happen if a recovered queue wasn't properly cleaned,
563           // such that the znode pointing to a log exists but the log was
564           // deleted a long time ago.
565           // For the moment, we'll throw the IO and processEndOfFile
566           throw new IOException("File from recovered queue is " +
567               "nowhere to be found", fnfe);
568         } else {
569           // If the log was archived, continue reading from there
570           Path archivedLogLocation =
571               new Path(manager.getOldLogDir(), currentPath.getName());
572           if (this.manager.getFs().exists(archivedLogLocation)) {
573             currentPath = archivedLogLocation;
574             LOG.info("Log " + this.currentPath + " was moved to " +
575                 archivedLogLocation);
576             // Open the log at the new location
577             this.openReader(sleepMultiplier);
578 
579           }
580           // TODO What happens if the log is missing in both places?
581         }
582       }
583     } catch (IOException ioe) {
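          // An EOFException on an empty, still-active WAL is expected; keep the current file
          // and wait for new entries to show up.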
584       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
585       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
586       this.reader = null;
587       if (ioe.getCause() instanceof NullPointerException) {
588         // Workaround for race condition in HDFS-4380
589         // which throws a NPE if we open a file before any data node has the most recent block
590         // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
591         LOG.warn("Got NPE opening reader, will retry.");
592       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
593         // TODO Need a better way to determine if a file is really gone but
594         // TODO without scanning all logs dir
595         LOG.warn("Waited too long for this file, considering dumping");
596         return !processEndOfFile();
597       }
598     }
599     return true;
600   }
601 
602   /*
603    * Checks whether the current log file is empty and this is not a recovered queue. This handles
604    * the scenario where, in an idle cluster, there is no entry in the current log and we keep
605    * trying to read it, getting an EOFException every time. For a recovered queue the last log
606    * file may legitimately be empty, and we don't want to keep retrying it.
607    */
608   private boolean isCurrentLogEmpty() {
609     return (this.repLogReader.getPosition() == 0 &&
610         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
611   }
612 
613   /**
614    * Do the sleeping logic
615    * @param msg Why we sleep
616    * @param sleepMultiplier by how many times the default sleeping time is augmented
617    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
618    */
619   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
620     try {
621       if (LOG.isTraceEnabled()) {
622         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
623       }
624       Thread.sleep(this.sleepForRetries * sleepMultiplier);
625     } catch (InterruptedException e) {
626       LOG.debug("Interrupted while sleeping between retries");
627       Thread.currentThread().interrupt();
628     }
629     return sleepMultiplier < maxRetriesMultiplier;
630   }
631 
632   /**
633    * Count the number of different row keys in the given edit; this approximates the number
634    * of client operations (Put/Delete) it carries. We assume that there's at least one KV in the WALEdit.
635    * @param edit edit to count row keys from
636    * @return number of different row keys
637    */
638   private int countDistinctRowKeys(WALEdit edit) {
639     List<KeyValue> kvs = edit.getKeyValues();
640     int distinctRowKeys = 1;
641     // count row transitions; cells for the same operation are appended contiguously
642     for (int i = 1; i < edit.size(); i++) {
643       if (!kvs.get(i).matchingRow(kvs.get(i - 1))) {
644         distinctRowKeys++;
645       }
646     }
647     return distinctRowKeys;
648   }
649 
650   /**
651    * Do the shipping logic
652    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
653    * written to when this method was called
654    */
655   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
656     int sleepMultiplier = 1;
657     if (entries.isEmpty()) {
658       LOG.warn("Was given 0 edits to ship");
659       return;
660     }
661     while (this.isActive()) {
662       try {
663         if (this.throttler.isEnabled()) {
664           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
665           if (sleepTicks > 0) {
666             try {
667               if (LOG.isTraceEnabled()) {
668                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
669               }
670               Thread.sleep(sleepTicks);
671             } catch (InterruptedException e) {
672               LOG.debug("Interrupted while sleeping for throttling control");
673               Thread.currentThread().interrupt();
674               // current thread might be interrupted to terminate
675               // directly go back to while() for confirm this
676               continue;
677             }
678             // reset throttler's cycle start tick when sleep for throttling occurs
679             this.throttler.resetStartTick();
680           }
681         }
682         replicateContext.setEntries(entries).setSize(currentSize);
683 
684         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
685         boolean replicated = replicationEndpoint.replicate(replicateContext);
686 
687         if (!replicated) {
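              // the endpoint reported failure for this batch; loop around and retry the same entries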
688           continue;
689         }
690 
691         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
692           this.manager.logPositionAndCleanOldLogs(this.currentPath,
693               this.peerClusterZnode, this.repLogReader.getPosition(),
694               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
695           this.lastLoggedPosition = this.repLogReader.getPosition();
696         }
697         if (this.throttler.isEnabled()) {
698           this.throttler.addPushSize(currentSize);
699         }
700         this.totalReplicatedEdits += entries.size();
701         this.totalReplicatedOperations += currentNbOperations;
702         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
703         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
704         if (LOG.isTraceEnabled()) {
705           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
706               + this.totalReplicatedOperations + " operations");
707         }
708         break;
709       } catch (Exception ex) {
710         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:", ex);
711         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
712           sleepMultiplier++;
713         }
714       }
715     }
716   }
717 
718   /**
719    * Check whether the peer is enabled or not.
720    *
721    * @return true if the peer is enabled, otherwise false
722    */
723   protected boolean isPeerEnabled() {
724     return this.replicationPeers.getStatusOfPeer(this.peerId);
725   }
726 
727   /**
728    * If the queue isn't empty, switch to the next log and keep going.
729    * Else, if this is a recovered queue, it means we're done!
730    * Else, just keep trying to read from the current log file.
731    * @return true if we're done with the current file, false if we should
732    * continue trying to read from it
733    */
734   protected boolean processEndOfFile() {
735     if (this.queue.size() != 0) {
736       if (LOG.isTraceEnabled()) {
737         String filesize = "N/A";
738         try {
739           FileStatus stat = this.fs.getFileStatus(this.currentPath);
740           filesize = stat.getLen()+"";
741         } catch (IOException ex) {}
742         LOG.trace("Reached the end of a log, stats: " + getStats() +
743             ", and the length of the file is " + filesize);
744       }
745       this.currentPath = null;
746       this.repLogReader.finishCurrentFile();
747       this.reader = null;
748       return true;
749     } else if (this.replicationQueueInfo.isQueueRecovered()) {
750       this.manager.closeRecoveredQueue(this);
751       LOG.info("Finished recovering the queue with the following stats " + getStats());
752       this.running = false;
753       return true;
754     }
755     return false;
756   }
757 
758   @Override
759   public void startup() {
760     String n = Thread.currentThread().getName();
761     Thread.UncaughtExceptionHandler handler =
762         new Thread.UncaughtExceptionHandler() {
763           @Override
764           public void uncaughtException(final Thread t, final Throwable e) {
765             LOG.error("Unexpected exception in ReplicationSource," +
766               " currentPath=" + currentPath, e);
767           }
768         };
769     Threads.setDaemonThreadRunning(
770         this, n + ".replicationSource," +
771         this.peerClusterZnode, handler);
772   }
773 
774   @Override
775   public void terminate(String reason) {
776     terminate(reason, null);
777   }
778 
779   @Override
780   public void terminate(String reason, Exception cause) {
781     terminate(reason, cause, true);
782   }
783 
784   public void terminate(String reason, Exception cause, boolean join) {
785     if (cause == null) {
786       LOG.info("Closing source "
787           + this.peerClusterZnode + " because: " + reason);
788 
789     } else {
790       LOG.error("Closing source " + this.peerClusterZnode
791           + " because an error occurred: " + reason, cause);
792     }
793     this.running = false;
794     this.interrupt();
795     ListenableFuture<Service.State> future = null;
796     if (this.replicationEndpoint != null) {
797       future = this.replicationEndpoint.stop();
798     }
799     if (join) {
800       Threads.shutdown(this, this.sleepForRetries);
801       if (future != null) {
802         try {
803           future.get();
804         } catch (Exception e) {
805           LOG.warn("Got exception while waiting for the replication endpoint to stop:", e);
806         }
807       }
808     }
809   }
810 
811   @Override
812   public String getPeerClusterZnode() {
813     return this.peerClusterZnode;
814   }
815 
816   @Override
817   public String getPeerClusterId() {
818     return this.peerId;
819   }
820 
821   @Override
822   public Path getCurrentPath() {
823     return this.currentPath;
824   }
825 
826   private boolean isActive() {
827     return !this.stopper.isStopped() && this.running && !isInterrupted();
828   }
829 
830   /**
831    * Comparator used to compare logs together based on their start time
832    */
833   public static class LogsComparator implements Comparator<Path> {
834 
835     @Override
836     public int compare(Path o1, Path o2) {
837       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
838     }
839 
840     /**
841      * Split a path to get the start time.
842      * For example: 10.20.20.171%3A60020.1277499063250
843      * @param p path to split
844      * @return start time
845      */
846     private long getTS(Path p) {
847       String[] parts = p.getName().split("\\.");
848       return Long.parseLong(parts[parts.length-1]);
849     }
850   }
851 
852   @Override
853   public String getStats() {
854     long position = this.repLogReader.getPosition();
855     return "Total replicated edits: " + totalReplicatedEdits +
856       ", currently replicating from: " + this.currentPath +
857       " at position: " + position;
858   }
859 
860   /**
861    * Get Replication Source Metrics
862    * @return sourceMetrics
863    */
864   public MetricsSource getSourceMetrics() {
865     return this.metrics;
866   }
867 }