1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.lang.reflect.Constructor;
26 import java.lang.reflect.InvocationTargetException;
27 import java.net.InetAddress;
28 import java.net.InetSocketAddress;
29 import java.net.UnknownHostException;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40 import org.apache.hadoop.hbase.ClusterStatus;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.ServerName;
43 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
44 import org.apache.hadoop.hbase.util.Threads;
45 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
46 import org.jboss.netty.channel.ChannelHandlerContext;
47 import org.jboss.netty.channel.Channels;
48 import org.jboss.netty.channel.ExceptionEvent;
49 import org.jboss.netty.channel.MessageEvent;
50 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
51 import org.jboss.netty.channel.socket.DatagramChannel;
52 import org.jboss.netty.channel.socket.DatagramChannelFactory;
53 import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
54 import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
55
56
57
58
59
60
61
62 @InterfaceAudience.Private
63 class ClusterStatusListener implements Closeable {
private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
// Servers already reported as dead. Appended to by receive() and scanned by isDeadServer().
// NOTE(review): this is a plain ArrayList and no synchronization is visible in this file;
// confirm that receive() and isDeadServer() are never invoked concurrently from different threads.
private final List<ServerName> deadServers = new ArrayList<ServerName>();
// Callback notified by receive() for each newly seen dead server; receive() tolerates null here.
protected final DeadServerHandler deadServerHandler;
// Transport instance created reflectively by the constructor and released by close().
private final Listener listener;

/**
 * Configuration key for the {@link Listener} implementation class.
 * NOTE(review): this file never reads the key itself; presumably the caller resolves it and
 * passes the resulting class to the constructor -- confirm against the call site.
 */
public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
/**
 * Default {@link Listener} implementation class, {@link MulticastListener}.
 */
public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
    MulticastListener.class;
75
76
77
78
/**
 * Callback through which a {@link ClusterStatusListener} reports servers it has newly
 * identified as dead.
 */
public interface DeadServerHandler {

  /**
   * Invoked by {@link ClusterStatusListener#receive(ClusterStatus)} for each server in a
   * received status's dead-server list that was not already recorded as dead.
   *
   * @param sn the server that has just been recorded as dead
   */
  void newDead(ServerName sn);
}
89
90
91
92
93
/**
 * Source of cluster status messages for the enclosing {@link ClusterStatusListener}.
 * Implementations are instantiated reflectively by the ClusterStatusListener constructor
 * through a public constructor that accepts the enclosing ClusterStatusListener.
 */
interface Listener extends Closeable {

  /**
   * Releases whatever the listener holds. Redeclared from {@link Closeable} without the
   * checked {@link IOException}.
   */
  @Override
  void close();

  /**
   * Starts listening. Called by the ClusterStatusListener constructor right after the
   * instance has been created.
   *
   * @param conf the configuration to read connection settings from
   * @throws IOException if the listener cannot be set up
   */
  void connect(Configuration conf) throws IOException;
}
109
/**
 * Creates the status listener: instantiates {@code listenerClass} reflectively, handing it
 * this instance, and then connects it using {@code conf}.
 *
 * @param dsh callback notified of newly dead servers; may be null, in which case dead servers
 *          are only recorded
 * @param conf configuration passed to {@link Listener#connect(Configuration)}
 * @param listenerClass listener implementation; must expose a public constructor that accepts
 *          a {@code ClusterStatusListener} (for an inner class this is the implicit
 *          enclosing-instance parameter)
 * @throws IOException if the listener cannot be instantiated or fails to connect
 * @throws IllegalStateException if the expected constructor is missing or itself throws
 */
public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
    Class<? extends Listener> listenerClass) throws IOException {
  this.deadServerHandler = dsh;
  try {
    Constructor<? extends Listener> ctor =
        listenerClass.getConstructor(ClusterStatusListener.class);
    this.listener = ctor.newInstance(this);
  } catch (InstantiationException e) {
    throw new IOException("Can't create listener " + listenerClass.getName(), e);
  } catch (IllegalAccessException e) {
    throw new IOException("Can't create listener " + listenerClass.getName(), e);
  } catch (NoSuchMethodException e) {
    // Keep the exception type callers may rely on, but no longer drop the message and cause.
    throw new IllegalStateException("Listener " + listenerClass.getName()
        + " has no public constructor accepting a ClusterStatusListener", e);
  } catch (InvocationTargetException e) {
    throw new IllegalStateException("Constructor of listener " + listenerClass.getName()
        + " threw an exception", e);
  }

  // If connect() fails the caller never receives this object and so cannot close it;
  // release the listener's resources here before the exception propagates.
  boolean connected = false;
  try {
    this.listener.connect(conf);
    connected = true;
  } finally {
    if (!connected) {
      this.listener.close();
    }
  }
}
129
130
131
132
133
134
135 public void receive(ClusterStatus ncs) {
136 if (ncs.getDeadServerNames() != null) {
137 for (ServerName sn : ncs.getDeadServerNames()) {
138 if (!isDeadServer(sn)) {
139 LOG.info("There is a new dead server: " + sn);
140 deadServers.add(sn);
141 if (deadServerHandler != null) {
142 deadServerHandler.newDead(sn);
143 }
144 }
145 }
146 }
147 }
148
149 @Override
150 public void close() {
151 listener.close();
152 }
153
154
155
156
157
158
159
160 public boolean isDeadServer(ServerName sn) {
161 if (sn.getStartcode() <= 0) {
162 return false;
163 }
164
165 for (ServerName dead : deadServers) {
166 if (dead.getStartcode() >= sn.getStartcode() &&
167 dead.getPort() == sn.getPort() &&
168 dead.getHostname().equals(sn.getHostname())) {
169 return true;
170 }
171 }
172
173 return false;
174 }
175
176
177
178
179
180 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
181 class MulticastListener implements Listener {
182 private DatagramChannel channel;
183 private final ExecutorService service = Executors.newSingleThreadExecutor(
184 Threads.newDaemonThreadFactory("hbase-client-clusterStatus-multiCastListener"));
185
186
187 public MulticastListener() {
188 }
189
190 @Override
191 public void connect(Configuration conf) throws IOException {
192
193 DatagramChannelFactory f = new OioDatagramChannelFactory(service);
194
195 ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
196 b.setPipeline(Channels.pipeline(
197 new ProtobufDecoder(ClusterStatusProtos.ClusterStatus.getDefaultInstance()),
198 new ClusterStatusHandler()));
199
200 String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
201 HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
202 String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
203 HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
204 int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
205 HConstants.DEFAULT_STATUS_MULTICAST_PORT);
206
207 channel = (DatagramChannel) b.bind(new InetSocketAddress(bindAddress, port));
208
209 channel.getConfig().setReuseAddress(true);
210
211 InetAddress ina;
212 try {
213 ina = InetAddress.getByName(mcAddress);
214 } catch (UnknownHostException e) {
215 throw new IOException("Can't connect to " + mcAddress, e);
216 }
217 channel.joinGroup(ina);
218 }
219
220 @Override
221 public void close() {
222 if (channel != null) {
223 channel.close();
224 channel = null;
225 }
226 service.shutdown();
227 }
228
229
230
231
232
233 private class ClusterStatusHandler extends SimpleChannelUpstreamHandler {
234
235 @Override
236 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
237 ClusterStatusProtos.ClusterStatus csp = (ClusterStatusProtos.ClusterStatus) e.getMessage();
238 ClusterStatus ncs = ClusterStatus.convert(csp);
239 receive(ncs);
240 }
241
242
243
244
245
246 @Override
247 public void exceptionCaught(
248 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
249 LOG.error("Unexpected exception, continuing.", e.getCause());
250 }
251 }
252 }
253 }