View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Abortable;
41  import org.apache.hadoop.hbase.Coprocessor;
42  import org.apache.hadoop.hbase.CoprocessorEnvironment;
43  import org.apache.hadoop.hbase.DoNotRetryIOException;
44  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.classification.InterfaceStability;
49  import org.apache.hadoop.hbase.client.Append;
50  import org.apache.hadoop.hbase.client.CoprocessorHConnection;
51  import org.apache.hadoop.hbase.client.Delete;
52  import org.apache.hadoop.hbase.client.Durability;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.HConnection;
55  import org.apache.hadoop.hbase.client.HTable;
56  import org.apache.hadoop.hbase.client.HTableInterface;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Put;
59  import org.apache.hadoop.hbase.client.Result;
60  import org.apache.hadoop.hbase.client.ResultScanner;
61  import org.apache.hadoop.hbase.client.Row;
62  import org.apache.hadoop.hbase.client.RowMutations;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.client.coprocessor.Batch;
65  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
66  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
67  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
70  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
71  import org.apache.hadoop.hbase.util.VersionInfo;
72  import org.apache.hadoop.io.MultipleIOException;
73  
74  import com.google.protobuf.Descriptors;
75  import com.google.protobuf.Message;
76  import com.google.protobuf.Service;
77  import com.google.protobuf.ServiceException;
78  
79  /**
80   * Provides the common setup framework and runtime services for coprocessor
81   * invocation from HBase services.
82   * @param <E> the specific environment extension that a concrete implementation
83   * provides
84   */
85  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
86  @InterfaceStability.Evolving
87  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
88    public static final String REGION_COPROCESSOR_CONF_KEY =
89        "hbase.coprocessor.region.classes";
90    public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
91        "hbase.coprocessor.regionserver.classes";
92    public static final String USER_REGION_COPROCESSOR_CONF_KEY =
93        "hbase.coprocessor.user.region.classes";
94    public static final String MASTER_COPROCESSOR_CONF_KEY =
95        "hbase.coprocessor.master.classes";
96    public static final String WAL_COPROCESSOR_CONF_KEY =
97      "hbase.coprocessor.wal.classes";
98    public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
99    public static final boolean DEFAULT_ABORT_ON_ERROR = true;
100   public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
101   public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
102   public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
103     "hbase.coprocessor.user.enabled";
104   public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
105 
106   protected static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
107   protected Abortable abortable;
108   /** Ordered set of loaded coprocessors with lock */
109   protected SortedSet<E> coprocessors =
110       new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
111   protected Configuration conf;
112   // unique file prefix to use for local copies of jars when classloading
113   protected String pathPrefix;
114   protected AtomicInteger loadSequence = new AtomicInteger();
115 
116   public CoprocessorHost(Abortable abortable) {
117     this.abortable = abortable;
118     this.pathPrefix = UUID.randomUUID().toString();
119   }
120 
121   /**
122    * Not to be confused with the per-object _coprocessors_ (above),
123    * coprocessorNames is static and stores the set of all coprocessors ever
124    * loaded by any thread in this JVM. It is strictly additive: coprocessors are
125    * added to coprocessorNames, by loadInstance() but are never removed, since
126    * the intention is to preserve a history of all loaded coprocessors for
127    * diagnosis in case of server crash (HBASE-4014).
128    */
129   private static Set<String> coprocessorNames =
130       Collections.synchronizedSet(new HashSet<String>());
131   public static Set<String> getLoadedCoprocessors() {
132       return coprocessorNames;
133   }
134 
135   /**
136    * Used to create a parameter to the HServerLoad constructor so that
137    * HServerLoad can provide information about the coprocessors loaded by this
138    * regionserver.
139    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
140    * to master).
141    */
142   public Set<String> getCoprocessors() {
143     Set<String> returnValue = new TreeSet<String>();
144     for(CoprocessorEnvironment e: coprocessors) {
145       returnValue.add(e.getInstance().getClass().getSimpleName());
146     }
147     return returnValue;
148   }
149 
150   /**
151    * Load system coprocessors. Read the class names from configuration.
152    * Called by constructor.
153    */
154   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
155     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
156       DEFAULT_COPROCESSORS_ENABLED);
157     if (!coprocessorsEnabled) {
158       return;
159     }
160 
161     Class<?> implClass = null;
162 
163     // load default coprocessors from configure file
164     String[] defaultCPClasses = conf.getStrings(confKey);
165     if (defaultCPClasses == null || defaultCPClasses.length == 0)
166       return;
167 
168     int priority = Coprocessor.PRIORITY_SYSTEM;
169     List<E> configured = new ArrayList<E>();
170     for (String className : defaultCPClasses) {
171       className = className.trim();
172       if (findCoprocessor(className) != null) {
173         continue;
174       }
175       ClassLoader cl = this.getClass().getClassLoader();
176       Thread.currentThread().setContextClassLoader(cl);
177       try {
178         implClass = cl.loadClass(className);
179         configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
180         LOG.info("System coprocessor " + className + " was loaded " +
181             "successfully with priority (" + priority++ + ").");
182       } catch (Throwable t) {
183         // We always abort if system coprocessors cannot be loaded
184         abortServer(className, t);
185       }
186     }
187 
188     // add entire set to the collection for COW efficiency
189     coprocessors.addAll(configured);
190   }
191 
192   /**
193    * Load a coprocessor implementation into the host
194    * @param path path to implementation jar
195    * @param className the main class name
196    * @param priority chaining priority
197    * @param conf configuration for coprocessor
198    * @throws java.io.IOException Exception
199    */
200   public E load(Path path, String className, int priority,
201       Configuration conf) throws IOException {
202     Class<?> implClass = null;
203     LOG.debug("Loading coprocessor class " + className + " with path " +
204         path + " and priority " + priority);
205 
206     ClassLoader cl = null;
207     if (path == null) {
208       try {
209         implClass = getClass().getClassLoader().loadClass(className);
210       } catch (ClassNotFoundException e) {
211         throw new IOException("No jar path specified for " + className);
212       }
213     } else {
214       cl = CoprocessorClassLoader.getClassLoader(
215         path, getClass().getClassLoader(), pathPrefix, conf);
216       try {
217         implClass = cl.loadClass(className);
218       } catch (ClassNotFoundException e) {
219         throw new IOException("Cannot load external coprocessor class " + className, e);
220       }
221     }
222 
223     //load custom code for coprocessor
224     Thread currentThread = Thread.currentThread();
225     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
226     try{
227       // switch temporarily to the thread classloader for custom CP
228       currentThread.setContextClassLoader(cl);
229       E cpInstance = loadInstance(implClass, priority, conf);
230       return cpInstance;
231     } finally {
232       // restore the fresh (host) classloader
233       currentThread.setContextClassLoader(hostClassLoader);
234     }
235   }
236 
237   /**
238    * @param implClass Implementation class
239    * @param priority priority
240    * @param conf configuration
241    * @throws java.io.IOException Exception
242    */
243   public void load(Class<?> implClass, int priority, Configuration conf)
244       throws IOException {
245     E env = loadInstance(implClass, priority, conf);
246     coprocessors.add(env);
247   }
248 
249   /**
250    * @param implClass Implementation class
251    * @param priority priority
252    * @param conf configuration
253    * @throws java.io.IOException Exception
254    */
255   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
256       throws IOException {
257     if (!Coprocessor.class.isAssignableFrom(implClass)) {
258       throw new IOException("Configured class " + implClass.getName() + " must implement "
259           + Coprocessor.class.getName() + " interface ");
260     }
261 
262     // create the instance
263     Coprocessor impl;
264     Object o = null;
265     try {
266       o = implClass.newInstance();
267       impl = (Coprocessor)o;
268     } catch (InstantiationException e) {
269       throw new IOException(e);
270     } catch (IllegalAccessException e) {
271       throw new IOException(e);
272     }
273     // create the environment
274     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
275     if (env instanceof Environment) {
276       ((Environment)env).startup();
277     }
278     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
279     // if server (master or regionserver) aborts.
280     coprocessorNames.add(implClass.getName());
281     return env;
282   }
283 
284   /**
285    * Called when a new Coprocessor class is loaded
286    */
287   public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
288       int priority, int sequence, Configuration conf);
289 
290   public void shutdown(CoprocessorEnvironment e) {
291     if (e instanceof Environment) {
292       if (LOG.isDebugEnabled()) {
293         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
294       }
295       ((Environment)e).shutdown();
296     } else {
297       LOG.warn("Shutdown called on unknown environment: "+
298           e.getClass().getName());
299     }
300   }
301 
302   /**
303    * Find a coprocessor implementation by class name
304    * @param className the class name
305    * @return the coprocessor, or null if not found
306    */
307   public Coprocessor findCoprocessor(String className) {
308     for (E env: coprocessors) {
309       if (env.getInstance().getClass().getName().equals(className) ||
310           env.getInstance().getClass().getSimpleName().equals(className)) {
311         return env.getInstance();
312       }
313     }
314     return null;
315   }
316 
317   /**
318    * Find list of coprocessors that extend/implement the given class/interface
319    * @param cls the class/interface to look for
320    * @return the list of coprocessors, or null if not found
321    */
322   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
323     ArrayList<T> ret = new ArrayList<T>();
324 
325     for (E env: coprocessors) {
326       Coprocessor cp = env.getInstance();
327 
328       if(cp != null) {
329         if (cls.isAssignableFrom(cp.getClass())) {
330           ret.add((T)cp);
331         }
332       }
333     }
334     return ret;
335   }
336 
337   /**
338    * Find a coprocessor environment by class name
339    * @param className the class name
340    * @return the coprocessor, or null if not found
341    */
342   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
343     for (E env: coprocessors) {
344       if (env.getInstance().getClass().getName().equals(className) ||
345           env.getInstance().getClass().getSimpleName().equals(className)) {
346         return env;
347       }
348     }
349     return null;
350   }
351 
352   /**
353    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
354    * jar files.
355    * @return A set of ClassLoader instances
356    */
357   Set<ClassLoader> getExternalClassLoaders() {
358     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
359     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
360     for (E env : coprocessors) {
361       ClassLoader cl = env.getInstance().getClass().getClassLoader();
362       if (cl != systemClassLoader ){
363         //do not include system classloader
364         externalClassLoaders.add(cl);
365       }
366     }
367     return externalClassLoaders;
368   }
369 
370   /**
371    * Environment priority comparator.
372    * Coprocessors are chained in sorted order.
373    */
374   static class EnvironmentPriorityComparator
375       implements Comparator<CoprocessorEnvironment> {
376     public int compare(final CoprocessorEnvironment env1,
377         final CoprocessorEnvironment env2) {
378       if (env1.getPriority() < env2.getPriority()) {
379         return -1;
380       } else if (env1.getPriority() > env2.getPriority()) {
381         return 1;
382       }
383       if (env1.getLoadSequence() < env2.getLoadSequence()) {
384         return -1;
385       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
386         return 1;
387       }
388       return 0;
389     }
390   }
391 
392   /**
393    * Encapsulation of the environment of each coprocessor
394    */
395   public static class Environment implements CoprocessorEnvironment {
396 
397     /**
398      * A wrapper for HTable. Can be used to restrict privilege.
399      *
400      * Currently it just helps to track tables opened by a Coprocessor and
401      * facilitate close of them if it is aborted.
402      *
403      * We also disallow row locking.
404      *
405      * There is nothing now that will stop a coprocessor from using HTable
406      * objects directly instead of this API, but in the future we intend to
407      * analyze coprocessor implementations as they are loaded and reject those
408      * which attempt to use objects and methods outside the Environment
409      * sandbox.
410      */
411     class HTableWrapper implements HTableInterface {
412 
413       private TableName tableName;
414       private HTable table;
415       private HConnection connection;
416 
417       public HTableWrapper(TableName tableName, HConnection connection, ExecutorService pool)
418           throws IOException {
419         this.tableName = tableName;
420         this.table = new HTable(tableName, connection, pool);
421         this.connection = connection;
422         openTables.add(this);
423       }
424 
425       void internalClose() throws IOException {
426         List<IOException> exceptions = new ArrayList<IOException>(2);
427         try {
428         table.close();
429         } catch (IOException e) {
430           exceptions.add(e);
431         }
432         try {
433           // have to self-manage our connection, as per the HTable contract
434           if (this.connection != null) {
435             this.connection.close();
436           }
437         } catch (IOException e) {
438           exceptions.add(e);
439         }
440         if (!exceptions.isEmpty()) {
441           throw MultipleIOException.createIOException(exceptions);
442         }
443       }
444 
445       public Configuration getConfiguration() {
446         return table.getConfiguration();
447       }
448 
449       public void close() throws IOException {
450         try {
451           internalClose();
452         } finally {
453           openTables.remove(this);
454         }
455       }
456 
457       public Result getRowOrBefore(byte[] row, byte[] family)
458           throws IOException {
459         return table.getRowOrBefore(row, family);
460       }
461 
462       public Result get(Get get) throws IOException {
463         return table.get(get);
464       }
465 
466       public boolean exists(Get get) throws IOException {
467         return table.exists(get);
468       }
469 
470       public Boolean[] exists(List<Get> gets) throws IOException{
471         return table.exists(gets);
472       }
473 
474       public void put(Put put) throws IOException {
475         table.put(put);
476       }
477 
478       public void put(List<Put> puts) throws IOException {
479         table.put(puts);
480       }
481 
482       public void delete(Delete delete) throws IOException {
483         table.delete(delete);
484       }
485 
486       public void delete(List<Delete> deletes) throws IOException {
487         table.delete(deletes);
488       }
489 
490       public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
491           byte[] value, Put put) throws IOException {
492         return table.checkAndPut(row, family, qualifier, value, put);
493       }
494 
495       public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
496           byte[] value, Delete delete) throws IOException {
497         return table.checkAndDelete(row, family, qualifier, value, delete);
498       }
499 
500       public long incrementColumnValue(byte[] row, byte[] family,
501           byte[] qualifier, long amount) throws IOException {
502         return table.incrementColumnValue(row, family, qualifier, amount);
503       }
504 
505       public long incrementColumnValue(byte[] row, byte[] family,
506           byte[] qualifier, long amount, Durability durability)
507           throws IOException {
508         return table.incrementColumnValue(row, family, qualifier, amount,
509             durability);
510       }
511 
512       @Override
513       public Result append(Append append) throws IOException {
514         return table.append(append);
515       }
516 
517       @Override
518       public Result increment(Increment increment) throws IOException {
519         return table.increment(increment);
520       }
521 
522       public void flushCommits() throws IOException {
523         table.flushCommits();
524       }
525 
526       public boolean isAutoFlush() {
527         return table.isAutoFlush();
528       }
529 
530       public ResultScanner getScanner(Scan scan) throws IOException {
531         return table.getScanner(scan);
532       }
533 
534       public ResultScanner getScanner(byte[] family) throws IOException {
535         return table.getScanner(family);
536       }
537 
538       public ResultScanner getScanner(byte[] family, byte[] qualifier)
539           throws IOException {
540         return table.getScanner(family, qualifier);
541       }
542 
543       public HTableDescriptor getTableDescriptor() throws IOException {
544         return table.getTableDescriptor();
545       }
546 
547       @Override
548       public byte[] getTableName() {
549         return tableName.getName();
550       }
551 
552       @Override
553       public TableName getName() {
554         return table.getName();
555       }
556 
557       @Override
558       public void batch(List<? extends Row> actions, Object[] results)
559           throws IOException, InterruptedException {
560         table.batch(actions, results);
561       }
562 
563       /**
564        * {@inheritDoc}
565        * @deprecated If any exception is thrown by one of the actions, there is no way to
566        * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
567        */
568       @Override
569       public Object[] batch(List<? extends Row> actions)
570           throws IOException, InterruptedException {
571         return table.batch(actions);
572       }
573 
574       @Override
575       public <R> void batchCallback(List<? extends Row> actions, Object[] results,
576           Batch.Callback<R> callback) throws IOException, InterruptedException {
577         table.batchCallback(actions, results, callback);
578       }
579 
580       /**
581        * {@inheritDoc}
582        * @deprecated If any exception is thrown by one of the actions, there is no way to
583        * retrieve the partially executed results. Use 
584        * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
585        * instead.
586        */
587       @Override
588       public <R> Object[] batchCallback(List<? extends Row> actions,
589           Batch.Callback<R> callback) throws IOException, InterruptedException {
590         return table.batchCallback(actions, callback);
591       }
592 
593       @Override
594       public Result[] get(List<Get> gets) throws IOException {
595         return table.get(gets);
596       }
597 
598       @Override
599       public CoprocessorRpcChannel coprocessorService(byte[] row) {
600         return table.coprocessorService(row);
601       }
602 
603       @Override
604       public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
605           byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
606           throws ServiceException, Throwable {
607         return table.coprocessorService(service, startKey, endKey, callable);
608       }
609 
610       @Override
611       public <T extends Service, R> void coprocessorService(Class<T> service,
612           byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
613           throws ServiceException, Throwable {
614         table.coprocessorService(service, startKey, endKey, callable, callback);
615       }
616 
617       @Override
618       public void mutateRow(RowMutations rm) throws IOException {
619         table.mutateRow(rm);
620       }
621 
622       @Override
623       public void setAutoFlush(boolean autoFlush) {
624         table.setAutoFlush(autoFlush, autoFlush);
625       }
626 
627       @Override
628       public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
629         table.setAutoFlush(autoFlush, clearBufferOnFail);
630       }
631 
632       @Override
633       public void setAutoFlushTo(boolean autoFlush) {
634         table.setAutoFlushTo(autoFlush);
635       }
636 
637       @Override
638       public long getWriteBufferSize() {
639          return table.getWriteBufferSize();
640       }
641 
642       @Override
643       public void setWriteBufferSize(long writeBufferSize) throws IOException {
644         table.setWriteBufferSize(writeBufferSize);
645       }
646 
647       @Override
648       public long incrementColumnValue(byte[] row, byte[] family,
649           byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
650         return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
651       }
652 
653       @Override
654       public <R extends Message> Map<byte[], R> batchCoprocessorService(
655           Descriptors.MethodDescriptor method, Message request, byte[] startKey,
656           byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
657         return table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype);
658       }
659 
660       @Override
661       public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor method,
662           Message request, byte[] startKey, byte[] endKey, R responsePrototype,
663           Callback<R> callback) throws ServiceException, Throwable {
664         table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype,
665             callback);
666       }
667 
668       @Override
669       public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
670           CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
671         return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
672       }
673     }
674 
675     /** The coprocessor */
676     public Coprocessor impl;
677     /** Chaining priority */
678     protected int priority = Coprocessor.PRIORITY_USER;
679     /** Current coprocessor state */
680     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
681     /** Accounting for tables opened by the coprocessor */
682     protected List<HTableInterface> openTables =
683       Collections.synchronizedList(new ArrayList<HTableInterface>());
684     private int seq;
685     private Configuration conf;
686     private ClassLoader classLoader;
687 
688     /**
689      * Constructor
690      * @param impl the coprocessor instance
691      * @param priority chaining priority
692      */
693     public Environment(final Coprocessor impl, final int priority,
694         final int seq, final Configuration conf) {
695       this.impl = impl;
696       this.classLoader = impl.getClass().getClassLoader();
697       this.priority = priority;
698       this.state = Coprocessor.State.INSTALLED;
699       this.seq = seq;
700       this.conf = conf;
701     }
702 
703     /** Initialize the environment */
704     public void startup() throws IOException {
705       if (state == Coprocessor.State.INSTALLED ||
706           state == Coprocessor.State.STOPPED) {
707         state = Coprocessor.State.STARTING;
708         Thread currentThread = Thread.currentThread();
709         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
710         try {
711           currentThread.setContextClassLoader(this.getClassLoader());
712           impl.start(this);
713           state = Coprocessor.State.ACTIVE;
714         } finally {
715           currentThread.setContextClassLoader(hostClassLoader);
716         }
717       } else {
718         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
719             " because not inactive (state="+state.toString()+")");
720       }
721     }
722 
723     /** Clean up the environment */
724     protected void shutdown() {
725       if (state == Coprocessor.State.ACTIVE) {
726         state = Coprocessor.State.STOPPING;
727         Thread currentThread = Thread.currentThread();
728         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
729         try {
730           currentThread.setContextClassLoader(this.getClassLoader());
731           impl.stop(this);
732           state = Coprocessor.State.STOPPED;
733         } catch (IOException ioe) {
734           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
735         } finally {
736           currentThread.setContextClassLoader(hostClassLoader);
737         }
738       } else {
739         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
740             " because not active (state="+state.toString()+")");
741       }
742       // clean up any table references
743       for (HTableInterface table: openTables) {
744         try {
745           ((HTableWrapper)table).internalClose();
746         } catch (IOException e) {
747           // nothing can be done here
748           LOG.warn("Failed to close " +
749               Bytes.toStringBinary(table.getTableName()), e);
750         }
751       }
752     }
753 
754     @Override
755     public Coprocessor getInstance() {
756       return impl;
757     }
758 
759     @Override
760     public ClassLoader getClassLoader() {
761       return classLoader;
762     }
763 
764     @Override
765     public int getPriority() {
766       return priority;
767     }
768 
769     @Override
770     public int getLoadSequence() {
771       return seq;
772     }
773 
774     /** @return the coprocessor environment version */
775     @Override
776     public int getVersion() {
777       return Coprocessor.VERSION;
778     }
779 
780     /** @return the HBase release */
781     @Override
782     public String getHBaseVersion() {
783       return VersionInfo.getVersion();
784     }
785 
786     @Override
787     public Configuration getConfiguration() {
788       return conf;
789     }
790 
791     /**
792      * Open a table from within the Coprocessor environment
793      * @param tableName the table name
794      * @return an interface for manipulating the table
795      * @exception java.io.IOException Exception
796      */
797     @Override
798     public HTableInterface getTable(TableName tableName) throws IOException {
799       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
800     }
801 
802     /**
803      * Open a table from within the Coprocessor environment
804      * @param tableName the table name
805      * @return an interface for manipulating the table
806      * @exception java.io.IOException Exception
807      */
808     @Override
809     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
810       return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this),
811           pool);
812     }
813   }
814 
815   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
816     abortServer(environment.getInstance().getClass().getName(), e);
817   }
818 
819   protected void abortServer(final String coprocessorName, final Throwable e) {
820     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
821     LOG.error(message, e);
822     if (abortable != null) {
823       abortable.abort(message, e);
824     } else {
825       LOG.warn("No available Abortable, process was not aborted");
826     }
827   }
828 
829   /**
830    * This is used by coprocessor hooks which are declared to throw IOException
831    * (or its subtypes). For such hooks, we should handle throwable objects
832    * depending on the Throwable's type. Those which are instances of
833    * IOException should be passed on to the client. This is in conformance with
834    * the HBase idiom regarding IOException: that it represents a circumstance
835    * that should be passed along to the client for its own handling. For
836    * example, a coprocessor that implements access controls would throw a
837    * subclass of IOException, such as AccessDeniedException, in its preGet()
838    * method to prevent an unauthorized client's performing a Get on a particular
839    * table.
840    * @param env Coprocessor Environment
841    * @param e Throwable object thrown by coprocessor.
842    * @exception IOException Exception
843    */
844   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
845       throws IOException {
846     if (e instanceof IOException) {
847       throw (IOException)e;
848     }
849     // If we got here, e is not an IOException. A loaded coprocessor has a
850     // fatal bug, and the server (master or regionserver) should remove the
851     // faulty coprocessor from its set of active coprocessors. Setting
852     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
853     // which may be useful in development and testing environments where
854     // 'failing fast' for error analysis is desired.
855     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
856       // server is configured to abort.
857       abortServer(env, e);
858     } else {
859       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
860           "environment because it threw:  " + e,e);
861       coprocessors.remove(env);
862       try {
863         shutdown(env);
864       } catch (Exception x) {
865         LOG.error("Uncaught exception when shutting down coprocessor '"
866             + env.toString() + "'", x);
867       }
868       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
869           "' threw: '" + e + "' and has been removed from the active " +
870           "coprocessor set.", e);
871     }
872   }
873 }