View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  
21  package org.apache.hadoop.hbase.master;
22  
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Chore;
27  import org.apache.hadoop.hbase.ClusterStatus;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.ServerName;
31  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
32  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
33  import org.apache.hadoop.hbase.util.Pair;
34  import org.apache.hadoop.hbase.util.Threads;
35  import org.apache.hadoop.hbase.util.VersionInfo;
36  import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
37  import org.jboss.netty.channel.ChannelEvent;
38  import org.jboss.netty.channel.ChannelHandlerContext;
39  import org.jboss.netty.channel.ChannelUpstreamHandler;
40  import org.jboss.netty.channel.Channels;
41  import org.jboss.netty.channel.socket.DatagramChannel;
42  import org.jboss.netty.channel.socket.DatagramChannelFactory;
43  import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
44  import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
45  
46  import java.io.Closeable;
47  import java.io.IOException;
48  import java.net.InetAddress;
49  import java.net.InetSocketAddress;
50  import java.net.UnknownHostException;
51  import java.util.ArrayList;
52  import java.util.Collections;
53  import java.util.Comparator;
54  import java.util.List;
55  import java.util.Map;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentMap;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  
61  /**
62   * Class to publish the cluster status to the client. This allows them to know immediately
63   *  the dead region servers, hence to cut the connection they have with them, eventually stop
64   *  waiting on the socket. This improves the mean time to recover, and as well allows to increase
65   *  on the client the different timeouts, as the dead servers will be detected separately.
66   */
67  @InterfaceAudience.Private
68  public class ClusterStatusPublisher extends Chore {
69    /**
70     * The implementation class used to publish the status. Default is null (no publish).
71     * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
72     * status.
73     */
74    public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
75    public static final Class<? extends ClusterStatusPublisher.Publisher>
76        DEFAULT_STATUS_PUBLISHER_CLASS =
77        org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;
78  
79    /**
80     * The minimum time between two status messages, in milliseconds.
81     */
82    public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
83    public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
84  
85    private long lastMessageTime = 0;
86    private final HMaster master;
87    private final int messagePeriod; // time between two message
88    private final ConcurrentMap<ServerName, Integer> lastSent =
89        new ConcurrentHashMap<ServerName, Integer>();
90    private Publisher publisher;
91    private boolean connected = false;
92  
93    /**
94     * We want to limit the size of the protobuf message sent, do fit into a single packet.
95     * a reasonable size for ip / ethernet is less than 1Kb.
96     */
97    public final static int MAX_SERVER_PER_MESSAGE = 10;
98  
99    /**
100    * If a server dies, we're sending the information multiple times in case a receiver misses the
101    * message.
102    */
103   public final static int NB_SEND = 5;
104 
105   public ClusterStatusPublisher(HMaster master, Configuration conf,
106                                 Class<? extends Publisher> publisherClass)
107       throws IOException {
108     super("HBase clusterStatusPublisher for " + master.getName(),
109         conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
110     this.master = master;
111     this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
112     try {
113       this.publisher = publisherClass.newInstance();
114     } catch (InstantiationException e) {
115       throw new IOException("Can't create publisher " + publisherClass.getName(), e);
116     } catch (IllegalAccessException e) {
117       throw new IOException("Can't create publisher " + publisherClass.getName(), e);
118     }
119     this.publisher.connect(conf);
120     connected = true;
121   }
122 
123   // For tests only
124   protected ClusterStatusPublisher() {
125     master = null;
126     messagePeriod = 0;
127   }
128 
129   @Override
130   protected void chore() {
131     if (!connected) {
132       return;
133     }
134 
135     List<ServerName> sns = generateDeadServersListToSend();
136     if (sns.isEmpty()) {
137       // Nothing to send. Done.
138       return;
139     }
140 
141     final long curTime = EnvironmentEdgeManager.currentTimeMillis();
142     if (lastMessageTime > curTime - messagePeriod) {
143       // We already sent something less than 10 second ago. Done.
144       return;
145     }
146 
147     // Ok, we're going to send something then.
148     lastMessageTime = curTime;
149 
150     // We're reusing an existing protobuf message, but we don't send everything.
151     // This could be extended in the future, for example if we want to send stuff like the
152     //  hbase:meta server name.
153     ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
154         master.getMasterFileSystem().getClusterId().toString(),
155         null,
156         sns,
157         master.getServerName(),
158         null,
159         null,
160         null,
161         null);
162 
163 
164     publisher.publish(cs);
165   }
166 
167   protected void cleanup() {
168     connected = false;
169     publisher.close();
170   }
171 
172   /**
173    * Create the dead server to send. A dead server is sent NB_SEND times. We send at max
174    * MAX_SERVER_PER_MESSAGE at a time. if there are too many dead servers, we send the newly
175    * dead first.
176    */
177   protected List<ServerName> generateDeadServersListToSend() {
178     // We're getting the message sent since last time, and add them to the list
179     long since = EnvironmentEdgeManager.currentTimeMillis() - messagePeriod * 2;
180     for (Pair<ServerName, Long> dead : getDeadServers(since)) {
181       lastSent.putIfAbsent(dead.getFirst(), 0);
182     }
183 
184     // We're sending the new deads first.
185     List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
186     entries.addAll(lastSent.entrySet());
187     Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
188       @Override
189       public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
190         return o1.getValue().compareTo(o2.getValue());
191       }
192     });
193 
194     // With a limit of MAX_SERVER_PER_MESSAGE
195     int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
196     List<ServerName> res = new ArrayList<ServerName>(max);
197 
198     for (int i = 0; i < max; i++) {
199       Map.Entry<ServerName, Integer> toSend = entries.get(i);
200       if (toSend.getValue() >= (NB_SEND - 1)) {
201         lastSent.remove(toSend.getKey());
202       } else {
203         lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
204       }
205 
206       res.add(toSend.getKey());
207     }
208 
209     return res;
210   }
211 
212   /**
213    * Get the servers which died since a given timestamp.
214    * protected because it can be subclassed by the tests.
215    */
216   protected List<Pair<ServerName, Long>> getDeadServers(long since) {
217     if (master.getServerManager() == null) {
218       return Collections.emptyList();
219     }
220 
221     return master.getServerManager().getDeadServers().copyDeadServersSince(since);
222   }
223 
224 
225   public interface Publisher extends Closeable {
226 
227     void connect(Configuration conf) throws IOException;
228 
229     void publish(ClusterStatus cs);
230 
231     @Override
232     void close();
233   }
234 
235   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
236   public static class MulticastPublisher implements Publisher {
237     private DatagramChannel channel;
238     private final ExecutorService service = Executors.newSingleThreadExecutor(
239         Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));
240 
241     public MulticastPublisher() {
242     }
243 
244     @Override
245     public void connect(Configuration conf) throws IOException {
246       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
247           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
248       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
249           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
250 
251       // Can't be NiO with Netty today => not implemented in Netty.
252       DatagramChannelFactory f = new OioDatagramChannelFactory(service);
253 
254       ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
255       b.setPipeline(Channels.pipeline(new ProtobufEncoder(),
256           new ChannelUpstreamHandler() {
257             @Override
258             public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
259                 throws Exception {
260               // We're just writing here. Discard any incoming data. See HBASE-8466.
261             }
262           }));
263 
264 
265       channel = (DatagramChannel) b.bind(new InetSocketAddress(0));
266       channel.getConfig().setReuseAddress(true);
267 
268       InetAddress ina;
269       try {
270         ina = InetAddress.getByName(mcAddress);
271       } catch (UnknownHostException e) {
272         throw new IOException("Can't connect to " + mcAddress, e);
273       }
274       channel.joinGroup(ina);
275       channel.connect(new InetSocketAddress(mcAddress, port));
276     }
277 
278     @Override
279     public void publish(ClusterStatus cs) {
280       ClusterStatusProtos.ClusterStatus csp = cs.convert();
281       channel.write(csp);
282     }
283 
284     @Override
285     public void close() {
286       if (channel != null) {
287         channel.close();
288       }
289       service.shutdown();
290     }
291   }
292 }