1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Random;
29 import java.util.SortedMap;
30 import java.util.SortedSet;
31 import java.util.TreeSet;
32 import java.util.UUID;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.CopyOnWriteArrayList;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hbase.Server;
46 import org.apache.hadoop.hbase.classification.InterfaceAudience;
47 import org.apache.hadoop.hbase.regionserver.HRegionServer;
48 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
49 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50 import org.apache.hadoop.hbase.replication.ReplicationException;
51 import org.apache.hadoop.hbase.replication.ReplicationListener;
52 import org.apache.hadoop.hbase.replication.ReplicationPeer;
53 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54 import org.apache.hadoop.hbase.replication.ReplicationPeers;
55 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
56 import org.apache.hadoop.hbase.replication.ReplicationQueues;
57 import org.apache.hadoop.hbase.replication.ReplicationTracker;
58
59 import com.google.common.util.concurrent.ThreadFactoryBuilder;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Private
76 public class ReplicationSourceManager implements ReplicationListener {
77 private static final Log LOG =
78 LogFactory.getLog(ReplicationSourceManager.class);
79
80 private final List<ReplicationSourceInterface> sources;
81
82 private final List<ReplicationSourceInterface> oldsources;
83 private final ReplicationQueues replicationQueues;
84 private final ReplicationTracker replicationTracker;
85 private final ReplicationPeers replicationPeers;
86
87 private final UUID clusterId;
88
89 private final Server server;
90
91 private final Map<String, SortedSet<String>> hlogsById;
92
93 private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
94 private final Configuration conf;
95 private final FileSystem fs;
96
97 private Path latestPath;
98
99 private final Path logDir;
100
101 private final Path oldLogDir;
102
103 private final long sleepBeforeFailover;
104
105 private final ThreadPoolExecutor executor;
106
107 private final Random rand;
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public ReplicationSourceManager(final ReplicationQueues replicationQueues,
123 final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
124 final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
125 final Path oldLogDir, final UUID clusterId) {
126
127
128 this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
129 this.replicationQueues = replicationQueues;
130 this.replicationPeers = replicationPeers;
131 this.replicationTracker = replicationTracker;
132 this.server = server;
133 this.hlogsById = new HashMap<String, SortedSet<String>>();
134 this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
135 this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
136 this.conf = conf;
137 this.fs = fs;
138 this.logDir = logDir;
139 this.oldLogDir = oldLogDir;
140 this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
141 this.clusterId = clusterId;
142 this.replicationTracker.registerListener(this);
143 this.replicationPeers.getAllPeerIds();
144
145
146 int nbWorkers = conf.getInt("replication.executor.workers", 1);
147
148
149 this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
150 100, TimeUnit.MILLISECONDS,
151 new LinkedBlockingQueue<Runnable>());
152 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
153 tfb.setNameFormat("ReplicationExecutor-%d");
154 tfb.setDaemon(true);
155 this.executor.setThreadFactory(tfb.build());
156 this.rand = new Random();
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170 public void logPositionAndCleanOldLogs(Path log, String id, long position,
171 boolean queueRecovered, boolean holdLogInZK) {
172 String fileName = log.getName();
173 this.replicationQueues.setLogPosition(id, fileName, position);
174 if (holdLogInZK) {
175 return;
176 }
177 cleanOldLogs(fileName, id, queueRecovered);
178 }
179
180
181
182
183
184
185
186
187 public void cleanOldLogs(String key, String id, boolean queueRecovered) {
188 if (queueRecovered) {
189 SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
190 if (hlogs != null && !hlogs.first().equals(key)) {
191 cleanOldLogs(hlogs, key, id);
192 }
193 } else {
194 synchronized (this.hlogsById) {
195 SortedSet<String> hlogs = hlogsById.get(id);
196 if (!hlogs.first().equals(key)) {
197 cleanOldLogs(hlogs, key, id);
198 }
199 }
200 }
201 }
202
203 private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
204 SortedSet<String> hlogSet = hlogs.headSet(key);
205 LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
206 for (String hlog : hlogSet) {
207 this.replicationQueues.removeLog(id, hlog);
208 }
209 hlogSet.clear();
210 }
211
212
213
214
215
216 protected void init() throws IOException, ReplicationException {
217 for (String id : this.replicationPeers.getPeerIds()) {
218 addSource(id);
219 }
220 List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
221 if (currentReplicators == null || currentReplicators.size() == 0) {
222 return;
223 }
224 List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
225 LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
226 + otherRegionServers);
227
228
229 for (String rs : currentReplicators) {
230 if (!otherRegionServers.contains(rs)) {
231 transferQueues(rs);
232 }
233 }
234 }
235
236
237
238
239
240
241
242 protected ReplicationSourceInterface addSource(String id) throws IOException,
243 ReplicationException {
244 ReplicationPeerConfig peerConfig
245 = replicationPeers.getReplicationPeerConfig(id);
246 ReplicationPeer peer = replicationPeers.getPeer(id);
247 ReplicationSourceInterface src =
248 getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
249 this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
250 synchronized (this.hlogsById) {
251 this.sources.add(src);
252 this.hlogsById.put(id, new TreeSet<String>());
253
254 if (this.latestPath != null) {
255 String name = this.latestPath.getName();
256 this.hlogsById.get(id).add(name);
257 try {
258 this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
259 } catch (ReplicationException e) {
260 String message =
261 "Cannot add log to queue when creating a new source, queueId="
262 + src.getPeerClusterZnode() + ", filename=" + name;
263 server.stop(message);
264 throw e;
265 }
266 src.enqueueLog(this.latestPath);
267 }
268 }
269 src.startup();
270 return src;
271 }
272
273
274
275
276
277 public void deleteSource(String peerId, boolean closeConnection) {
278 this.replicationQueues.removeQueue(peerId);
279 if (closeConnection) {
280 this.replicationPeers.peerRemoved(peerId);
281 }
282 }
283
284
285
286
287 public void join() {
288 this.executor.shutdown();
289 if (this.sources.size() == 0) {
290 this.replicationQueues.removeAllQueues();
291 }
292 for (ReplicationSourceInterface source : this.sources) {
293 source.terminate("Region server is closing");
294 }
295 }
296
297
298
299
300
301 protected Map<String, SortedSet<String>> getHLogs() {
302 return Collections.unmodifiableMap(hlogsById);
303 }
304
305
306
307
308
309 protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
310 return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
311 }
312
313
314
315
316
317 public List<ReplicationSourceInterface> getSources() {
318 return this.sources;
319 }
320
321
322
323
324
325 public List<ReplicationSourceInterface> getOldSources() {
326 return this.oldsources;
327 }
328
329 void preLogRoll(Path newLog) throws IOException {
330 synchronized (this.hlogsById) {
331 String name = newLog.getName();
332 for (ReplicationSourceInterface source : this.sources) {
333 try {
334 this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
335 } catch (ReplicationException e) {
336 throw new IOException("Cannot add log to replication queue with id="
337 + source.getPeerClusterZnode() + ", filename=" + name, e);
338 }
339 }
340 for (SortedSet<String> hlogs : this.hlogsById.values()) {
341 if (this.sources.isEmpty()) {
342
343
344 hlogs.clear();
345 }
346 hlogs.add(name);
347 }
348 }
349
350 this.latestPath = newLog;
351 }
352
353 void postLogRoll(Path newLog) throws IOException {
354
355 for (ReplicationSourceInterface source : this.sources) {
356 source.enqueueLog(newLog);
357 }
358 }
359
360
361
362
363
364
365
366
367
368
369
370 protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
371 final FileSystem fs, final ReplicationSourceManager manager,
372 final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
373 final Server server, final String peerId, final UUID clusterId,
374 final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
375 throws IOException {
376 RegionServerCoprocessorHost rsServerHost = null;
377 if (server instanceof HRegionServer) {
378 rsServerHost = ((HRegionServer) server).getCoprocessorHost();
379 }
380 ReplicationSourceInterface src;
381 try {
382 @SuppressWarnings("rawtypes")
383 Class c = Class.forName(conf.get("replication.replicationsource.implementation",
384 ReplicationSource.class.getCanonicalName()));
385 src = (ReplicationSourceInterface) c.newInstance();
386 } catch (Exception e) {
387 LOG.warn("Passed replication source implementation throws errors, " +
388 "defaulting to ReplicationSource", e);
389 src = new ReplicationSource();
390 }
391
392 ReplicationEndpoint replicationEndpoint = null;
393 try {
394 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
395 if (replicationEndpointImpl == null) {
396
397 replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
398 }
399 @SuppressWarnings("rawtypes")
400 Class c = Class.forName(replicationEndpointImpl);
401 replicationEndpoint = (ReplicationEndpoint) c.newInstance();
402 if(rsServerHost != null) {
403
404
405 ReplicationEndpoint newReplicationEndPoint = rsServerHost
406 .postCreateReplicationEndPoint(replicationEndpoint);
407 if(newReplicationEndPoint != null) {
408
409 replicationEndpoint = newReplicationEndPoint;
410 }
411 }
412 } catch (Exception e) {
413 LOG.warn("Passed replication endpoint implementation throws errors", e);
414 throw new IOException(e);
415 }
416
417 MetricsSource metrics = new MetricsSource(peerId);
418
419 src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
420 clusterId, replicationEndpoint, metrics);
421
422
423 replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
424 fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
425
426 return src;
427 }
428
429
430
431
432
433
434
435
436
437 private void transferQueues(String rsZnode) {
438 NodeFailoverWorker transfer =
439 new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
440 this.clusterId);
441 try {
442 this.executor.execute(transfer);
443 } catch (RejectedExecutionException ex) {
444 LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
445 }
446 }
447
448
449
450
451
452 public void closeRecoveredQueue(ReplicationSourceInterface src) {
453 LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
454 this.oldsources.remove(src);
455 deleteSource(src.getPeerClusterZnode(), false);
456 this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
457 }
458
459
460
461
462
463
464 public void removePeer(String id) {
465 LOG.info("Closing the following queue " + id + ", currently have "
466 + sources.size() + " and another "
467 + oldsources.size() + " that were recovered");
468 String terminateMessage = "Replication stream was removed by a user";
469 ReplicationSourceInterface srcToRemove = null;
470 List<ReplicationSourceInterface> oldSourcesToDelete =
471 new ArrayList<ReplicationSourceInterface>();
472
473 for (ReplicationSourceInterface src : oldsources) {
474 if (id.equals(src.getPeerClusterId())) {
475 oldSourcesToDelete.add(src);
476 }
477 }
478 for (ReplicationSourceInterface src : oldSourcesToDelete) {
479 src.terminate(terminateMessage);
480 closeRecoveredQueue((src));
481 }
482 LOG.info("Number of deleted recovered sources for " + id + ": "
483 + oldSourcesToDelete.size());
484
485 for (ReplicationSourceInterface src : this.sources) {
486 if (id.equals(src.getPeerClusterId())) {
487 srcToRemove = src;
488 break;
489 }
490 }
491 if (srcToRemove == null) {
492 LOG.error("The queue we wanted to close is missing " + id);
493 return;
494 }
495 srcToRemove.terminate(terminateMessage);
496 this.sources.remove(srcToRemove);
497 deleteSource(id, true);
498 }
499
500 @Override
501 public void regionServerRemoved(String regionserver) {
502 transferQueues(regionserver);
503 }
504
505 @Override
506 public void peerRemoved(String peerId) {
507 removePeer(peerId);
508 }
509
510 @Override
511 public void peerListChanged(List<String> peerIds) {
512 for (String id : peerIds) {
513 try {
514 boolean added = this.replicationPeers.peerAdded(id);
515 if (added) {
516 addSource(id);
517 }
518 } catch (Exception e) {
519 LOG.error("Error while adding a new peer", e);
520 }
521 }
522 }
523
524
525
526
527
528 class NodeFailoverWorker extends Thread {
529
530 private String rsZnode;
531 private final ReplicationQueues rq;
532 private final ReplicationPeers rp;
533 private final UUID clusterId;
534
535
536
537
538
539 public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
540 final ReplicationPeers replicationPeers, final UUID clusterId) {
541 super("Failover-for-"+rsZnode);
542 this.rsZnode = rsZnode;
543 this.rq = replicationQueues;
544 this.rp = replicationPeers;
545 this.clusterId = clusterId;
546 }
547
548 @Override
549 public void run() {
550 if (this.rq.isThisOurZnode(rsZnode)) {
551 return;
552 }
553
554
555 try {
556 Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
557 } catch (InterruptedException e) {
558 LOG.warn("Interrupted while waiting before transferring a queue.");
559 Thread.currentThread().interrupt();
560 }
561
562 if (server.isStopped()) {
563 LOG.info("Not transferring queue since we are shutting down");
564 return;
565 }
566 SortedMap<String, SortedSet<String>> newQueues = null;
567
568 newQueues = this.rq.claimQueues(rsZnode);
569
570
571 if (newQueues.isEmpty()) {
572
573
574 return;
575 }
576
577 for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
578 String peerId = entry.getKey();
579 try {
580
581 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
582 String actualPeerId = replicationQueueInfo.getPeerId();
583 ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
584 ReplicationPeerConfig peerConfig = null;
585 try {
586 peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
587 } catch (ReplicationException ex) {
588 LOG.warn("Received exception while getting replication peer config, skipping replay"
589 + ex);
590 }
591 if (peer == null || peerConfig == null) {
592 LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
593 continue;
594 }
595
596 ReplicationSourceInterface src =
597 getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
598 server, peerId, this.clusterId, peerConfig, peer);
599 if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
600 src.terminate("Recovered queue doesn't belong to any current peer");
601 break;
602 }
603 oldsources.add(src);
604 SortedSet<String> hlogsSet = entry.getValue();
605 for (String hlog : hlogsSet) {
606 src.enqueueLog(new Path(oldLogDir, hlog));
607 }
608 src.startup();
609 hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
610 } catch (IOException e) {
611
612 LOG.error("Failed creating a source", e);
613 }
614 }
615 }
616 }
617
618
619
620
621
622 public Path getOldLogDir() {
623 return this.oldLogDir;
624 }
625
626
627
628
629
630 public Path getLogDir() {
631 return this.logDir;
632 }
633
634
635
636
637
638 public FileSystem getFs() {
639 return this.fs;
640 }
641
642
643
644
645 public String getStats() {
646 StringBuffer stats = new StringBuffer();
647 for (ReplicationSourceInterface source : sources) {
648 stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
649 stats.append(source.getStats() + "\n");
650 }
651 for (ReplicationSourceInterface oldSource : oldsources) {
652 stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
653 stats.append(oldSource.getStats()+ "\n");
654 }
655 return stats.toString();
656 }
657 }