View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.CoprocessorEnvironment;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.Type;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.catalog.MetaReader;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Query;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.Scan;
63  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
70  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
73  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
74  import org.apache.hadoop.hbase.filter.CompareFilter;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.FilterList;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RpcServer;
79  import org.apache.hadoop.hbase.master.MasterServices;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
82  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
88  import org.apache.hadoop.hbase.regionserver.HRegion;
89  import org.apache.hadoop.hbase.regionserver.InternalScanner;
90  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
91  import org.apache.hadoop.hbase.regionserver.RegionScanner;
92  import org.apache.hadoop.hbase.regionserver.ScanType;
93  import org.apache.hadoop.hbase.regionserver.Store;
94  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
95  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
96  import org.apache.hadoop.hbase.security.AccessDeniedException;
97  import org.apache.hadoop.hbase.security.User;
98  import org.apache.hadoop.hbase.security.UserProvider;
99  import org.apache.hadoop.hbase.security.access.Permission.Action;
100 import org.apache.hadoop.hbase.util.ByteRange;
101 import org.apache.hadoop.hbase.util.Bytes;
102 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
103 import org.apache.hadoop.hbase.util.Pair;
104 import org.apache.hadoop.hbase.util.SimpleByteRange;
105 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
106 
107 import com.google.common.collect.ArrayListMultimap;
108 import com.google.common.collect.ImmutableSet;
109 import com.google.common.collect.ListMultimap;
110 import com.google.common.collect.Lists;
111 import com.google.common.collect.MapMaker;
112 import com.google.common.collect.Maps;
113 import com.google.common.collect.Sets;
114 import com.google.protobuf.Message;
115 import com.google.protobuf.RpcCallback;
116 import com.google.protobuf.RpcController;
117 import com.google.protobuf.Service;
118 
119 /**
120  * Provides basic authorization checks for data access and administrative
121  * operations.
122  *
123  * <p>
124  * {@code AccessController} performs authorization checks for HBase operations
125  * based on:
126  * <ul>
127  *   <li>the identity of the user performing the operation</li>
128  *   <li>the scope over which the operation is performed, in increasing
129  *   specificity: global, table, column family, or qualifier</li>
130  *   <li>the type of action being performed (as mapped to
131  *   {@link Permission.Action} values)</li>
132  * </ul>
133  * If the authorization check fails, an {@link AccessDeniedException}
134  * will be thrown for the operation.
135  * </p>
136  *
137  * <p>
138  * To perform authorization checks, {@code AccessController} relies on the
139  * {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} being loaded to provide
140  * the user identities for remote requests.
141  * </p>
142  *
143  * <p>
144  * The access control lists used for authorization can be manipulated via the
145  * exposed {@link AccessControlService} Interface implementation, and the associated
146  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
147  * commands.
148  * </p>
149  */
150 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
151 public class AccessController extends BaseMasterAndRegionObserver
152     implements RegionServerObserver,
153       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
154 
155   public static final Log LOG = LogFactory.getLog(AccessController.class);
156 
157   private static final Log AUDITLOG =
158     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
159   private static final String CHECK_COVERING_PERM = "check_covering_perm";
160   private static final String TAG_CHECK_PASSED = "tag_check_passed";
161   private static final byte[] TRUE = Bytes.toBytes(true);
162 
163   TableAuthManager authManager = null;
164 
165   /** flags if we are running on a region of the _acl_ table */
166   boolean aclRegion = false;
167 
168   /** defined only for Endpoint implementation, so it can have way to
169    access region services */
170   private RegionCoprocessorEnvironment regionEnv;
171 
172   /** Mapping of scanner instances to the user who created them */
173   private Map<InternalScanner,String> scannerOwners =
174       new MapMaker().weakKeys().makeMap();
175 
176   private Map<TableName, List<UserPermission>> tableAcls;
177 
178   /** Provider for mapping principal names to Users */
179   private UserProvider userProvider;
180 
181   /** The list of users with superuser authority */
182   private List<String> superusers;
183 
184   /** if we are active, usually true, only not true if "hbase.security.authorization"
185    has been set to false in site configuration */
186   boolean authorizationEnabled;
187 
188   /** if we are able to support cell ACLs */
189   boolean cellFeaturesEnabled;
190 
191   /** if we should check EXEC permissions */
192   boolean shouldCheckExecPermission;
193 
194   /** if we should terminate access checks early as soon as table or CF grants
195     allow access; pre-0.98 compatible behavior */
196   boolean compatibleEarlyTermination;
197 
198   /** if we have been successfully initialized */
199   private volatile boolean initialized = false;
200 
201   /** if the ACL table is available, only relevant in the master */
202   private volatile boolean aclTabAvailable = false;
203 
204   public HRegion getRegion() {
205     return regionEnv != null ? regionEnv.getRegion() : null;
206   }
207 
208   public TableAuthManager getAuthManager() {
209     return authManager;
210   }
211 
212   void initialize(RegionCoprocessorEnvironment e) throws IOException {
213     final HRegion region = e.getRegion();
214     Configuration conf = e.getConfiguration();
215     Map<byte[], ListMultimap<String,TablePermission>> tables =
216         AccessControlLists.loadAll(region);
217     // For each table, write out the table's permissions to the respective
218     // znode for that table.
219     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
220       tables.entrySet()) {
221       byte[] entry = t.getKey();
222       ListMultimap<String,TablePermission> perms = t.getValue();
223       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
224       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
225     }
226     initialized = true;
227   }
228 
229   /**
230    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
231    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
232    * table updates.
233    */
234   void updateACL(RegionCoprocessorEnvironment e,
235       final Map<byte[], List<Cell>> familyMap) {
236     Set<byte[]> entries =
237         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
238     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
239       List<Cell> cells = f.getValue();
240       for (Cell cell: cells) {
241         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
242         if (Bytes.equals(kv.getBuffer(), kv.getFamilyOffset(),
243             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
244             AccessControlLists.ACL_LIST_FAMILY.length)) {
245           entries.add(kv.getRow());
246         }
247       }
248     }
249     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
250     Configuration conf = regionEnv.getConfiguration();
251     for (byte[] entry: entries) {
252       try {
253         ListMultimap<String,TablePermission> perms =
254           AccessControlLists.getPermissions(conf, entry);
255         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
256         zkw.writeToZookeeper(entry, serialized);
257       } catch (IOException ex) {
258         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
259             ex);
260       }
261     }
262   }
263 
264   /**
265    * Check the current user for authorization to perform a specific action
266    * against the given set of row data.
267    *
268    * <p>Note: Ordering of the authorization checks
269    * has been carefully optimized to short-circuit the most common requests
270    * and minimize the amount of processing required.</p>
271    *
272    * @param permRequest the action being requested
273    * @param e the coprocessor environment
274    * @param families the map of column families to qualifiers present in
275    * the request
276    * @return an authorization result
277    */
278   AuthResult permissionGranted(String request, User user, Action permRequest,
279       RegionCoprocessorEnvironment e,
280       Map<byte [], ? extends Collection<?>> families) {
281     HRegionInfo hri = e.getRegion().getRegionInfo();
282     TableName tableName = hri.getTable();
283 
284     // 1. All users need read access to hbase:meta table.
285     // this is a very common operation, so deal with it quickly.
286     if (hri.isMetaRegion()) {
287       if (permRequest == Action.READ) {
288         return AuthResult.allow(request, "All users allowed", user,
289           permRequest, tableName, families);
290       }
291     }
292 
293     if (user == null) {
294       return AuthResult.deny(request, "No user associated with request!", null,
295         permRequest, tableName, families);
296     }
297 
298     // 2. check for the table-level, if successful we can short-circuit
299     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
300       return AuthResult.allow(request, "Table permission granted", user,
301         permRequest, tableName, families);
302     }
303 
304     // 3. check permissions against the requested families
305     if (families != null && families.size() > 0) {
306       // all families must pass
307       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
308         // a) check for family level access
309         if (authManager.authorize(user, tableName, family.getKey(),
310             permRequest)) {
311           continue;  // family-level permission overrides per-qualifier
312         }
313 
314         // b) qualifier level access can still succeed
315         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
316           if (family.getValue() instanceof Set) {
317             // for each qualifier of the family
318             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
319             for (byte[] qualifier : familySet) {
320               if (!authManager.authorize(user, tableName, family.getKey(),
321                                          qualifier, permRequest)) {
322                 return AuthResult.deny(request, "Failed qualifier check", user,
323                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
324               }
325             }
326           } else if (family.getValue() instanceof List) { // List<KeyValue>
327             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
328             for (KeyValue kv : kvList) {
329               if (!authManager.authorize(user, tableName, family.getKey(),
330                       kv.getQualifier(), permRequest)) {
331                 return AuthResult.deny(request, "Failed qualifier check", user,
332                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
333               }
334             }
335           }
336         } else {
337           // no qualifiers and family-level check already failed
338           return AuthResult.deny(request, "Failed family check", user, permRequest,
339               tableName, makeFamilyMap(family.getKey(), null));
340         }
341       }
342 
343       // all family checks passed
344       return AuthResult.allow(request, "All family checks passed", user, permRequest,
345           tableName, families);
346     }
347 
348     // 4. no families to check and table level access failed
349     return AuthResult.deny(request, "No families to check and table permission failed",
350         user, permRequest, tableName, families);
351   }
352 
353   /**
354    * Check the current user for authorization to perform a specific action
355    * against the given set of row data.
356    * @param opType the operation type
357    * @param user the user
358    * @param e the coprocessor environment
359    * @param families the map of column families to qualifiers present in
360    * the request
361    * @param actions the desired actions
362    * @return an authorization result
363    */
364   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
365       Map<byte [], ? extends Collection<?>> families, Action... actions) {
366     AuthResult result = null;
367     for (Action action: actions) {
368       result = permissionGranted(opType.toString(), user, action, e, families);
369       if (!result.isAllowed()) {
370         return result;
371       }
372     }
373     return result;
374   }
375 
376   private void logResult(AuthResult result) {
377     if (AUDITLOG.isTraceEnabled()) {
378       InetAddress remoteAddr = RpcServer.getRemoteAddress();
379       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
380           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
381           "; reason: " + result.getReason() +
382           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
383           "; request: " + result.getRequest() +
384           "; context: " + result.toContextString());
385     }
386   }
387 
388   /**
389    * Returns the active user to which authorization checks should be applied.
390    * If we are in the context of an RPC call, the remote user is used,
391    * otherwise the currently logged in user is used.
392    */
393   private User getActiveUser() throws IOException {
394     User user = RpcServer.getRequestUser();
395     if (user == null) {
396       // for non-rpc handling, fallback to system user
397       user = userProvider.getCurrent();
398     }
399     return user;
400   }
401 
402   /**
403    * Authorizes that the current user has any of the given permissions for the
404    * given table, column family and column qualifier.
405    * @param tableName Table requested
406    * @param family Column family requested
407    * @param qualifier Column qualifier requested
408    * @throws IOException if obtaining the current user fails
409    * @throws AccessDeniedException if user has no authorization
410    */
411   private void requirePermission(String request, TableName tableName, byte[] family,
412       byte[] qualifier, Action... permissions) throws IOException {
413     User user = getActiveUser();
414     AuthResult result = null;
415 
416     for (Action permission : permissions) {
417       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
418         result = AuthResult.allow(request, "Table permission granted", user,
419                                   permission, tableName, family, qualifier);
420         break;
421       } else {
422         // rest of the world
423         result = AuthResult.deny(request, "Insufficient permissions", user,
424                                  permission, tableName, family, qualifier);
425       }
426     }
427     logResult(result);
428     if (authorizationEnabled && !result.isAllowed()) {
429       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
430     }
431   }
432 
433   /**
434    * Authorizes that the current user has any of the given permissions for the
435    * given table, column family and column qualifier.
436    * @param tableName Table requested
437    * @param family Column family param
438    * @param qualifier Column qualifier param
439    * @throws IOException if obtaining the current user fails
440    * @throws AccessDeniedException if user has no authorization
441    */
442   private void requireTablePermission(String request, TableName tableName, byte[] family,
443       byte[] qualifier, Action... permissions) throws IOException {
444     User user = getActiveUser();
445     AuthResult result = null;
446 
447     for (Action permission : permissions) {
448       if (authManager.authorize(user, tableName, null, null, permission)) {
449         result = AuthResult.allow(request, "Table permission granted", user,
450             permission, tableName, null, null);
451         break;
452       } else {
453         // rest of the world
454         result = AuthResult.deny(request, "Insufficient permissions", user,
455             permission, tableName, family, qualifier);
456       }
457     }
458     logResult(result);
459     if (authorizationEnabled && !result.isAllowed()) {
460       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
461     }
462   }
463 
464   /**
465    * Authorizes that the current user has any of the given permissions to access the table.
466    *
467    * @param tableName Table requested
468    * @param permissions Actions being requested
469    * @throws IOException if obtaining the current user fails
470    * @throws AccessDeniedException if user has no authorization
471    */
472   private void requireAccess(String request, TableName tableName,
473       Action... permissions) throws IOException {
474     User user = getActiveUser();
475     AuthResult result = null;
476 
477     for (Action permission : permissions) {
478       if (authManager.hasAccess(user, tableName, permission)) {
479         result = AuthResult.allow(request, "Table permission granted", user,
480                                   permission, tableName, null, null);
481         break;
482       } else {
483         // rest of the world
484         result = AuthResult.deny(request, "Insufficient permissions", user,
485                                  permission, tableName, null, null);
486       }
487     }
488     logResult(result);
489     if (authorizationEnabled && !result.isAllowed()) {
490       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
491     }
492   }
493 
494   /**
495    * Authorizes that the current user has global privileges for the given action.
496    * @param perm The action being requested
497    * @throws IOException if obtaining the current user fails
498    * @throws AccessDeniedException if authorization is denied
499    */
500   private void requirePermission(String request, Action perm) throws IOException {
501     requireGlobalPermission(request, perm, null, null);
502   }
503 
504   /**
505    * Checks that the user has the given global permission. The generated
506    * audit log message will contain context information for the operation
507    * being authorized, based on the given parameters.
508    * @param perm Action being requested
509    * @param tableName Affected table name.
510    * @param familyMap Affected column families.
511    */
512   private void requireGlobalPermission(String request, Action perm, TableName tableName,
513       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
514     User user = getActiveUser();
515     if (authManager.authorize(user, perm) || (tableName != null &&
516         authManager.authorize(user, tableName.getNamespaceAsString(), perm))) {
517       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
518     } else {
519       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
520       if (authorizationEnabled) {
521         throw new AccessDeniedException("Insufficient permissions for user '" +
522           (user != null ? user.getShortName() : "null") +"' (global, action=" +
523           perm.toString() + ")");
524       }
525     }
526   }
527 
528   /**
529    * Checks that the user has the given global permission. The generated
530    * audit log message will contain context information for the operation
531    * being authorized, based on the given parameters.
532    * @param perm Action being requested
533    * @param namespace
534    */
535   private void requireGlobalPermission(String request, Action perm,
536                                        String namespace) throws IOException {
537     User user = getActiveUser();
538     if (authManager.authorize(user, perm)
539         || (namespace != null && authManager.authorize(user, namespace, perm))) {
540       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
541     } else {
542       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
543       if (authorizationEnabled) {
544         throw new AccessDeniedException("Insufficient permissions for user '" +
545           (user != null ? user.getShortName() : "null") +"' (global, action=" +
546           perm.toString() + ")");
547       }
548     }
549   }
550 
551   /**
552    * Checks that the user has the given global or namespace permission.
553    * @param namespace
554    * @param permissions Actions being requested
555    */
556   public void requireNamespacePermission(String request, String namespace,
557       Action... permissions) throws IOException {
558     User user = getActiveUser();
559     AuthResult result = null;
560 
561     for (Action permission : permissions) {
562       if (authManager.authorize(user, namespace, permission)) {
563         result = AuthResult.allow(request, "Namespace permission granted",
564             user, permission, namespace);
565         break;
566       } else {
567         // rest of the world
568         result = AuthResult.deny(request, "Insufficient permissions", user,
569             permission, namespace);
570       }
571     }
572     logResult(result);
573     if (authorizationEnabled && !result.isAllowed()) {
574       throw new AccessDeniedException("Insufficient permissions "
575           + result.toContextString());
576     }
577   }
578 
579   /**
580    * Checks that the user has the given global or namespace permission.
581    * @param namespace
582    * @param permissions Actions being requested
583    */
584   public void requireNamespacePermission(String request, String namespace, TableName tableName,
585       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
586       throws IOException {
587     User user = getActiveUser();
588     AuthResult result = null;
589 
590     for (Action permission : permissions) {
591       if (authManager.authorize(user, namespace, permission)) {
592         result = AuthResult.allow(request, "Namespace permission granted",
593             user, permission, namespace);
594         break;
595       } else {
596         // rest of the world
597         result = AuthResult.deny(request, "Insufficient permissions", user,
598             permission, namespace);
599       }
600     }
601     logResult(result);
602     if (authorizationEnabled && !result.isAllowed()) {
603       throw new AccessDeniedException("Insufficient permissions "
604           + result.toContextString());
605     }
606   }
607 
608   /**
609    * Returns <code>true</code> if the current user is allowed the given action
610    * over at least one of the column qualifiers in the given column families.
611    */
612   private boolean hasFamilyQualifierPermission(User user,
613       Action perm,
614       RegionCoprocessorEnvironment env,
615       Map<byte[], ? extends Collection<byte[]>> familyMap)
616     throws IOException {
617     HRegionInfo hri = env.getRegion().getRegionInfo();
618     TableName tableName = hri.getTable();
619 
620     if (user == null) {
621       return false;
622     }
623 
624     if (familyMap != null && familyMap.size() > 0) {
625       // at least one family must be allowed
626       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
627           familyMap.entrySet()) {
628         if (family.getValue() != null && !family.getValue().isEmpty()) {
629           for (byte[] qualifier : family.getValue()) {
630             if (authManager.matchPermission(user, tableName,
631                 family.getKey(), qualifier, perm)) {
632               return true;
633             }
634           }
635         } else {
636           if (authManager.matchPermission(user, tableName, family.getKey(),
637               perm)) {
638             return true;
639           }
640         }
641       }
642     } else if (LOG.isDebugEnabled()) {
643       LOG.debug("Empty family map passed for permission check");
644     }
645 
646     return false;
647   }
648 
649   private enum OpType {
650     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
651     GET("get"),
652     EXISTS("exists"),
653     SCAN("scan"),
654     PUT("put"),
655     DELETE("delete"),
656     CHECK_AND_PUT("checkAndPut"),
657     CHECK_AND_DELETE("checkAndDelete"),
658     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
659     APPEND("append"),
660     INCREMENT("increment");
661 
662     private String type;
663 
664     private OpType(String type) {
665       this.type = type;
666     }
667 
668     @Override
669     public String toString() {
670       return type;
671     }
672   }
673 
674   /**
675    * Determine if cell ACLs covered by the operation grant access. This is expensive.
676    * @return false if cell ACLs failed to grant access, true otherwise
677    * @throws IOException
678    */
679   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
680       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
681       throws IOException {
682     if (!cellFeaturesEnabled) {
683       return false;
684     }
685     long cellGrants = 0;
686     User user = getActiveUser();
687     long latestCellTs = 0;
688     Get get = new Get(row);
689     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
690     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
691     // version. We have to get every cell version and check its TS against the TS asked for in
692     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
693     // consider only one such passing cell. In case of Delete we have to consider all the cell
694     // versions under this passing version. When Delete Mutation contains columns which are a
695     // version delete just consider only one version for those column cells.
696     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
697     if (considerCellTs) {
698       get.setMaxVersions();
699     } else {
700       get.setMaxVersions(1);
701     }
702     boolean diffCellTsFromOpTs = false;
703     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
704       byte[] col = entry.getKey();
705       // TODO: HBASE-7114 could possibly unify the collection type in family
706       // maps so we would not need to do this
707       if (entry.getValue() instanceof Set) {
708         Set<byte[]> set = (Set<byte[]>)entry.getValue();
709         if (set == null || set.isEmpty()) {
710           get.addFamily(col);
711         } else {
712           for (byte[] qual: set) {
713             get.addColumn(col, qual);
714           }
715         }
716       } else if (entry.getValue() instanceof List) {
717         List<Cell> list = (List<Cell>)entry.getValue();
718         if (list == null || list.isEmpty()) {
719           get.addFamily(col);
720         } else {
721           // In case of family delete, a Cell will be added into the list with Qualifier as null.
722           for (Cell cell : list) {
723             if (cell.getQualifierLength() == 0
724                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
725                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
726               get.addFamily(col);
727             } else {
728               get.addColumn(col, CellUtil.cloneQualifier(cell));
729             }
730             if (considerCellTs) {
731               long cellTs = cell.getTimestamp();
732               latestCellTs = Math.max(latestCellTs, cellTs);
733               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
734             }
735           }
736         }
737       } else if (entry.getValue() == null) {
738         get.addFamily(col);        
739       } else {
740         throw new RuntimeException("Unhandled collection type " +
741           entry.getValue().getClass().getName());
742       }
743     }
744     // We want to avoid looking into the future. So, if the cells of the
745     // operation specify a timestamp, or the operation itself specifies a
746     // timestamp, then we use the maximum ts found. Otherwise, we bound
747     // the Get to the current server time. We add 1 to the timerange since
748     // the upper bound of a timerange is exclusive yet we need to examine
749     // any cells found there inclusively.
750     long latestTs = Math.max(opTs, latestCellTs);
751     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
752       latestTs = EnvironmentEdgeManager.currentTimeMillis();
753     }
754     get.setTimeRange(0, latestTs + 1);
755     // In case of Put operation we set to read all versions. This was done to consider the case
756     // where columns are added with TS other than the Mutation TS. But normally this wont be the
757     // case with Put. There no need to get all versions but get latest version only.
758     if (!diffCellTsFromOpTs && request == OpType.PUT) {
759       get.setMaxVersions(1);
760     }
761     if (LOG.isTraceEnabled()) {
762       LOG.trace("Scanning for cells with " + get);
763     }
764     // This Map is identical to familyMap. The key is a BR rather than byte[].
765     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
766     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
767     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
768     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
769       if (entry.getValue() instanceof List) {
770         familyMap1.put(new SimpleByteRange(entry.getKey()), (List<Cell>) entry.getValue());
771       }
772     }
773     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
774     List<Cell> cells = Lists.newArrayList();
775     Cell prevCell = null;
776     ByteRange curFam = new SimpleByteRange();
777     boolean curColAllVersions = (request == OpType.DELETE);
778     long curColCheckTs = opTs;
779     boolean foundColumn = false;
780     try {
781       boolean more = false;
782       do {
783         cells.clear();
784         // scan with limit as 1 to hold down memory use on wide rows
785         more = scanner.next(cells, 1);
786         for (Cell cell: cells) {
787           if (LOG.isTraceEnabled()) {
788             LOG.trace("Found cell " + cell);
789           }
790           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
791           if (colChange) foundColumn = false;
792           prevCell = cell;
793           if (!curColAllVersions && foundColumn) {
794             continue;
795           }
796           if (colChange && considerCellTs) {
797             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
798             List<Cell> cols = familyMap1.get(curFam);
799             for (Cell col : cols) {
800               // null/empty qualifier is used to denote a Family delete. The TS and delete type
801               // associated with this is applicable for all columns within the family. That is
802               // why the below (col.getQualifierLength() == 0) check.
803               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
804                   || CellUtil.matchingQualifier(cell, col)) {
805                 byte type = col.getTypeByte();
806                 if (considerCellTs) {
807                   curColCheckTs = col.getTimestamp();
808                 }
809                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
810                 // a version delete for a column no need to check all the covering cells within
811                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
812                 // One version delete types are Delete/DeleteFamilyVersion
813                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
814                     || (KeyValue.Type.DeleteFamily.getCode() == type);
815                 break;
816               }
817             }
818           }
819           if (cell.getTimestamp() > curColCheckTs) {
820             // Just ignore this cell. This is not a covering cell.
821             continue;
822           }
823           foundColumn = true;
824           for (Action action: actions) {
825             // Are there permissions for this user for the cell?
826             if (!authManager.authorize(user, getTableName(e), cell, action)) {
827               // We can stop if the cell ACL denies access
828               return false;
829             }
830           }
831           cellGrants++;
832         }
833       } while (more);
834     } catch (AccessDeniedException ex) {
835       throw ex;
836     } catch (IOException ex) {
837       LOG.error("Exception while getting cells to calculate covering permission", ex);
838     } finally {
839       scanner.close();
840     }
841     // We should not authorize unless we have found one or more cell ACLs that
842     // grant access. This code is used to check for additional permissions
843     // after no table or CF grants are found.
844     return cellGrants > 0;
845   }
846 
847   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
848     // Iterate over the entries in the familyMap, replacing the cells therein
849     // with new cells including the ACL data
850     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
851       List<Cell> newCells = Lists.newArrayList();
852       for (Cell cell: e.getValue()) {
853         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
854         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
855         if (cell.getTagsLengthUnsigned() > 0) {
856           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
857             cell.getTagsOffset(), cell.getTagsLengthUnsigned());
858           while (tagIterator.hasNext()) {
859             tags.add(tagIterator.next());
860           }
861         }
862         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
863         // incoming cell type is actually KeyValue.
864         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
865         byte[] bytes = kv.getBuffer();
866         newCells.add(
867           new KeyValue(bytes, kv.getRowOffset(), kv.getRowLength(),
868             bytes, kv.getFamilyOffset(), kv.getFamilyLength(),
869             bytes, kv.getQualifierOffset(), kv.getQualifierLength(),
870             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
871             bytes, kv.getValueOffset(), kv.getValueLength(),
872             tags));
873       }
874       // This is supposed to be safe, won't CME
875       e.setValue(newCells);
876     }
877   }
878 
879   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
880   // type is reserved and should not be explicitly set by user.
881   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
882     // No need to check if we're not going to throw
883     if (!authorizationEnabled) {
884       m.setAttribute(TAG_CHECK_PASSED, TRUE);
885       return;
886     }
887     // Superusers are allowed to store cells unconditionally.
888     if (superusers.contains(user.getShortName())) {
889       m.setAttribute(TAG_CHECK_PASSED, TRUE);
890       return;
891     }
892     // We already checked (prePut vs preBatchMutation)
893     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
894       return;
895     }
896     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
897       Cell cell = cellScanner.current();
898       if (cell.getTagsLengthUnsigned() > 0) {
899         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
900           cell.getTagsLengthUnsigned());
901         while (tagsItr.hasNext()) {
902           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
903             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
904           }
905         }
906       }
907     }
908     m.setAttribute(TAG_CHECK_PASSED, TRUE);
909   }
910 
911   /* ---- MasterObserver implementation ---- */
912 
913   public void start(CoprocessorEnvironment env) throws IOException {
914     CompoundConfiguration conf = new CompoundConfiguration();
915     conf.add(env.getConfiguration());
916 
917     authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
918     if (!authorizationEnabled) {
919       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
920     }
921 
922     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
923       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
924 
925     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
926     if (!cellFeaturesEnabled) {
927       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
928           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
929           + " accordingly.");
930     }
931 
932     ZooKeeperWatcher zk = null;
933     if (env instanceof MasterCoprocessorEnvironment) {
934       // if running on HMaster
935       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
936       zk = mEnv.getMasterServices().getZooKeeper();
937     } else if (env instanceof RegionServerCoprocessorEnvironment) {
938       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
939       zk = rsEnv.getRegionServerServices().getZooKeeper();
940     } else if (env instanceof RegionCoprocessorEnvironment) {
941       // if running at region
942       regionEnv = (RegionCoprocessorEnvironment) env;
943       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
944       zk = regionEnv.getRegionServerServices().getZooKeeper();
945       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
946         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
947     }
948 
949     // set the user-provider.
950     this.userProvider = UserProvider.instantiate(env.getConfiguration());
951 
952     // set up the list of users with superuser privilege
953     User user = userProvider.getCurrent();
954     superusers = Lists.asList(user.getShortName(),
955       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
956 
957     // If zk is null or IOException while obtaining auth manager,
958     // throw RuntimeException so that the coprocessor is unloaded.
959     if (zk != null) {
960       try {
961         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
962       } catch (IOException ioe) {
963         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
964       }
965     } else {
966       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
967     }
968 
969     if(!compatibleEarlyTermination && !cellFeaturesEnabled) {
970       LOG.warn("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
971           + " is required for " + AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT
972           + " to have an effect");
973     }
974 
975     tableAcls = new MapMaker().weakValues().makeMap();
976   }
977 
978   public void stop(CoprocessorEnvironment env) {
979 
980   }
981 
982    @Override
983   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
984       final TableName tableName) throws IOException {
985     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
986     final Configuration conf = c.getEnvironment().getConfiguration();
987     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
988       @Override
989       public Void run() throws Exception {
990         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
991         if (acls != null) {
992           tableAcls.put(tableName, acls);
993         }
994         return null;
995       }
996     });
997   }
998 
999   @Override
1000   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1001       final TableName tableName) throws IOException {
1002     final Configuration conf = ctx.getEnvironment().getConfiguration();
1003     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1004       @Override
1005       public Void run() throws Exception {
1006         List<UserPermission> perms = tableAcls.get(tableName);
1007         if (perms != null) {
1008           for (UserPermission perm : perms) {
1009             AccessControlLists.addUserPermission(conf, perm);
1010           }
1011         }
1012         tableAcls.remove(tableName);
1013         return null;
1014       }
1015     });
1016   }
1017 
1018   @Override
1019   public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1020       TableName tableName) throws IOException {}
1021 
1022   @Override
1023   public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1024       TableName tableName) throws IOException {}
1025 
1026   @Override
1027   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1028       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
1029     Set<byte[]> families = desc.getFamiliesKeys();
1030     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
1031     for (byte[] family: families) {
1032       familyMap.put(family, null);
1033     }
1034     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
1035   }
1036 
1037   @Override
1038   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
1039       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
1040     // When AC is used, it should be configured as the 1st CP.
1041     // In Master, the table operations like create, are handled by a Thread pool but the max size
1042     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1043     // sequentially only.
1044     // Related code in HMaster#startServiceThreads
1045     // {code}
1046     //   // We depend on there being only one instance of this executor running
1047     //   // at a time. To do concurrency, would need fencing of enable/disable of
1048     //   // tables.
1049     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1050     // {code}
1051     // In future if we change this pool to have more threads, then there is a chance for thread,
1052     // creating acl table, getting delayed and by that time another table creation got over and
1053     // this hook is getting called. In such a case, we will need a wait logic here which will
1054     // wait till the acl table is created.
1055     if (AccessControlLists.isAclTable(desc)) {
1056       this.aclTabAvailable = true;
1057     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1058       if (!aclTabAvailable) {
1059         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1060             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1061             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1062       } else {
1063         String owner = desc.getOwnerString();
1064         // default the table owner to current user, if not specified.
1065         if (owner == null)
1066           owner = getActiveUser().getShortName();
1067         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1068             desc.getTableName(), null, Action.values());
1069         // switch to the real hbase master user for doing the RPC on the ACL table
1070         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1071           @Override
1072           public Void run() throws Exception {
1073             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1074                 userperm);
1075             return null;
1076           }
1077         });
1078       }
1079     }
1080   }
1081 
1082   @Override
1083   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1084       throws IOException {
1085     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1086   }
1087 
1088   @Override
1089   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1090       final TableName tableName) throws IOException {
1091     final Configuration conf = c.getEnvironment().getConfiguration();
1092     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1093       @Override
1094       public Void run() throws Exception {
1095         AccessControlLists.removeTablePermissions(conf, tableName);
1096         return null;
1097       }
1098     });
1099     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1100   }
1101 
1102   @Override
1103   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1104       HTableDescriptor htd) throws IOException {
1105     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1106   }
1107 
1108   @Override
1109   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1110       TableName tableName, final HTableDescriptor htd) throws IOException {
1111     final Configuration conf = c.getEnvironment().getConfiguration();
1112     // default the table owner to current user, if not specified.
1113     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1114       getActiveUser().getShortName();
1115     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1116       @Override
1117       public Void run() throws Exception {
1118         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1119           htd.getTableName(), null, Action.values());
1120         AccessControlLists.addUserPermission(conf, userperm);
1121         return null;
1122       }
1123     });
1124   }
1125 
1126   @Override
1127   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1128       HColumnDescriptor column) throws IOException {
1129     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1130   }
1131 
1132   @Override
1133   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1134       HColumnDescriptor descriptor) throws IOException {
1135     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1136       Action.CREATE);
1137   }
1138 
1139   @Override
1140   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1141       byte[] col) throws IOException {
1142     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1143   }
1144 
1145   @Override
1146   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1147       final TableName tableName, final byte[] col) throws IOException {
1148     final Configuration conf = c.getEnvironment().getConfiguration();
1149     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1150       @Override
1151       public Void run() throws Exception {
1152         AccessControlLists.removeTablePermissions(conf, tableName, col);
1153         return null;
1154       }
1155     });
1156   }
1157 
1158   @Override
1159   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1160       throws IOException {
1161     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1162   }
1163 
1164   @Override
1165   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1166       throws IOException {
1167     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1168       // We have to unconditionally disallow disable of the ACL table when we are installed,
1169       // even if not enforcing authorizations. We are still allowing grants and revocations,
1170       // checking permissions and logging audit messages, etc. If the ACL table is not
1171       // available we will fail random actions all over the place.
1172       throw new AccessDeniedException("Not allowed to disable "
1173           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1174     }
1175     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1176   }
1177 
1178   @Override
1179   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1180       ServerName srcServer, ServerName destServer) throws IOException {
1181     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1182   }
1183 
1184   @Override
1185   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1186       throws IOException {
1187     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1188   }
1189 
1190   @Override
1191   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1192       boolean force) throws IOException {
1193     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1194   }
1195 
1196   @Override
1197   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1198       HRegionInfo regionInfo) throws IOException {
1199     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1200   }
1201 
1202   @Override
1203   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1204       throws IOException {
1205     requirePermission("balance", Action.ADMIN);
1206   }
1207 
1208   @Override
1209   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1210       boolean newValue) throws IOException {
1211     requirePermission("balanceSwitch", Action.ADMIN);
1212     return newValue;
1213   }
1214 
1215   @Override
1216   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1217       throws IOException {
1218     requirePermission("shutdown", Action.ADMIN);
1219   }
1220 
1221   @Override
1222   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1223       throws IOException {
1224     requirePermission("stopMaster", Action.ADMIN);
1225   }
1226 
1227   @Override
1228   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1229       throws IOException {
1230     if (!MetaReader.tableExists(ctx.getEnvironment().getMasterServices().getCatalogTracker(),
1231         AccessControlLists.ACL_TABLE_NAME)) {
1232       // initialize the ACL storage table
1233       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1234     } else {
1235       aclTabAvailable = true;
1236     }
1237   }
1238 
1239   @Override
1240   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1241       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1242       throws IOException {
1243     requirePermission("snapshot", Action.ADMIN);
1244   }
1245 
1246   @Override
1247   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1248       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1249       throws IOException {
1250     requirePermission("clone", Action.ADMIN);
1251   }
1252 
1253   @Override
1254   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1255       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1256       throws IOException {
1257     requirePermission("restore", Action.ADMIN);
1258   }
1259 
1260   @Override
1261   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1262       final SnapshotDescription snapshot) throws IOException {
1263     requirePermission("deleteSnapshot", Action.ADMIN);
1264   }
1265 
1266   @Override
1267   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1268       NamespaceDescriptor ns) throws IOException {
1269     requirePermission("createNamespace", Action.ADMIN);
1270   }
1271 
1272   @Override
1273   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1274       throws IOException {
1275     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1276   }
1277 
1278   @Override
1279   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1280       final String namespace) throws IOException {
1281     final Configuration conf = ctx.getEnvironment().getConfiguration();
1282     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1283       @Override
1284       public Void run() throws Exception {
1285         AccessControlLists.removeNamespacePermissions(conf, namespace);
1286         return null;
1287       }
1288     });
1289     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1290     LOG.info(namespace + " entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1291   }
1292 
1293   @Override
1294   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1295       NamespaceDescriptor ns) throws IOException {
1296     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1297   }
1298 
1299   /* ---- RegionObserver implementation ---- */
1300 
1301   @Override
1302   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1303       throws IOException {
1304     RegionCoprocessorEnvironment env = e.getEnvironment();
1305     final HRegion region = env.getRegion();
1306     if (region == null) {
1307       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1308     } else {
1309       HRegionInfo regionInfo = region.getRegionInfo();
1310       if (regionInfo.getTable().isSystemTable()) {
1311         isSystemOrSuperUser(regionEnv.getConfiguration());
1312       } else {
1313         requirePermission("preOpen", Action.ADMIN);
1314       }
1315     }
1316   }
1317 
1318   @Override
1319   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1320     RegionCoprocessorEnvironment env = c.getEnvironment();
1321     final HRegion region = env.getRegion();
1322     if (region == null) {
1323       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1324       return;
1325     }
1326     if (AccessControlLists.isAclRegion(region)) {
1327       aclRegion = true;
1328       // When this region is under recovering state, initialize will be handled by postLogReplay
1329       if (!region.isRecovering()) {
1330         try {
1331           initialize(env);
1332         } catch (IOException ex) {
1333           // if we can't obtain permissions, it's better to fail
1334           // than perform checks incorrectly
1335           throw new RuntimeException("Failed to initialize permissions cache", ex);
1336         }
1337       }
1338     } else {
1339       initialized = true;
1340     }
1341   }
1342 
1343   @Override
1344   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1345     if (aclRegion) {
1346       try {
1347         initialize(c.getEnvironment());
1348       } catch (IOException ex) {
1349         // if we can't obtain permissions, it's better to fail
1350         // than perform checks incorrectly
1351         throw new RuntimeException("Failed to initialize permissions cache", ex);
1352       }
1353     }
1354   }
1355 
1356   @Override
1357   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1358     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1359         Action.CREATE);
1360   }
1361 
1362   @Override
1363   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1364     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1365   }
1366 
1367   @Override
1368   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1369       byte[] splitRow) throws IOException {
1370     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1371   }
1372 
1373   @Override
1374   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1375       final Store store, final InternalScanner scanner, final ScanType scanType)
1376           throws IOException {
1377     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1378         Action.CREATE);
1379     return scanner;
1380   }
1381 
1382   @Override
1383   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1384       final byte [] row, final byte [] family, final Result result)
1385       throws IOException {
1386     assert family != null;
1387     RegionCoprocessorEnvironment env = c.getEnvironment();
1388     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1389     User user = getActiveUser();
1390     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1391       Action.READ);
1392     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1393       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1394         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1395       authResult.setReason("Covering cell set");
1396     }
1397     logResult(authResult);
1398     if (authorizationEnabled && !authResult.isAllowed()) {
1399       throw new AccessDeniedException("Insufficient permissions " +
1400         authResult.toContextString());
1401     }
1402   }
1403 
1404   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1405       final Query query, OpType opType) throws IOException {
1406     Filter filter = query.getFilter();
1407     // Don't wrap an AccessControlFilter
1408     if (filter != null && filter instanceof AccessControlFilter) {
1409       return;
1410     }
1411     User user = getActiveUser();
1412     RegionCoprocessorEnvironment env = c.getEnvironment();
1413     Map<byte[],? extends Collection<byte[]>> families = null;
1414     switch (opType) {
1415     case GET:
1416     case EXISTS:
1417       families = ((Get)query).getFamilyMap();
1418       break;
1419     case SCAN:
1420       families = ((Scan)query).getFamilyMap();
1421       break;
1422     default:
1423       throw new RuntimeException("Unhandled operation " + opType);
1424     }
1425     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1426     HRegion region = getRegion(env);
1427     TableName table = getTableName(region);
1428     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1429     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1430       cfVsMaxVersions.put(new SimpleByteRange(hcd.getName()), hcd.getMaxVersions());
1431     }
1432     if (!authResult.isAllowed()) {
1433       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1434         // Old behavior: Scan with only qualifier checks if we have partial
1435         // permission. Backwards compatible behavior is to throw an
1436         // AccessDeniedException immediately if there are no grants for table
1437         // or CF or CF+qual. Only proceed with an injected filter if there are
1438         // grants for qualifiers. Otherwise we will fall through below and log
1439         // the result and throw an ADE. We may end up checking qualifier
1440         // grants three times (permissionGranted above, here, and in the
1441         // filter) but that's the price of backwards compatibility.
1442         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1443           authResult.setAllowed(true);
1444           authResult.setReason("Access allowed with filter");
1445           // Only wrap the filter if we are enforcing authorizations
1446           if (authorizationEnabled) {
1447             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1448               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1449               cfVsMaxVersions);
1450             // wrap any existing filter
1451             if (filter != null) {
1452               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1453                 Lists.newArrayList(ourFilter, filter));
1454             }
1455             switch (opType) {
1456               case GET:
1457               case EXISTS:
1458                 ((Get)query).setFilter(ourFilter);
1459                 break;
1460               case SCAN:
1461                 ((Scan)query).setFilter(ourFilter);
1462                 break;
1463               default:
1464                 throw new RuntimeException("Unhandled operation " + opType);
1465             }
1466           }
1467         }
1468       } else {
1469         // New behavior: Any access we might be granted is more fine-grained
1470         // than whole table or CF. Simply inject a filter and return what is
1471         // allowed. We will not throw an AccessDeniedException. This is a
1472         // behavioral change since 0.96.
1473         authResult.setAllowed(true);
1474         authResult.setReason("Access allowed with filter");
1475         // Only wrap the filter if we are enforcing authorizations
1476         if (authorizationEnabled) {
1477           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1478             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1479           // wrap any existing filter
1480           if (filter != null) {
1481             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1482               Lists.newArrayList(ourFilter, filter));
1483           }
1484           switch (opType) {
1485             case GET:
1486             case EXISTS:
1487               ((Get)query).setFilter(ourFilter);
1488               break;
1489             case SCAN:
1490               ((Scan)query).setFilter(ourFilter);
1491               break;
1492             default:
1493               throw new RuntimeException("Unhandled operation " + opType);
1494           }
1495         }
1496       }
1497     }
1498 
1499     logResult(authResult);
1500     if (authorizationEnabled && !authResult.isAllowed()) {
1501       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1502         ", action=READ)");
1503     }
1504   }
1505 
1506   @Override
1507   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1508       final Get get, final List<Cell> result) throws IOException {
1509     internalPreRead(c, get, OpType.GET);
1510   }
1511 
1512   @Override
1513   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1514       final Get get, final boolean exists) throws IOException {
1515     internalPreRead(c, get, OpType.EXISTS);
1516     return exists;
1517   }
1518 
1519   @Override
1520   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1521       final Put put, final WALEdit edit, final Durability durability)
1522       throws IOException {
1523     User user = getActiveUser();
1524     checkForReservedTagPresence(user, put);
1525 
1526     // Require WRITE permission to the table, CF, or top visible value, if any.
1527     // NOTE: We don't need to check the permissions for any earlier Puts
1528     // because we treat the ACLs in each Put as timestamped like any other
1529     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1530     // change the ACL of any previous Put. This allows simple evolution of
1531     // security policy over time without requiring expensive updates.
1532     RegionCoprocessorEnvironment env = c.getEnvironment();
1533     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1534     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1535     logResult(authResult);
1536     if (!authResult.isAllowed()) {
1537       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1538         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1539       } else if (authorizationEnabled) {
1540         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1541       }
1542     }
1543 
1544     // Add cell ACLs from the operation to the cells themselves
1545     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1546     if (bytes != null) {
1547       if (cellFeaturesEnabled) {
1548         addCellPermissions(bytes, put.getFamilyCellMap());
1549       } else {
1550         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1551       }
1552     }
1553   }
1554 
1555   @Override
1556   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1557       final Put put, final WALEdit edit, final Durability durability) {
1558     if (aclRegion) {
1559       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1560     }
1561   }
1562 
1563   @Override
1564   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1565       final Delete delete, final WALEdit edit, final Durability durability)
1566       throws IOException {
1567     // An ACL on a delete is useless, we shouldn't allow it
1568     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1569       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1570     }
1571     // Require WRITE permissions on all cells covered by the delete. Unlike
1572     // for Puts we need to check all visible prior versions, because a major
1573     // compaction could remove them. If the user doesn't have permission to
1574     // overwrite any of the visible versions ('visible' defined as not covered
1575     // by a tombstone already) then we have to disallow this operation.
1576     RegionCoprocessorEnvironment env = c.getEnvironment();
1577     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1578     User user = getActiveUser();
1579     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1580     logResult(authResult);
1581     if (!authResult.isAllowed()) {
1582       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1583         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1584       } else if (authorizationEnabled) {
1585         throw new AccessDeniedException("Insufficient permissions " +
1586           authResult.toContextString());
1587       }
1588     }
1589   }
1590 
1591   @Override
1592   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1593       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1594     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1595       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1596       for (int i = 0; i < miniBatchOp.size(); i++) {
1597         Mutation m = miniBatchOp.getOperation(i);
1598         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1599           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1600           // perm check
1601           OpType opType;
1602           if (m instanceof Put) {
1603             checkForReservedTagPresence(getActiveUser(), m);
1604             opType = OpType.PUT;
1605           } else {
1606             opType = OpType.DELETE;
1607           }
1608           AuthResult authResult = null;
1609           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1610             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1611             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1612               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1613           } else {
1614             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1615               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1616           }
1617           logResult(authResult);
1618           if (authorizationEnabled && !authResult.isAllowed()) {
1619             throw new AccessDeniedException("Insufficient permissions "
1620               + authResult.toContextString());
1621           }
1622         }
1623       }
1624     }
1625   }
1626 
1627   @Override
1628   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1629       final Delete delete, final WALEdit edit, final Durability durability)
1630       throws IOException {
1631     if (aclRegion) {
1632       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1633     }
1634   }
1635 
1636   @Override
1637   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1638       final byte [] row, final byte [] family, final byte [] qualifier,
1639       final CompareFilter.CompareOp compareOp,
1640       final ByteArrayComparable comparator, final Put put,
1641       final boolean result) throws IOException {
1642     User user = getActiveUser();
1643     checkForReservedTagPresence(user, put);
1644 
1645     // Require READ and WRITE permissions on the table, CF, and KV to update
1646     RegionCoprocessorEnvironment env = c.getEnvironment();
1647     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1648     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1649       Action.READ, Action.WRITE);
1650     logResult(authResult);
1651     if (!authResult.isAllowed()) {
1652       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1653         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1654       } else if (authorizationEnabled) {
1655         throw new AccessDeniedException("Insufficient permissions " +
1656           authResult.toContextString());
1657       }
1658     }
1659 
1660     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1661     if (bytes != null) {
1662       if (cellFeaturesEnabled) {
1663         addCellPermissions(bytes, put.getFamilyCellMap());
1664       } else {
1665         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1666       }
1667     }
1668     return result;
1669   }
1670 
1671   @Override
1672   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1673       final byte[] row, final byte[] family, final byte[] qualifier,
1674       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1675       final boolean result) throws IOException {
1676     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1677       // We had failure with table, cf and q perm checks and now giving a chance for cell
1678       // perm check
1679       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1680       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1681       AuthResult authResult = null;
1682       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1683           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1684         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1685             getActiveUser(), Action.READ, table, families);
1686       } else {
1687         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1688             getActiveUser(), Action.READ, table, families);
1689       }
1690       logResult(authResult);
1691       if (authorizationEnabled && !authResult.isAllowed()) {
1692         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1693       }
1694     }
1695     return result;
1696   }
1697 
1698   @Override
1699   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1700       final byte [] row, final byte [] family, final byte [] qualifier,
1701       final CompareFilter.CompareOp compareOp,
1702       final ByteArrayComparable comparator, final Delete delete,
1703       final boolean result) throws IOException {
1704     // An ACL on a delete is useless, we shouldn't allow it
1705     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1706       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1707           delete.toString());
1708     }
1709     // Require READ and WRITE permissions on the table, CF, and the KV covered
1710     // by the delete
1711     RegionCoprocessorEnvironment env = c.getEnvironment();
1712     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1713     User user = getActiveUser();
1714     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1715       Action.READ, Action.WRITE);
1716     logResult(authResult);
1717     if (!authResult.isAllowed()) {
1718       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1719         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1720       } else if (authorizationEnabled) {
1721         throw new AccessDeniedException("Insufficient permissions " +
1722           authResult.toContextString());
1723       }
1724     }
1725     return result;
1726   }
1727 
1728   @Override
1729   public boolean preCheckAndDeleteAfterRowLock(
1730       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1731       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1732       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1733       throws IOException {
1734     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1735       // We had failure with table, cf and q perm checks and now giving a chance for cell
1736       // perm check
1737       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1738       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1739       AuthResult authResult = null;
1740       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1741           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1742         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1743             getActiveUser(), Action.READ, table, families);
1744       } else {
1745         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1746             getActiveUser(), Action.READ, table, families);
1747       }
1748       logResult(authResult);
1749       if (authorizationEnabled && !authResult.isAllowed()) {
1750         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1751       }
1752     }
1753     return result;
1754   }
1755 
1756   @Override
1757   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1758       final byte [] row, final byte [] family, final byte [] qualifier,
1759       final long amount, final boolean writeToWAL)
1760       throws IOException {
1761     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1762     // incremented value
1763     RegionCoprocessorEnvironment env = c.getEnvironment();
1764     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1765     User user = getActiveUser();
1766     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1767       Action.WRITE);
1768     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1769       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1770         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1771       authResult.setReason("Covering cell set");
1772     }
1773     logResult(authResult);
1774     if (authorizationEnabled && !authResult.isAllowed()) {
1775       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1776     }
1777     return -1;
1778   }
1779 
1780   @Override
1781   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1782       throws IOException {
1783     User user = getActiveUser();
1784     checkForReservedTagPresence(user, append);
1785 
1786     // Require WRITE permission to the table, CF, and the KV to be appended
1787     RegionCoprocessorEnvironment env = c.getEnvironment();
1788     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1789     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1790     logResult(authResult);
1791     if (!authResult.isAllowed()) {
1792       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1793         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1794       } else if (authorizationEnabled)  {
1795         throw new AccessDeniedException("Insufficient permissions " +
1796           authResult.toContextString());
1797       }
1798     }
1799 
1800     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1801     if (bytes != null) {
1802       if (cellFeaturesEnabled) {
1803         addCellPermissions(bytes, append.getFamilyCellMap());
1804       } else {
1805         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1806       }
1807     }
1808 
1809     return null;
1810   }
1811 
1812   @Override
1813   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1814       final Append append) throws IOException {
1815     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1816       // We had failure with table, cf and q perm checks and now giving a chance for cell
1817       // perm check
1818       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1819       AuthResult authResult = null;
1820       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1821           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1822         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1823             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1824       } else {
1825         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1826             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1827       }
1828       logResult(authResult);
1829       if (authorizationEnabled && !authResult.isAllowed()) {
1830         throw new AccessDeniedException("Insufficient permissions " +
1831           authResult.toContextString());
1832       }
1833     }
1834     return null;
1835   }
1836 
1837   @Override
1838   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1839       final Increment increment)
1840       throws IOException {
1841     User user = getActiveUser();
1842     checkForReservedTagPresence(user, increment);
1843 
1844     // Require WRITE permission to the table, CF, and the KV to be replaced by
1845     // the incremented value
1846     RegionCoprocessorEnvironment env = c.getEnvironment();
1847     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1848     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1849       Action.WRITE);
1850     logResult(authResult);
1851     if (!authResult.isAllowed()) {
1852       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1853         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1854       } else if (authorizationEnabled) {
1855         throw new AccessDeniedException("Insufficient permissions " +
1856           authResult.toContextString());
1857       }
1858     }
1859 
1860     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1861     if (bytes != null) {
1862       if (cellFeaturesEnabled) {
1863         addCellPermissions(bytes, increment.getFamilyCellMap());
1864       } else {
1865         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1866       }
1867     }
1868 
1869     return null;
1870   }
1871 
1872   @Override
1873   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1874       final Increment increment) throws IOException {
1875     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1876       // We had failure with table, cf and q perm checks and now giving a chance for cell
1877       // perm check
1878       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1879       AuthResult authResult = null;
1880       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1881           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1882         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1883             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1884       } else {
1885         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1886             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1887       }
1888       logResult(authResult);
1889       if (authorizationEnabled && !authResult.isAllowed()) {
1890         throw new AccessDeniedException("Insufficient permissions " +
1891           authResult.toContextString());
1892       }
1893     }
1894     return null;
1895   }
1896 
1897   @Override
1898   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1899       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1900     // If the HFile version is insufficient to persist tags, we won't have any
1901     // work to do here
1902     if (!cellFeaturesEnabled) {
1903       return newCell;
1904     }
1905 
1906     // Collect any ACLs from the old cell
1907     List<Tag> tags = Lists.newArrayList();
1908     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1909     if (oldCell != null) {
1910       // Save an object allocation where we can
1911       if (oldCell.getTagsLengthUnsigned() > 0) {
1912         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1913           oldCell.getTagsOffset(), oldCell.getTagsLengthUnsigned());
1914         while (tagIterator.hasNext()) {
1915           Tag tag = tagIterator.next();
1916           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1917             // Not an ACL tag, just carry it through
1918             if (LOG.isTraceEnabled()) {
1919               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1920                 " length " + tag.getTagLength());
1921             }
1922             tags.add(tag);
1923           } else {
1924             // Merge the perms from the older ACL into the current permission set
1925             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1926               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1927                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1928             perms.putAll(kvPerms);
1929           }
1930         }
1931       }
1932     }
1933 
1934     // Do we have an ACL on the operation?
1935     byte[] aclBytes = mutation.getACL();
1936     if (aclBytes != null) {
1937       // Yes, use it
1938       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1939     } else {
1940       // No, use what we carried forward
1941       if (perms != null) {
1942         // TODO: If we collected ACLs from more than one tag we may have a
1943         // List<Permission> of size > 1, this can be collapsed into a single
1944         // Permission
1945         if (LOG.isTraceEnabled()) {
1946           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1947         }
1948         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1949           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1950       }
1951     }
1952 
1953     // If we have no tags to add, just return
1954     if (tags.isEmpty()) {
1955       return newCell;
1956     }
1957 
1958     // We need to create another KV, unfortunately, because the current new KV
1959     // has no space for tags
1960     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1961     byte[] bytes = newKv.getBuffer();
1962     KeyValue rewriteKv = new KeyValue(bytes, newKv.getRowOffset(), newKv.getRowLength(),
1963       bytes, newKv.getFamilyOffset(), newKv.getFamilyLength(),
1964       bytes, newKv.getQualifierOffset(), newKv.getQualifierLength(),
1965       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1966       bytes, newKv.getValueOffset(), newKv.getValueLength(),
1967       tags);
1968     // Preserve mvcc data
1969     rewriteKv.setMvccVersion(newKv.getMvccVersion());
1970     return rewriteKv;
1971   }
1972 
1973   @Override
1974   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1975       final Scan scan, final RegionScanner s) throws IOException {
1976     internalPreRead(c, scan, OpType.SCAN);
1977     return s;
1978   }
1979 
1980   @Override
1981   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1982       final Scan scan, final RegionScanner s) throws IOException {
1983     User user = getActiveUser();
1984     if (user != null && user.getShortName() != null) {
1985       // store reference to scanner owner for later checks
1986       scannerOwners.put(s, user.getShortName());
1987     }
1988     return s;
1989   }
1990 
1991   @Override
1992   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1993       final InternalScanner s, final List<Result> result,
1994       final int limit, final boolean hasNext) throws IOException {
1995     requireScannerOwner(s);
1996     return hasNext;
1997   }
1998 
1999   @Override
2000   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2001       final InternalScanner s) throws IOException {
2002     requireScannerOwner(s);
2003   }
2004 
2005   @Override
2006   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2007       final InternalScanner s) throws IOException {
2008     // clean up any associated owner mapping
2009     scannerOwners.remove(s);
2010   }
2011 
2012   /**
2013    * Verify, when servicing an RPC, that the caller is the scanner owner.
2014    * If so, we assume that access control is correctly enforced based on
2015    * the checks performed in preScannerOpen()
2016    */
2017   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2018     String requestUserName = RpcServer.getRequestUserName();
2019     String owner = scannerOwners.get(s);
2020     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2021       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2022     }
2023   }
2024 
2025   /**
2026    * Verifies user has CREATE privileges on
2027    * the Column Families involved in the bulkLoadHFile
2028    * request. Specific Column Write privileges are presently
2029    * ignored.
2030    */
2031   @Override
2032   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2033       List<Pair<byte[], String>> familyPaths) throws IOException {
2034     for(Pair<byte[],String> el : familyPaths) {
2035       requirePermission("preBulkLoadHFile",
2036           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2037           el.getFirst(),
2038           null,
2039           Action.CREATE);
2040     }
2041   }
2042 
2043   /**
2044    * Authorization check for
2045    * SecureBulkLoadProtocol.prepareBulkLoad()
2046    * @param ctx the context
2047    * @param request the request
2048    * @throws IOException
2049    */
2050   @Override
2051   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2052                                  PrepareBulkLoadRequest request) throws IOException {
2053     requireAccess("prePareBulkLoad",
2054         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2055   }
2056 
2057   /**
2058    * Authorization security check for
2059    * SecureBulkLoadProtocol.cleanupBulkLoad()
2060    * @param ctx the context
2061    * @param request the request
2062    * @throws IOException
2063    */
2064   @Override
2065   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2066                                  CleanupBulkLoadRequest request) throws IOException {
2067     requireAccess("preCleanupBulkLoad",
2068         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2069   }
2070 
2071   /* ---- EndpointObserver implementation ---- */
2072 
2073   @Override
2074   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2075       Service service, String methodName, Message request) throws IOException {
2076     // Don't intercept calls to our own AccessControlService, we check for
2077     // appropriate permissions in the service handlers
2078     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2079       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2080         methodName + ")",
2081         getTableName(ctx.getEnvironment()), null, null,
2082         Action.EXEC);
2083     }
2084     return request;
2085   }
2086 
2087   @Override
2088   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2089       Service service, String methodName, Message request, Message.Builder responseBuilder)
2090       throws IOException { }
2091 
2092   /* ---- Protobuf AccessControlService implementation ---- */
2093 
2094   @Override
2095   public void grant(RpcController controller,
2096                     AccessControlProtos.GrantRequest request,
2097                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2098     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2099     AccessControlProtos.GrantResponse response = null;
2100     try {
2101       // verify it's only running at .acl.
2102       if (aclRegion) {
2103         if (!initialized) {
2104           throw new CoprocessorException("AccessController not yet initialized");
2105         }
2106         if (LOG.isDebugEnabled()) {
2107           LOG.debug("Received request to grant access permission " + perm.toString());
2108         }
2109 
2110         switch(request.getUserPermission().getPermission().getType()) {
2111           case Global :
2112           case Table :
2113             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2114               perm.getQualifier(), Action.ADMIN);
2115             break;
2116           case Namespace :
2117             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2118            break;
2119         }
2120 
2121         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2122           @Override
2123           public Void run() throws Exception {
2124             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2125             return null;
2126           }
2127         });
2128 
2129         if (AUDITLOG.isTraceEnabled()) {
2130           // audit log should store permission changes in addition to auth results
2131           AUDITLOG.trace("Granted permission " + perm.toString());
2132         }
2133       } else {
2134         throw new CoprocessorException(AccessController.class, "This method "
2135             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2136       }
2137       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2138     } catch (IOException ioe) {
2139       // pass exception back up
2140       ResponseConverter.setControllerException(controller, ioe);
2141     }
2142     done.run(response);
2143   }
2144 
2145   @Override
2146   public void revoke(RpcController controller,
2147                      AccessControlProtos.RevokeRequest request,
2148                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2149     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2150     AccessControlProtos.RevokeResponse response = null;
2151     try {
2152       // only allowed to be called on _acl_ region
2153       if (aclRegion) {
2154         if (!initialized) {
2155           throw new CoprocessorException("AccessController not yet initialized");
2156         }
2157         if (LOG.isDebugEnabled()) {
2158           LOG.debug("Received request to revoke access permission " + perm.toString());
2159         }
2160 
2161         switch(request.getUserPermission().getPermission().getType()) {
2162           case Global :
2163           case Table :
2164             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2165               perm.getQualifier(), Action.ADMIN);
2166             break;
2167           case Namespace :
2168             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2169             break;
2170         }
2171 
2172         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2173           @Override
2174           public Void run() throws Exception {
2175             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2176             return null;
2177           }
2178         });
2179 
2180         if (AUDITLOG.isTraceEnabled()) {
2181           // audit log should record all permission changes
2182           AUDITLOG.trace("Revoked permission " + perm.toString());
2183         }
2184       } else {
2185         throw new CoprocessorException(AccessController.class, "This method "
2186             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2187       }
2188       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2189     } catch (IOException ioe) {
2190       // pass exception back up
2191       ResponseConverter.setControllerException(controller, ioe);
2192     }
2193     done.run(response);
2194   }
2195 
2196   @Override
2197   public void getUserPermissions(RpcController controller,
2198                                  AccessControlProtos.GetUserPermissionsRequest request,
2199                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2200     AccessControlProtos.GetUserPermissionsResponse response = null;
2201     try {
2202       // only allowed to be called on _acl_ region
2203       if (aclRegion) {
2204         if (!initialized) {
2205           throw new CoprocessorException("AccessController not yet initialized");
2206         }
2207         List<UserPermission> perms = null;
2208         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2209           final TableName table = request.hasTableName() ?
2210             ProtobufUtil.toTableName(request.getTableName()) : null;
2211           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2212           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2213             @Override
2214             public List<UserPermission> run() throws Exception {
2215               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2216             }
2217           });
2218         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2219           final String namespace = request.getNamespaceName().toStringUtf8();
2220           requireGlobalPermission("userPermissions", Action.ADMIN, namespace);
2221           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2222             @Override
2223             public List<UserPermission> run() throws Exception {
2224               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2225                 namespace);
2226             }
2227           });
2228         } else {
2229           requirePermission("userPermissions", Action.ADMIN);
2230           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2231             @Override
2232             public List<UserPermission> run() throws Exception {
2233               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2234             }
2235           });
2236         }
2237         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2238       } else {
2239         throw new CoprocessorException(AccessController.class, "This method "
2240             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2241       }
2242     } catch (IOException ioe) {
2243       // pass exception back up
2244       ResponseConverter.setControllerException(controller, ioe);
2245     }
2246     done.run(response);
2247   }
2248 
2249   @Override
2250   public void checkPermissions(RpcController controller,
2251                                AccessControlProtos.CheckPermissionsRequest request,
2252                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2253     Permission[] permissions = new Permission[request.getPermissionCount()];
2254     for (int i=0; i < request.getPermissionCount(); i++) {
2255       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2256     }
2257     AccessControlProtos.CheckPermissionsResponse response = null;
2258     try {
2259       User user = getActiveUser();
2260       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2261       for (Permission permission : permissions) {
2262         if (permission instanceof TablePermission) {
2263           // Check table permissions
2264 
2265           TablePermission tperm = (TablePermission) permission;
2266           for (Action action : permission.getActions()) {
2267             if (!tperm.getTableName().equals(tableName)) {
2268               throw new CoprocessorException(AccessController.class, String.format("This method "
2269                   + "can only execute at the table specified in TablePermission. " +
2270                   "Table of the region:%s , requested table:%s", tableName,
2271                   tperm.getTableName()));
2272             }
2273 
2274             Map<byte[], Set<byte[]>> familyMap =
2275                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2276             if (tperm.getFamily() != null) {
2277               if (tperm.getQualifier() != null) {
2278                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2279                 qualifiers.add(tperm.getQualifier());
2280                 familyMap.put(tperm.getFamily(), qualifiers);
2281               } else {
2282                 familyMap.put(tperm.getFamily(), null);
2283               }
2284             }
2285 
2286             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2287               familyMap);
2288             logResult(result);
2289             if (!result.isAllowed()) {
2290               // Even if passive we need to throw an exception here, we support checking
2291               // effective permissions, so throw unconditionally
2292               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2293                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2294                 ", action=" + action.toString() + ")");
2295             }
2296           }
2297 
2298         } else {
2299           // Check global permissions
2300 
2301           for (Action action : permission.getActions()) {
2302             AuthResult result;
2303             if (authManager.authorize(user, action)) {
2304               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2305                 action, null, null);
2306             } else {
2307               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2308                 null, null);
2309             }
2310             logResult(result);
2311             if (!result.isAllowed()) {
2312               // Even if passive we need to throw an exception here, we support checking
2313               // effective permissions, so throw unconditionally
2314               throw new AccessDeniedException("Insufficient permissions (action=" +
2315                 action.toString() + ")");
2316             }
2317           }
2318         }
2319       }
2320       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2321     } catch (IOException ioe) {
2322       ResponseConverter.setControllerException(controller, ioe);
2323     }
2324     done.run(response);
2325   }
2326 
2327   @Override
2328   public Service getService() {
2329     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2330   }
2331 
2332   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2333     return e.getRegion();
2334   }
2335 
2336   private TableName getTableName(RegionCoprocessorEnvironment e) {
2337     HRegion region = e.getRegion();
2338     if (region != null) {
2339       return getTableName(region);
2340     }
2341     return null;
2342   }
2343 
2344   private TableName getTableName(HRegion region) {
2345     HRegionInfo regionInfo = region.getRegionInfo();
2346     if (regionInfo != null) {
2347       return regionInfo.getTable();
2348     }
2349     return null;
2350   }
2351 
2352   @Override
2353   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2354       throws IOException {
2355     requirePermission("preClose", Action.ADMIN);
2356   }
2357 
2358   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2359     // No need to check if we're not going to throw
2360     if (!authorizationEnabled) {
2361       return;
2362     }
2363     User user = userProvider.getCurrent();
2364     if (user == null) {
2365       throw new IOException("Unable to obtain the current user, " +
2366         "authorization checks for internal operations will not work correctly!");
2367     }
2368     User activeUser = getActiveUser();
2369     if (!(superusers.contains(activeUser.getShortName()))) {
2370       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2371         "is not system or super user.");
2372     }
2373   }
2374 
2375   @Override
2376   public void preStopRegionServer(
2377       ObserverContext<RegionServerCoprocessorEnvironment> env)
2378       throws IOException {
2379     requirePermission("preStopRegionServer", Action.ADMIN);
2380   }
2381 
2382   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2383       byte[] qualifier) {
2384     if (family == null) {
2385       return null;
2386     }
2387 
2388     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2389     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2390     return familyMap;
2391   }
2392 
2393   @Override
2394   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2395       List<TableName> tableNamesList,
2396       List<HTableDescriptor> descriptors) throws IOException {
2397     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2398     // ADMIN privs.
2399     if (tableNamesList == null || tableNamesList.isEmpty()) {
2400       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2401     }
2402     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2403     // request can be granted.
2404     else {
2405       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2406       for (TableName tableName: tableNamesList) {
2407         // Skip checks for a table that does not exist
2408         if (masterServices.getTableDescriptors().get(tableName) == null) {
2409           continue;
2410         }
2411         requirePermission("getTableDescriptors", tableName, null, null,
2412           Action.ADMIN, Action.CREATE);
2413       }
2414     }
2415   }
2416 
2417   @Override
2418   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2419       HRegion regionB) throws IOException {
2420     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2421       Action.ADMIN);
2422   }
2423 
2424   @Override
2425   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2426       HRegion regionB, HRegion mergedRegion) throws IOException { }
2427 
2428   @Override
2429   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2430       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2431 
2432   @Override
2433   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2434       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2435 
2436   @Override
2437   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2438       HRegion regionA, HRegion regionB) throws IOException { }
2439 
2440   @Override
2441   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2442       HRegion regionA, HRegion regionB) throws IOException { }
2443 
2444   @Override
2445   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2446       throws IOException {
2447     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2448   }
2449 
2450   @Override
2451   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2452       throws IOException { }
2453 
2454   @Override
2455   public ReplicationEndpoint postCreateReplicationEndPoint(
2456       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2457     return endpoint;
2458   }
2459 
2460   @Override
2461   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2462       List<WALEntry> entries, CellScanner cells) throws IOException {
2463     requirePermission("replicateLogEntries", Action.WRITE);
2464   }
2465 
2466   @Override
2467   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2468       List<WALEntry> entries, CellScanner cells) throws IOException {
2469   }
2470 }