/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;


import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * Publishes the cluster status to listening clients. This allows them to learn
 * immediately about dead region servers, so they can cut the connections they hold
 * to those servers instead of waiting on socket timeouts. This improves the mean
 * time to recover, and also allows clients to use larger timeouts, since dead
 * servers are detected through a separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends Chore {
  /**
   * The implementation class used to publish the status. The default is
   * {@link MulticastPublisher}.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
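
  // A minimal sketch of how this would be enabled in hbase-site.xml, assuming the
  // standard Configuration mechanism (the '$' inner-class syntax is how
  // Class.forName resolves nested classes; the values shown are illustrative):
  //
  //   <property>
  //     <name>hbase.status.publisher.class</name>
  //     <value>org.apache.hadoop.hbase.master.ClusterStatusPublisher$MulticastPublisher</value>
  //   </property>
  //   <property>
  //     <name>hbase.status.publish.period</name>
  //     <value>10000</value>  <!-- milliseconds -->
  //   </property>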

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages, in ms
  // Dead servers already sent, with the number of times each has been sent so far.
  private final ConcurrentMap<ServerName, Integer> lastSent =
      new ConcurrentHashMap<ServerName, Integer>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent in order to fit into a
   * single packet; a reasonable payload for IP / Ethernet is less than 1Kb.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver
   * misses the message.
   */
  public final static int NB_SEND = 5;

  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(),
        conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }
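
  // Sketch of how the master wires this chore up, assuming it uses the standard
  // Configuration.getClass() lookup (see HMaster for the actual call site):
  //
  //   Class<? extends Publisher> publisherClass =
  //       conf.getClass(STATUS_PUBLISHER_CLASS, DEFAULT_STATUS_PUBLISHER_CLASS,
  //           Publisher.class);
  //   Chore publisherChore = new ClusterStatusPublisher(master, conf, publisherClass);
  //
  // Being a Chore, chore() then runs every STATUS_PUBLISH_PERIOD milliseconds, as
  // passed to super() above.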

  // No-publisher constructor; mostly useful for tests and subclasses.
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!connected) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTimeMillis();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than one period ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We reuse the existing protobuf message, but we don't send everything: only
    // the version, cluster id, dead server list and master name are filled in; the
    // remaining fields are left null.
    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
        master.getMasterFileSystem().getClusterId().toString(),
        null,
        sns,
        master.getServerName(),
        null,
        null,
        null,
        null);

    publisher.publish(cs);
  }

  @Override
  protected void cleanup() {
    connected = false;
    publisher.close();
  }

  /**
   * Creates the list of dead servers to send. A dead server is sent NB_SEND times.
   * We send at most MAX_SERVER_PER_MESSAGE at a time; if there are too many dead
   * servers, the most recently dead are sent first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // Get the servers that died since the last iterations, and add them to the list.
    long since = EnvironmentEdgeManager.currentTimeMillis() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // Send the least-often-sent (hence the newly dead) servers first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE entries per message.
    int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
    List<ServerName> res = new ArrayList<ServerName>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        // Already sent NB_SEND times: stop tracking this server after this message.
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
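
  // Worked example of the schedule above, assuming the default publish period of
  // 10 seconds: a newly dead server enters lastSent with a count of 0 and is then
  // included in up to NB_SEND (5) consecutive messages (counts 0 through 4), so its
  // death is advertised for roughly 50 seconds before the entry is dropped. With
  // more than MAX_SERVER_PER_MESSAGE dead servers, the least-often-sent entries go
  // out first and the rest wait for a later message.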

  /**
   * Gets the servers that died since a given timestamp.
   * Protected so it can be overridden by tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  /**
   * The interface to implement to publish the cluster status.
   */
  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterStatus cs);

    @Override
    void close();
  }
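
  // A minimal sketch of a custom Publisher, for illustration only (the class name
  // and logging choice are hypothetical, not part of HBase):
  //
  //   public static class LogPublisher implements Publisher {
  //     private static final Log LOG = LogFactory.getLog(LogPublisher.class);
  //     @Override public void connect(Configuration conf) {}
  //     @Override public void publish(ClusterStatus cs) {
  //       LOG.info("Dead servers: " + cs.getDeadServerNames());
  //     }
  //     @Override public void close() {}
  //   }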

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final ExecutorService service = Executors.newSingleThreadExecutor(
        Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);

      // Use a blocking (OIO) datagram channel, serviced by the single daemon thread.
      DatagramChannelFactory f = new OioDatagramChannelFactory(service);

      ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
      b.setPipeline(Channels.pipeline(new ProtobufEncoder(),
          new ChannelUpstreamHandler() {
            @Override
            public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
                throws Exception {
              // We're only writing on this channel; discard anything received.
            }
          }));

      channel = (DatagramChannel) b.bind(new InetSocketAddress(0));
      channel.getConfig().setReuseAddress(true);

      InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        throw new IOException("Can't connect to " + mcAddress, e);
      }
      channel.joinGroup(ina);
      channel.connect(new InetSocketAddress(mcAddress, port));
    }
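
    // On the receiving side, a listener is expected to join the same multicast
    // group and decode the protobuf-encoded ClusterStatus; in HBase this is done by
    // the client-side ClusterStatusListener.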

    @Override
    public void publish(ClusterStatus cs) {
      // Convert to the protobuf representation; the ProtobufEncoder in the pipeline
      // serializes it into the outgoing datagram. Netty's write is asynchronous.
      ClusterStatusProtos.ClusterStatus csp = cs.convert();
      channel.write(csp);
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      service.shutdown();
    }
  }
}