1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Comparator;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.CellScanner;
29 import org.apache.hadoop.hbase.Coprocessor;
30 import org.apache.hadoop.hbase.CoprocessorEnvironment;
31 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32 import org.apache.hadoop.hbase.MetaMutationAnnotation;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.classification.InterfaceStability;
35 import org.apache.hadoop.hbase.client.Mutation;
36 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
37 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
38 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
39 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
40 import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
41 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
42 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
43
44 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
45 @InterfaceStability.Evolving
46 public class RegionServerCoprocessorHost extends
47 CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
48
49 private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
50
51 private RegionServerServices rsServices;
52
53 public RegionServerCoprocessorHost(RegionServerServices rsServices,
54 Configuration conf) {
55 super(rsServices);
56 this.rsServices = rsServices;
57 this.conf = conf;
58
59
60
61 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
62 DEFAULT_COPROCESSORS_ENABLED);
63 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
64 DEFAULT_USER_COPROCESSORS_ENABLED);
65 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
66 LOG.info("Table coprocessor loading is " +
67 ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
68 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
69 }
70
71 @Override
72 public RegionServerEnvironment createEnvironment(Class<?> implClass,
73 Coprocessor instance, int priority, int sequence, Configuration conf) {
74 return new RegionServerEnvironment(implClass, instance, priority,
75 sequence, conf, this.rsServices);
76 }
77
78 public void preStop(String message) throws IOException {
79 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
80 @Override
81 public void call(RegionServerObserver oserver,
82 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
83 oserver.preStopRegionServer(ctx);
84 }
85 @Override
86 public void postEnvCall(RegionServerEnvironment env) {
87
88 shutdown(env);
89 }
90 });
91 }
92
93 public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
94 return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
95 @Override
96 public void call(RegionServerObserver oserver,
97 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
98 oserver.preMerge(ctx, regionA, regionB);
99 }
100 });
101 }
102
103 public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
104 throws IOException {
105 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
106 @Override
107 public void call(RegionServerObserver oserver,
108 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
109 oserver.postMerge(ctx, regionA, regionB, mergedRegion);
110 }
111 });
112 }
113
114 public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
115 final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
116 return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
117 @Override
118 public void call(RegionServerObserver oserver,
119 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
120 oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
121 }
122 });
123 }
124
125 public void postMergeCommit(final HRegion regionA, final HRegion regionB,
126 final HRegion mergedRegion) throws IOException {
127 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
128 @Override
129 public void call(RegionServerObserver oserver,
130 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
131 oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
132 }
133 });
134 }
135
136 public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
137 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
138 @Override
139 public void call(RegionServerObserver oserver,
140 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
141 oserver.preRollBackMerge(ctx, regionA, regionB);
142 }
143 });
144 }
145
146 public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
147 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
148 @Override
149 public void call(RegionServerObserver oserver,
150 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
151 oserver.postRollBackMerge(ctx, regionA, regionB);
152 }
153 });
154 }
155
156 public void preRollWALWriterRequest() throws IOException {
157 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
158 @Override
159 public void call(RegionServerObserver oserver,
160 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
161 oserver.preRollWALWriterRequest(ctx);
162 }
163 });
164 }
165
166 public void postRollWALWriterRequest() throws IOException {
167 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
168 @Override
169 public void call(RegionServerObserver oserver,
170 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
171 oserver.postRollWALWriterRequest(ctx);
172 }
173 });
174 }
175
176 public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
177 throws IOException {
178 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
179 @Override
180 public void call(RegionServerObserver oserver,
181 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
182 oserver.preReplicateLogEntries(ctx, entries, cells);
183 }
184 });
185 }
186
187 public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
188 throws IOException {
189 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
190 @Override
191 public void call(RegionServerObserver oserver,
192 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
193 oserver.postReplicateLogEntries(ctx, entries, cells);
194 }
195 });
196 }
197
198 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
199 throws IOException {
200 return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
201 : new CoprocessOperationWithResult<ReplicationEndpoint>() {
202 @Override
203 public void call(RegionServerObserver oserver,
204 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
205 try {
206 oserver.getClass().getDeclaredMethod("postCreateReplicationEndPoint",
207 ObserverContext.class, ReplicationEndpoint.class);
208 } catch (NoSuchMethodException e) {
209 LOG.warn("The RegionServer Observer class "
210 + oserver.getClass().getName()
211 + " does not have the"
212 + "method postCreateReplicationEndPoint(). Consider upgrading inorder to replicate visibility"
213 + " labels as strings");
214 setResult(endpoint);
215 return;
216 } catch (SecurityException e) {
217 LOG.warn("The RegionServer Observer class "
218 + oserver.getClass().getName()
219 + " does not have the"
220 + "method postCreateReplicationEndPoint(). Consider upgrading inorder to replicate visibility"
221 + " labels as strings");
222 setResult(endpoint);
223 return;
224 }
225 setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
226 }
227 });
228 }
229
230 private <T> T execOperationWithResult(final T defaultValue,
231 final CoprocessOperationWithResult<T> ctx) throws IOException {
232 if (ctx == null)
233 return defaultValue;
234 ctx.setResult(defaultValue);
235 execOperation(ctx);
236 return ctx.getResult();
237 }
238
239 private static abstract class CoprocessorOperation extends
240 ObserverContext<RegionServerCoprocessorEnvironment> {
241 public CoprocessorOperation() {
242 }
243
244 public abstract void call(RegionServerObserver oserver,
245 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
246
247 public void postEnvCall(RegionServerEnvironment env) {
248 }
249 }
250
251 private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
252 private T result = null;
253
254 public void setResult(final T result) {
255 this.result = result;
256 }
257
258 public T getResult() {
259 return this.result;
260 }
261 }
262
263 private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
264 if (ctx == null) return false;
265
266 boolean bypass = false;
267 for (RegionServerEnvironment env: coprocessors) {
268 if (env.getInstance() instanceof RegionServerObserver) {
269 ctx.prepare(env);
270 Thread currentThread = Thread.currentThread();
271 ClassLoader cl = currentThread.getContextClassLoader();
272 try {
273 currentThread.setContextClassLoader(env.getClassLoader());
274 ctx.call((RegionServerObserver)env.getInstance(), ctx);
275 } catch (Throwable e) {
276 handleCoprocessorThrowable(env, e);
277 } finally {
278 currentThread.setContextClassLoader(cl);
279 }
280 bypass |= ctx.shouldBypass();
281 if (ctx.shouldComplete()) {
282 break;
283 }
284 }
285 ctx.postEnvCall(env);
286 }
287 return bypass;
288 }
289
290
291
292
293
294 static class RegionServerEnvironment extends CoprocessorHost.Environment
295 implements RegionServerCoprocessorEnvironment {
296
297 private RegionServerServices regionServerServices;
298
299 public RegionServerEnvironment(final Class<?> implClass,
300 final Coprocessor impl, final int priority, final int seq,
301 final Configuration conf, final RegionServerServices services) {
302 super(impl, priority, seq, conf);
303 this.regionServerServices = services;
304 for (Class c : implClass.getInterfaces()) {
305 if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
306 this.regionServerServices.registerService(((SingletonCoprocessorService) impl).getService());
307 break;
308 }
309 }
310 }
311
312 @Override
313 public RegionServerServices getRegionServerServices() {
314 return regionServerServices;
315 }
316 }
317
318
319
320
321
322 static class EnvironmentPriorityComparator implements
323 Comparator<CoprocessorEnvironment> {
324 public int compare(final CoprocessorEnvironment env1,
325 final CoprocessorEnvironment env2) {
326 if (env1.getPriority() < env2.getPriority()) {
327 return -1;
328 } else if (env1.getPriority() > env2.getPriority()) {
329 return 1;
330 }
331 if (env1.getLoadSequence() < env2.getLoadSequence()) {
332 return -1;
333 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
334 return 1;
335 }
336 return 0;
337 }
338 }
339 }