View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.regionserver.HRegionServer;
48  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
49  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationListener;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeers;
55  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
56  import org.apache.hadoop.hbase.replication.ReplicationQueues;
57  import org.apache.hadoop.hbase.replication.ReplicationTracker;
58  
59  import com.google.common.util.concurrent.ThreadFactoryBuilder;
60  
/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the HLog queue it had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues into a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
75  @InterfaceAudience.Private
76  public class ReplicationSourceManager implements ReplicationListener {
77    private static final Log LOG =
78        LogFactory.getLog(ReplicationSourceManager.class);
79    // List of all the sources that read this RS's logs
80    private final List<ReplicationSourceInterface> sources;
81    // List of all the sources we got from died RSs
82    private final List<ReplicationSourceInterface> oldsources;
83    private final ReplicationQueues replicationQueues;
84    private final ReplicationTracker replicationTracker;
85    private final ReplicationPeers replicationPeers;
86    // UUID for this cluster
87    private final UUID clusterId;
88    // All about stopping
89    private final Server server;
90    // All logs we are currently tracking
91    private final Map<String, SortedSet<String>> hlogsById;
92    // Logs for recovered sources we are currently tracking
93    private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
94    private final Configuration conf;
95    private final FileSystem fs;
96    // The path to the latest log we saw, for new coming sources
97    private Path latestPath;
98    // Path to the hlogs directories
99    private final Path logDir;
100   // Path to the hlog archive
101   private final Path oldLogDir;
102   // The number of ms that we wait before moving znodes, HBASE-3596
103   private final long sleepBeforeFailover;
104   // Homemade executer service for replication
105   private final ThreadPoolExecutor executor;
106 
107   private final Random rand;
108 
109 
110   /**
111    * Creates a replication manager and sets the watch on all the other registered region servers
112    * @param replicationQueues the interface for manipulating replication queues
113    * @param replicationPeers
114    * @param replicationTracker
115    * @param conf the configuration to use
116    * @param stopper the stopper object for this region server
117    * @param fs the file system to use
118    * @param logDir the directory that contains all hlog directories of live RSs
119    * @param oldLogDir the directory where old logs are archived
120    * @param clusterId
121    */
122   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
123       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
124       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
125       final Path oldLogDir, final UUID clusterId) {
126     //CopyOnWriteArrayList is thread-safe.
127     //Generally, reading is more than modifying.
128     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
129     this.replicationQueues = replicationQueues;
130     this.replicationPeers = replicationPeers;
131     this.replicationTracker = replicationTracker;
132     this.server = server;
133     this.hlogsById = new HashMap<String, SortedSet<String>>();
134     this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
135     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
136     this.conf = conf;
137     this.fs = fs;
138     this.logDir = logDir;
139     this.oldLogDir = oldLogDir;
140     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
141     this.clusterId = clusterId;
142     this.replicationTracker.registerListener(this);
143     this.replicationPeers.getAllPeerIds();
144     // It's preferable to failover 1 RS at a time, but with good zk servers
145     // more could be processed at the same time.
146     int nbWorkers = conf.getInt("replication.executor.workers", 1);
147     // use a short 100ms sleep since this could be done inline with a RS startup
148     // even if we fail, other region servers can take care of it
149     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
150         100, TimeUnit.MILLISECONDS,
151         new LinkedBlockingQueue<Runnable>());
152     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
153     tfb.setNameFormat("ReplicationExecutor-%d");
154     tfb.setDaemon(true);
155     this.executor.setThreadFactory(tfb.build());
156     this.rand = new Random();
157   }
158 
159   /**
160    * Provide the id of the peer and a log key and this method will figure which
161    * hlog it belongs to and will log, for this region server, the current
162    * position. It will also clean old logs from the queue.
163    * @param log Path to the log currently being replicated from
164    * replication status in zookeeper. It will also delete older entries.
165    * @param id id of the peer cluster
166    * @param position current location in the log
167    * @param queueRecovered indicates if this queue comes from another region server
168    * @param holdLogInZK if true then the log is retained in ZK
169    */
170   public void logPositionAndCleanOldLogs(Path log, String id, long position,
171       boolean queueRecovered, boolean holdLogInZK) {
172     String fileName = log.getName();
173     this.replicationQueues.setLogPosition(id, fileName, position);
174     if (holdLogInZK) {
175      return;
176     }
177     cleanOldLogs(fileName, id, queueRecovered);
178   }
179 
180   /**
181    * Cleans a log file and all older files from ZK. Called when we are sure that a
182    * log file is closed and has no more entries.
183    * @param key Path to the log
184    * @param id id of the peer cluster
185    * @param queueRecovered Whether this is a recovered queue
186    */
187   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
188     if (queueRecovered) {
189       SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
190       if (hlogs != null && !hlogs.first().equals(key)) {
191         cleanOldLogs(hlogs, key, id);
192       }
193     } else {
194       synchronized (this.hlogsById) {
195         SortedSet<String> hlogs = hlogsById.get(id);
196         if (!hlogs.first().equals(key)) {
197           cleanOldLogs(hlogs, key, id);
198         }
199       }
200     }
201  }
202   
203   private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
204     SortedSet<String> hlogSet = hlogs.headSet(key);
205     LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
206     for (String hlog : hlogSet) {
207       this.replicationQueues.removeLog(id, hlog);
208     }
209     hlogSet.clear();
210   }
211 
212   /**
213    * Adds a normal source per registered peer cluster and tries to process all
214    * old region server hlog queues
215    */
216   protected void init() throws IOException, ReplicationException {
217     for (String id : this.replicationPeers.getPeerIds()) {
218       addSource(id);
219     }
220     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
221     if (currentReplicators == null || currentReplicators.size() == 0) {
222       return;
223     }
224     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
225     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
226         + otherRegionServers);
227 
228     // Look if there's anything to process after a restart
229     for (String rs : currentReplicators) {
230       if (!otherRegionServers.contains(rs)) {
231         transferQueues(rs);
232       }
233     }
234   }
235 
236   /**
237    * Add a new normal source to this region server
238    * @param id the id of the peer cluster
239    * @return the source that was created
240    * @throws IOException
241    */
242   protected ReplicationSourceInterface addSource(String id) throws IOException,
243       ReplicationException {
244     ReplicationPeerConfig peerConfig
245       = replicationPeers.getReplicationPeerConfig(id);
246     ReplicationPeer peer = replicationPeers.getPeer(id);
247     ReplicationSourceInterface src =
248         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
249           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
250     synchronized (this.hlogsById) {
251       this.sources.add(src);
252       this.hlogsById.put(id, new TreeSet<String>());
253       // Add the latest hlog to that source's queue
254       if (this.latestPath != null) {
255         String name = this.latestPath.getName();
256         this.hlogsById.get(id).add(name);
257         try {
258           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
259         } catch (ReplicationException e) {
260           String message =
261               "Cannot add log to queue when creating a new source, queueId="
262                   + src.getPeerClusterZnode() + ", filename=" + name;
263           server.stop(message);
264           throw e;
265         }
266         src.enqueueLog(this.latestPath);
267       }
268     }
269     src.startup();
270     return src;
271   }
272 
273   /**
274    * Delete a complete queue of hlogs associated with a peer cluster
275    * @param peerId Id of the peer cluster queue of hlogs to delete
276    */
277   public void deleteSource(String peerId, boolean closeConnection) {
278     this.replicationQueues.removeQueue(peerId);
279     if (closeConnection) {
280       this.replicationPeers.peerRemoved(peerId);
281     }
282   }
283 
284   /**
285    * Terminate the replication on this region server
286    */
287   public void join() {
288     this.executor.shutdown();
289     if (this.sources.size() == 0) {
290       this.replicationQueues.removeAllQueues();
291     }
292     for (ReplicationSourceInterface source : this.sources) {
293       source.terminate("Region server is closing");
294     }
295   }
296 
297   /**
298    * Get a copy of the hlogs of the first source on this rs
299    * @return a sorted set of hlog names
300    */
301   protected Map<String, SortedSet<String>> getHLogs() {
302     return Collections.unmodifiableMap(hlogsById);
303   }
304   
305   /**
306    * Get a copy of the hlogs of the recovered sources on this rs
307    * @return a sorted set of hlog names
308    */
309   protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
310     return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
311   }
312 
313   /**
314    * Get a list of all the normal sources of this rs
315    * @return lis of all sources
316    */
317   public List<ReplicationSourceInterface> getSources() {
318     return this.sources;
319   }
320 
321   /**
322    * Get a list of all the old sources of this rs
323    * @return list of all old sources
324    */
325   public List<ReplicationSourceInterface> getOldSources() {
326     return this.oldsources;
327   }
328 
329   void preLogRoll(Path newLog) throws IOException {
330     synchronized (this.hlogsById) {
331       String name = newLog.getName();
332       for (ReplicationSourceInterface source : this.sources) {
333         try {
334           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
335         } catch (ReplicationException e) {
336           throw new IOException("Cannot add log to replication queue with id="
337               + source.getPeerClusterZnode() + ", filename=" + name, e);
338         }
339       }
340       for (SortedSet<String> hlogs : this.hlogsById.values()) {
341         if (this.sources.isEmpty()) {
342           // If there's no slaves, don't need to keep the old hlogs since
343           // we only consider the last one when a new slave comes in
344           hlogs.clear();
345         }
346         hlogs.add(name);
347       }
348     }
349 
350     this.latestPath = newLog;
351   }
352 
353   void postLogRoll(Path newLog) throws IOException {
354     // This only updates the sources we own, not the recovered ones
355     for (ReplicationSourceInterface source : this.sources) {
356       source.enqueueLog(newLog);
357     }
358   }
359 
360   /**
361    * Factory method to create a replication source
362    * @param conf the configuration to use
363    * @param fs the file system to use
364    * @param manager the manager to use
365    * @param stopper the stopper object for this region server
366    * @param peerId the id of the peer cluster
367    * @return the created source
368    * @throws IOException
369    */
370   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
371       final FileSystem fs, final ReplicationSourceManager manager,
372       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
373       final Server server, final String peerId, final UUID clusterId,
374       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
375           throws IOException {
376     RegionServerCoprocessorHost rsServerHost = null;
377     if (server instanceof HRegionServer) {
378       rsServerHost = ((HRegionServer) server).getCoprocessorHost();
379     }
380     ReplicationSourceInterface src;
381     try {
382       @SuppressWarnings("rawtypes")
383       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
384           ReplicationSource.class.getCanonicalName()));
385       src = (ReplicationSourceInterface) c.newInstance();
386     } catch (Exception e) {
387       LOG.warn("Passed replication source implementation throws errors, " +
388           "defaulting to ReplicationSource", e);
389       src = new ReplicationSource();
390     }
391 
392     ReplicationEndpoint replicationEndpoint = null;
393     try {
394       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
395       if (replicationEndpointImpl == null) {
396         // Default to HBase inter-cluster replication endpoint
397         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
398       }
399       @SuppressWarnings("rawtypes")
400       Class c = Class.forName(replicationEndpointImpl);
401       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
402       if(rsServerHost != null) {
403         // We may have to use reflections here to see if the method is really there.
404         // If not do not go with the visibility replication, go with the normal one
405         ReplicationEndpoint newReplicationEndPoint = rsServerHost
406             .postCreateReplicationEndPoint(replicationEndpoint);
407         if(newReplicationEndPoint != null) {
408           // Override the newly created endpoint from the hook with configured end point
409           replicationEndpoint = newReplicationEndPoint;
410         }
411       }
412     } catch (Exception e) {
413       LOG.warn("Passed replication endpoint implementation throws errors", e);
414       throw new IOException(e);
415     }
416 
417     MetricsSource metrics = new MetricsSource(peerId);
418     // init replication source
419     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
420       clusterId, replicationEndpoint, metrics);
421 
422     // init replication endpoint
423     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
424       fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
425 
426     return src;
427   }
428 
429   /**
430    * Transfer all the queues of the specified to this region server.
431    * First it tries to grab a lock and if it works it will move the
432    * znodes and finally will delete the old znodes.
433    *
434    * It creates one old source for any type of source of the old rs.
435    * @param rsZnode
436    */
437   private void transferQueues(String rsZnode) {
438     NodeFailoverWorker transfer =
439         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
440             this.clusterId);
441     try {
442       this.executor.execute(transfer);
443     } catch (RejectedExecutionException ex) {
444       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
445     }
446   }
447 
448   /**
449    * Clear the references to the specified old source
450    * @param src source to clear
451    */
452   public void closeRecoveredQueue(ReplicationSourceInterface src) {
453     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
454     this.oldsources.remove(src);
455     deleteSource(src.getPeerClusterZnode(), false);
456     this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
457   }
458 
459   /**
460    * Thie method first deletes all the recovered sources for the specified
461    * id, then deletes the normal source (deleting all related data in ZK).
462    * @param id The id of the peer cluster
463    */
464   public void removePeer(String id) {
465     LOG.info("Closing the following queue " + id + ", currently have "
466         + sources.size() + " and another "
467         + oldsources.size() + " that were recovered");
468     String terminateMessage = "Replication stream was removed by a user";
469     ReplicationSourceInterface srcToRemove = null;
470     List<ReplicationSourceInterface> oldSourcesToDelete =
471         new ArrayList<ReplicationSourceInterface>();
472     // First close all the recovered sources for this peer
473     for (ReplicationSourceInterface src : oldsources) {
474       if (id.equals(src.getPeerClusterId())) {
475         oldSourcesToDelete.add(src);
476       }
477     }
478     for (ReplicationSourceInterface src : oldSourcesToDelete) {
479       src.terminate(terminateMessage);
480       closeRecoveredQueue((src));
481     }
482     LOG.info("Number of deleted recovered sources for " + id + ": "
483         + oldSourcesToDelete.size());
484     // Now look for the one on this cluster
485     for (ReplicationSourceInterface src : this.sources) {
486       if (id.equals(src.getPeerClusterId())) {
487         srcToRemove = src;
488         break;
489       }
490     }
491     if (srcToRemove == null) {
492       LOG.error("The queue we wanted to close is missing " + id);
493       return;
494     }
495     srcToRemove.terminate(terminateMessage);
496     this.sources.remove(srcToRemove);
497     deleteSource(id, true);
498   }
499 
500   @Override
501   public void regionServerRemoved(String regionserver) {
502     transferQueues(regionserver);
503   }
504 
505   @Override
506   public void peerRemoved(String peerId) {
507     removePeer(peerId);
508   }
509 
510   @Override
511   public void peerListChanged(List<String> peerIds) {
512     for (String id : peerIds) {
513       try {
514         boolean added = this.replicationPeers.peerAdded(id);
515         if (added) {
516           addSource(id);
517         }
518       } catch (Exception e) {
519         LOG.error("Error while adding a new peer", e);
520       }
521     }
522   }
523 
524   /**
525    * Class responsible to setup new ReplicationSources to take care of the
526    * queues from dead region servers.
527    */
528   class NodeFailoverWorker extends Thread {
529 
530     private String rsZnode;
531     private final ReplicationQueues rq;
532     private final ReplicationPeers rp;
533     private final UUID clusterId;
534 
535     /**
536      *
537      * @param rsZnode
538      */
539     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
540         final ReplicationPeers replicationPeers, final UUID clusterId) {
541       super("Failover-for-"+rsZnode);
542       this.rsZnode = rsZnode;
543       this.rq = replicationQueues;
544       this.rp = replicationPeers;
545       this.clusterId = clusterId;
546     }
547 
548     @Override
549     public void run() {
550       if (this.rq.isThisOurZnode(rsZnode)) {
551         return;
552       }
553       // Wait a bit before transferring the queues, we may be shutting down.
554       // This sleep may not be enough in some cases.
555       try {
556         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
557       } catch (InterruptedException e) {
558         LOG.warn("Interrupted while waiting before transferring a queue.");
559         Thread.currentThread().interrupt();
560       }
561       // We try to lock that rs' queue directory
562       if (server.isStopped()) {
563         LOG.info("Not transferring queue since we are shutting down");
564         return;
565       }
566       SortedMap<String, SortedSet<String>> newQueues = null;
567 
568       newQueues = this.rq.claimQueues(rsZnode);
569 
570       // Copying over the failed queue is completed.
571       if (newQueues.isEmpty()) {
572         // We either didn't get the lock or the failed region server didn't have any outstanding
573         // HLogs to replicate, so we are done.
574         return;
575       }
576 
577       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
578         String peerId = entry.getKey();
579         try {
580           // there is not an actual peer defined corresponding to peerId for the failover.
581           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
582           String actualPeerId = replicationQueueInfo.getPeerId();
583           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
584           ReplicationPeerConfig peerConfig = null;
585           try {
586             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
587           } catch (ReplicationException ex) {
588             LOG.warn("Received exception while getting replication peer config, skipping replay"
589                 + ex);
590           }
591           if (peer == null || peerConfig == null) {
592             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
593             continue;
594           }
595 
596           ReplicationSourceInterface src =
597               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
598                 server, peerId, this.clusterId, peerConfig, peer);
599           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
600             src.terminate("Recovered queue doesn't belong to any current peer");
601             break;
602           }
603           oldsources.add(src);
604           SortedSet<String> hlogsSet = entry.getValue();
605           for (String hlog : hlogsSet) {
606             src.enqueueLog(new Path(oldLogDir, hlog));
607           }
608           src.startup();
609           hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
610         } catch (IOException e) {
611           // TODO manage it
612           LOG.error("Failed creating a source", e);
613         }
614       }
615     }
616   }
617 
618   /**
619    * Get the directory where hlogs are archived
620    * @return the directory where hlogs are archived
621    */
622   public Path getOldLogDir() {
623     return this.oldLogDir;
624   }
625 
626   /**
627    * Get the directory where hlogs are stored by their RSs
628    * @return the directory where hlogs are stored by their RSs
629    */
630   public Path getLogDir() {
631     return this.logDir;
632   }
633 
634   /**
635    * Get the handle on the local file system
636    * @return Handle on the local file system
637    */
638   public FileSystem getFs() {
639     return this.fs;
640   }
641 
642   /**
643    * Get a string representation of all the sources' metrics
644    */
645   public String getStats() {
646     StringBuffer stats = new StringBuffer();
647     for (ReplicationSourceInterface source : sources) {
648       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
649       stats.append(source.getStats() + "\n");
650     }
651     for (ReplicationSourceInterface oldSource : oldsources) {
652       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
653       stats.append(oldSource.getStats()+ "\n");
654     }
655     return stats.toString();
656   }
657 }