View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.NavigableMap;
25  import java.util.TreeMap;
26  import java.util.UUID;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.TimeUnit;
30  
31  import com.google.common.util.concurrent.ThreadFactoryBuilder;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.CellScanner;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.Server;
44  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
45  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
46  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
47  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
48  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
49  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationFactory;
52  import org.apache.hadoop.hbase.replication.ReplicationPeers;
53  import org.apache.hadoop.hbase.replication.ReplicationQueues;
54  import org.apache.hadoop.hbase.replication.ReplicationTracker;
55  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
58  import org.apache.zookeeper.KeeperException;
59  
60  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
61  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
62  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
63  
64  /**
65   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
66   */
67  @InterfaceAudience.Private
68  public class Replication implements WALActionsListener, 
69    ReplicationSourceService, ReplicationSinkService {
70    private static final Log LOG =
71        LogFactory.getLog(Replication.class);
72    private boolean replication;
73    private ReplicationSourceManager replicationManager;
74    private ReplicationQueues replicationQueues;
75    private ReplicationPeers replicationPeers;
76    private ReplicationTracker replicationTracker;
77    private Configuration conf;
78    private ReplicationSink replicationSink;
79    // Hosting server
80    private Server server;
81    /** Statistics thread schedule pool */
82    private ScheduledExecutorService scheduleThreadPool;
83    private int statsThreadPeriod;
84    // ReplicationLoad to access replication metrics
85    private ReplicationLoad replicationLoad;
86  
87    /**
88     * Instantiate the replication management (if rep is enabled).
89     * @param server Hosting server
90     * @param fs handle to the filesystem
91     * @param logDir
92     * @param oldLogDir directory where logs are archived
93     * @throws IOException
94     */
95    public Replication(final Server server, final FileSystem fs,
96        final Path logDir, final Path oldLogDir) throws IOException{
97      initialize(server, fs, logDir, oldLogDir);
98    }
99  
100   /**
101    * Empty constructor
102    */
103   public Replication() {
104   }
105 
106   public void initialize(final Server server, final FileSystem fs,
107       final Path logDir, final Path oldLogDir) throws IOException {
108     this.server = server;
109     this.conf = this.server.getConfiguration();
110     this.replication = isReplication(this.conf);
111     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
112       new ThreadFactoryBuilder()
113         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
114         .setDaemon(true)
115         .build());
116     if (replication) {
117       try {
118         this.replicationQueues =
119             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
120         this.replicationQueues.init(this.server.getServerName().toString());
121         this.replicationPeers =
122             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
123         this.replicationPeers.init();
124         this.replicationTracker =
125             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
126               this.conf, this.server, this.server);
127       } catch (ReplicationException e) {
128         throw new IOException("Failed replication handler create", e);
129       }
130       UUID clusterId = null;
131       try {
132         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
133       } catch (KeeperException ke) {
134         throw new IOException("Could not read cluster id", ke);
135       }
136       this.replicationManager =
137           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
138               conf, this.server, fs, logDir, oldLogDir, clusterId);
139       this.statsThreadPeriod =
140           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
141       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
142       this.replicationLoad = new ReplicationLoad();
143     } else {
144       this.replicationManager = null;
145       this.replicationQueues = null;
146       this.replicationPeers = null;
147       this.replicationTracker = null;
148       this.replicationLoad = null;
149     }
150   }
151 
152    /**
153     * @param c Configuration to look at
154     * @return True if replication is enabled.
155     */
156   public static boolean isReplication(final Configuration c) {
157     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
158   }
159 
160    /*
161     * Returns an object to listen to new hlog changes
162     **/
163   public WALActionsListener getWALActionsListener() {
164     return this;
165   }
166   /**
167    * Stops replication service.
168    */
169   public void stopReplicationService() {
170     join();
171   }
172 
173   /**
174    * Join with the replication threads
175    */
176   public void join() {
177     if (this.replication) {
178       this.replicationManager.join();
179       if (this.replicationSink != null) {
180         this.replicationSink.stopReplicationSinkServices();
181       }
182     }
183     scheduleThreadPool.shutdown();
184   }
185 
186   /**
187    * Carry on the list of log entries down to the sink
188    * @param entries list of entries to replicate
189    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
190    * do not contain the Cells we are replicating; they are passed here on the side in this
191    * CellScanner).
192    * @throws IOException
193    */
194   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
195     if (this.replication) {
196       this.replicationSink.replicateEntries(entries, cells);
197     }
198   }
199 
200   /**
201    * If replication is enabled and this cluster is a master,
202    * it starts
203    * @throws IOException
204    */
205   public void startReplicationService() throws IOException {
206     if (this.replication) {
207       try {
208         this.replicationManager.init();
209       } catch (ReplicationException e) {
210         throw new IOException(e);
211       }
212       this.replicationSink = new ReplicationSink(this.conf, this.server);
213       this.scheduleThreadPool.scheduleAtFixedRate(
214         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
215         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
216     }
217   }
218 
219   /**
220    * Get the replication sources manager
221    * @return the manager if replication is enabled, else returns false
222    */
223   public ReplicationSourceManager getReplicationManager() {
224     return this.replicationManager;
225   }
226 
227   @Override
228   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
229       WALEdit logEdit) {
230     // Not interested
231   }
232 
233   @Override
234   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
235                                        WALEdit logEdit) {
236     scopeWALEdits(htd, logKey, logEdit);
237   }
238 
239   /**
240    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
241    * from compaction WAL edits and if the scope is local.
242    * @param htd Descriptor used to find the scope to use
243    * @param logKey Key that may get scoped according to its edits
244    * @param logEdit Edits used to lookup the scopes
245    */
246   public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey,
247                                    WALEdit logEdit) {
248     NavigableMap<byte[], Integer> scopes =
249         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
250     byte[] family;
251     for (KeyValue kv : logEdit.getKeyValues()) {
252       family = kv.getFamily();
253       // This is expected and the KV should not be replicated
254       if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
255       // Unexpected, has a tendency to happen in unit tests
256       assert htd.getFamily(family) != null;
257 
258       int scope = htd.getFamily(family).getScope();
259       if (scope != REPLICATION_SCOPE_LOCAL &&
260           !scopes.containsKey(family)) {
261         scopes.put(family, scope);
262       }
263     }
264     if (!scopes.isEmpty()) {
265       logKey.setScopes(scopes);
266     }
267   }
268 
269   @Override
270   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
271     getReplicationManager().preLogRoll(newPath);
272   }
273 
274   @Override
275   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
276     getReplicationManager().postLogRoll(newPath);
277   }
278 
279   @Override
280   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
281     // Not interested
282   }
283 
284   @Override
285   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
286     // Not interested
287   }
288 
289   /**
290    * This method modifies the master's configuration in order to inject
291    * replication-related features
292    * @param conf
293    */
294   public static void decorateMasterConfiguration(Configuration conf) {
295     if (!isReplication(conf)) {
296       return;
297     }
298     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
299     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
300     if (!plugins.contains(cleanerClass)) {
301       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
302     }
303   }
304 
305   @Override
306   public void logRollRequested(boolean tooFewReplicas) {
307     // Not interested
308   }
309 
310   @Override
311   public void logCloseRequested() {
312     // not interested
313   }
314 
315   /*
316    * Statistics thread. Periodically prints the cache statistics to the log.
317    */
318   static class ReplicationStatisticsThread extends Thread {
319 
320     private final ReplicationSink replicationSink;
321     private final ReplicationSourceManager replicationManager;
322 
323     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
324                             final ReplicationSourceManager replicationManager) {
325       super("ReplicationStatisticsThread");
326       this.replicationManager = replicationManager;
327       this.replicationSink = replicationSink;
328     }
329 
330     @Override
331     public void run() {
332       printStats(this.replicationManager.getStats());
333       printStats(this.replicationSink.getStats());
334     }
335 
336     private void printStats(String stats) {
337       if (!stats.isEmpty()) {
338         LOG.info(stats);
339       }
340     }
341   }
342 
343   @Override
344   public ReplicationLoad refreshAndGetReplicationLoad() {
345     if (this.replicationLoad == null) {
346       return null;
347     }
348     // always build for latest data
349     buildReplicationLoad();
350     return this.replicationLoad;
351   }
352 
353   private void buildReplicationLoad() {
354     // get source
355     List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
356     List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
357 
358     for (ReplicationSourceInterface source : sources) {
359       if (source instanceof ReplicationSource) {
360         sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
361       }
362     }
363     // get sink
364     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
365     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
366   }
367 }