View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.CellScanner;
29  import org.apache.hadoop.hbase.Coprocessor;
30  import org.apache.hadoop.hbase.CoprocessorEnvironment;
31  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32  import org.apache.hadoop.hbase.MetaMutationAnnotation;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.classification.InterfaceStability;
35  import org.apache.hadoop.hbase.client.Mutation;
36  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
37  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
38  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
39  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
40  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
41  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
42  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
43  
44  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
45  @InterfaceStability.Evolving
46  public class RegionServerCoprocessorHost extends
47      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
48  
49    private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
50  
51    private RegionServerServices rsServices;
52  
53    public RegionServerCoprocessorHost(RegionServerServices rsServices,
54        Configuration conf) {
55      super(rsServices);
56      this.rsServices = rsServices;
57      this.conf = conf;
58      // Log the state of coprocessor loading here; should appear only once or
59      // twice in the daemon log, depending on HBase version, because there is
60      // only one RegionServerCoprocessorHost instance in the RS process
61      boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
62        DEFAULT_COPROCESSORS_ENABLED);
63      boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
64        DEFAULT_USER_COPROCESSORS_ENABLED);
65      LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
66      LOG.info("Table coprocessor loading is " +
67        ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
68      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
69    }
70  
71    @Override
72    public RegionServerEnvironment createEnvironment(Class<?> implClass,
73        Coprocessor instance, int priority, int sequence, Configuration conf) {
74      return new RegionServerEnvironment(implClass, instance, priority,
75        sequence, conf, this.rsServices);
76    }
77  
78    public void preStop(String message) throws IOException {
79      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
80        @Override
81        public void call(RegionServerObserver oserver,
82            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
83          oserver.preStopRegionServer(ctx);
84        }
85        @Override
86        public void postEnvCall(RegionServerEnvironment env) {
87          // invoke coprocessor stop method
88          shutdown(env);
89        }
90      });
91    }
92  
93    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
94      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
95        @Override
96        public void call(RegionServerObserver oserver,
97            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
98          oserver.preMerge(ctx, regionA, regionB);
99        }
100     });
101   }
102 
103   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
104       throws IOException {
105     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
106       @Override
107       public void call(RegionServerObserver oserver,
108           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
109         oserver.postMerge(ctx, regionA, regionB, mergedRegion);
110       }
111     });
112   }
113 
114   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
115       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
116     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
117       @Override
118       public void call(RegionServerObserver oserver,
119           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
120         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
121       }
122     });
123   }
124 
125   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
126       final HRegion mergedRegion) throws IOException {
127     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
128       @Override
129       public void call(RegionServerObserver oserver,
130           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
131         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
132       }
133     });
134   }
135 
136   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
137     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
138       @Override
139       public void call(RegionServerObserver oserver,
140           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
141         oserver.preRollBackMerge(ctx, regionA, regionB);
142       }
143     });
144   }
145 
146   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
147     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
148       @Override
149       public void call(RegionServerObserver oserver,
150           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
151         oserver.postRollBackMerge(ctx, regionA, regionB);
152       }
153     });
154   }
155 
156   public void preRollWALWriterRequest() throws IOException {
157     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
158       @Override
159       public void call(RegionServerObserver oserver,
160           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
161         oserver.preRollWALWriterRequest(ctx);
162       }
163     });
164   }
165 
166   public void postRollWALWriterRequest() throws IOException {
167     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
168       @Override
169       public void call(RegionServerObserver oserver,
170           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
171         oserver.postRollWALWriterRequest(ctx);
172       }
173     });
174   }
175 
176   public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
177       throws IOException {
178     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
179       @Override
180       public void call(RegionServerObserver oserver,
181           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
182         oserver.preReplicateLogEntries(ctx, entries, cells);
183       }
184     });
185   }
186 
187   public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
188       throws IOException {
189     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
190       @Override
191       public void call(RegionServerObserver oserver,
192           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
193         oserver.postReplicateLogEntries(ctx, entries, cells);
194       }
195     });
196   }
197 
198   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
199       throws IOException {
200     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
201         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
202           @Override
203           public void call(RegionServerObserver oserver,
204               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
205             try {
206               oserver.getClass().getDeclaredMethod("postCreateReplicationEndPoint",
207                   ObserverContext.class, ReplicationEndpoint.class);
208             } catch (NoSuchMethodException e) {
209               LOG.warn("The RegionServer Observer class "
210                   + oserver.getClass().getName()
211                   + " does not have the"
212                   + "method postCreateReplicationEndPoint(). Consider upgrading inorder to replicate visibility"
213                   + " labels as strings");
214               setResult(endpoint);
215               return;
216             } catch (SecurityException e) {
217               LOG.warn("The RegionServer Observer class "
218                   + oserver.getClass().getName()
219                   + " does not have the"
220                   + "method postCreateReplicationEndPoint(). Consider upgrading inorder to replicate visibility"
221                   + " labels as strings");
222               setResult(endpoint);
223               return;
224             }
225             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
226           }
227         });
228   }
229 
230   private <T> T execOperationWithResult(final T defaultValue,
231       final CoprocessOperationWithResult<T> ctx) throws IOException {
232     if (ctx == null)
233       return defaultValue;
234     ctx.setResult(defaultValue);
235     execOperation(ctx);
236     return ctx.getResult();
237   }
238 
239   private static abstract class CoprocessorOperation extends
240       ObserverContext<RegionServerCoprocessorEnvironment> {
241     public CoprocessorOperation() {
242     }
243 
244     public abstract void call(RegionServerObserver oserver,
245         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
246 
247     public void postEnvCall(RegionServerEnvironment env) {
248     }
249   }
250 
251   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
252     private T result = null;
253 
254     public void setResult(final T result) {
255       this.result = result;
256     }
257 
258     public T getResult() {
259       return this.result;
260     }
261   }
262 
263   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
264     if (ctx == null) return false;
265 
266     boolean bypass = false;
267     for (RegionServerEnvironment env: coprocessors) {
268       if (env.getInstance() instanceof RegionServerObserver) {
269         ctx.prepare(env);
270         Thread currentThread = Thread.currentThread();
271         ClassLoader cl = currentThread.getContextClassLoader();
272         try {
273           currentThread.setContextClassLoader(env.getClassLoader());
274           ctx.call((RegionServerObserver)env.getInstance(), ctx);
275         } catch (Throwable e) {
276           handleCoprocessorThrowable(env, e);
277         } finally {
278           currentThread.setContextClassLoader(cl);
279         }
280         bypass |= ctx.shouldBypass();
281         if (ctx.shouldComplete()) {
282           break;
283         }
284       }
285       ctx.postEnvCall(env);
286     }
287     return bypass;
288   }
289 
290   /**
291    * Coprocessor environment extension providing access to region server
292    * related services.
293    */
294   static class RegionServerEnvironment extends CoprocessorHost.Environment
295       implements RegionServerCoprocessorEnvironment {
296 
297     private RegionServerServices regionServerServices;
298 
299     public RegionServerEnvironment(final Class<?> implClass,
300         final Coprocessor impl, final int priority, final int seq,
301         final Configuration conf, final RegionServerServices services) {
302       super(impl, priority, seq, conf);
303       this.regionServerServices = services;
304       for (Class c : implClass.getInterfaces()) {
305         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
306           this.regionServerServices.registerService(((SingletonCoprocessorService) impl).getService());
307           break;
308         }
309       }
310     }
311 
312     @Override
313     public RegionServerServices getRegionServerServices() {
314       return regionServerServices;
315     }
316   }
317 
318   /**
319    * Environment priority comparator. Coprocessors are chained in sorted
320    * order.
321    */
322   static class EnvironmentPriorityComparator implements
323       Comparator<CoprocessorEnvironment> {
324     public int compare(final CoprocessorEnvironment env1,
325         final CoprocessorEnvironment env2) {
326       if (env1.getPriority() < env2.getPriority()) {
327         return -1;
328       } else if (env1.getPriority() > env2.getPriority()) {
329         return 1;
330       }
331       if (env1.getLoadSequence() < env2.getLoadSequence()) {
332         return -1;
333       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
334         return 1;
335       }
336       return 0;
337     }
338   }
339 }