1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.master;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
30 import org.apache.hadoop.hbase.replication.ReplicationException;
31 import org.apache.hadoop.hbase.replication.ReplicationFactory;
32 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
33 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
34 import java.io.IOException;
35 import java.util.List;
36 import java.util.Set;
37
38 import com.google.common.base.Predicate;
39 import com.google.common.collect.ImmutableSet;
40 import com.google.common.collect.Iterables;
41 import com.google.common.collect.Sets;
42
43
44
45
46
47 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
48 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
49 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
50 private ZooKeeperWatcher zkw;
51 private ReplicationQueuesClient replicationQueues;
52 private boolean stopped = false;
53 private boolean aborted;
54
55
56 @Override
57 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
58
59
60 if (this.getConf() == null) {
61 return files;
62 }
63
64 final Set<String> hlogs = loadHLogsFromQueues();
65 return Iterables.filter(files, new Predicate<FileStatus>() {
66 @Override
67 public boolean apply(FileStatus file) {
68 String hlog = file.getPath().getName();
69 boolean logInReplicationQueue = hlogs.contains(hlog);
70 if (LOG.isDebugEnabled()) {
71 if (logInReplicationQueue) {
72 LOG.debug("Found log in ZK, keeping: " + hlog);
73 } else {
74 LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
75 }
76 }
77 return !logInReplicationQueue;
78 }});
79 }
80
81
82
83
84 private Set<String> loadHLogsFromQueues() {
85 List<String> rss = replicationQueues.getListOfReplicators();
86 if (rss == null) {
87 LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
88 return ImmutableSet.of();
89 }
90 Set<String> hlogs = Sets.newHashSet();
91 for (String rs: rss) {
92 List<String> listOfPeers = replicationQueues.getAllQueues(rs);
93
94 if (listOfPeers == null) {
95 continue;
96 }
97 for (String id : listOfPeers) {
98 List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
99 if (peersHlogs != null) {
100 hlogs.addAll(peersHlogs);
101 }
102 }
103 }
104 return hlogs;
105 }
106
107 @Override
108 public void setConf(Configuration config) {
109
110 if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
111 HConstants.REPLICATION_ENABLE_DEFAULT)) {
112 LOG.warn("Not configured - allowing all hlogs to be deleted");
113 return;
114 }
115
116
117 Configuration conf = new Configuration(config);
118 super.setConf(conf);
119 try {
120 this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
121 this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
122 this.replicationQueues.init();
123 } catch (ReplicationException e) {
124 LOG.error("Error while configuring " + this.getClass().getName(), e);
125 } catch (IOException e) {
126 LOG.error("Error while configuring " + this.getClass().getName(), e);
127 }
128 }
129
130 @Override
131 public void stop(String why) {
132 if (this.stopped) return;
133 this.stopped = true;
134 if (this.zkw != null) {
135 LOG.info("Stopping " + this.zkw);
136 this.zkw.close();
137 }
138 }
139
140 @Override
141 public boolean isStopped() {
142 return this.stopped;
143 }
144
145 @Override
146 public void abort(String why, Throwable e) {
147 LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
148 this.aborted = true;
149 stop(why);
150 }
151
152 @Override
153 public boolean isAborted() {
154 return this.aborted;
155 }
156 }