View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.visibility;
20  
21  import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SANITY_CHECK_FAILURE;
22  import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS;
23  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
24  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
25  
26  import java.io.IOException;
27  import java.net.InetAddress;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellScanner;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.CoprocessorEnvironment;
41  import org.apache.hadoop.hbase.DoNotRetryIOException;
42  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.TagType;
52  import org.apache.hadoop.hbase.catalog.MetaReader;
53  import org.apache.hadoop.hbase.classification.InterfaceAudience;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.client.Scan;
62  import org.apache.hadoop.hbase.constraint.ConstraintException;
63  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
70  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.exceptions.DeserializationException;
73  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
74  import org.apache.hadoop.hbase.filter.Filter;
75  import org.apache.hadoop.hbase.filter.FilterBase;
76  import org.apache.hadoop.hbase.filter.FilterList;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RpcServer;
79  import org.apache.hadoop.hbase.master.MasterServices;
80  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
81  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
82  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
83  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsRequest;
84  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse;
85  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.ListLabelsRequest;
86  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.ListLabelsResponse;
87  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.SetAuthsRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
89  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsRequest;
90  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
91  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
92  import org.apache.hadoop.hbase.regionserver.BloomType;
93  import org.apache.hadoop.hbase.regionserver.DeleteTracker;
94  import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
95  import org.apache.hadoop.hbase.regionserver.HRegion;
96  import org.apache.hadoop.hbase.regionserver.InternalScanner;
97  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
98  import org.apache.hadoop.hbase.regionserver.OperationStatus;
99  import org.apache.hadoop.hbase.regionserver.RegionScanner;
100 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
101 import org.apache.hadoop.hbase.security.AccessDeniedException;
102 import org.apache.hadoop.hbase.security.User;
103 import org.apache.hadoop.hbase.security.access.AccessControlLists;
104 import org.apache.hadoop.hbase.security.access.AccessController;
105 import org.apache.hadoop.hbase.util.ByteStringer;
106 import org.apache.hadoop.hbase.util.Bytes;
107 import org.apache.hadoop.hbase.util.Pair;
108 
109 import com.google.common.collect.Lists;
110 import com.google.common.collect.MapMaker;
111 import com.google.protobuf.ByteString;
112 import com.google.protobuf.RpcCallback;
113 import com.google.protobuf.RpcController;
114 import com.google.protobuf.Service;
115 
116 /**
117  * Coprocessor that implements both the MasterObserver and RegionObserver hooks in order to
118  * support visibility labels.
119  */
120 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
121 public class VisibilityController extends BaseMasterAndRegionObserver implements
122     VisibilityLabelsService.Interface, CoprocessorService {
123 
private static final Log LOG = LogFactory.getLog(VisibilityController.class);
private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
    + VisibilityController.class.getName());
// Set when this coprocessor instance is running on a region of the 'labels' table.
private boolean labelsRegion = false;
// Flag denoting whether AccessController is available or not.
private boolean accessControllerAvailable = false;
private Configuration conf;
// Volatile: written once the VisibilityLabelService has been initialised and read by the
// scan/get hooks, which reject requests until it is true.
private volatile boolean initialized = false;
// Value of VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, read in postOpen for non-labels regions.
private boolean checkAuths = false;
/** Mapping of scanner instances to the user who created them */
private Map<InternalScanner,String> scannerOwners =
    new MapMaker().weakKeys().makeMap();

private List<String> superUsers;
private List<String> superGroups;
private VisibilityLabelService visibilityLabelService;

/** if we are active, usually true, only not true if "hbase.security.authorization"
  has been set to false in site configuration */
boolean authorizationEnabled;

// Tag types that users must not set explicitly on their cells.
// Add to this list if there are any reserved tag types.
// The reference is final so the shared list cannot be swapped out after class initialisation.
private static final List<Byte> RESERVED_VIS_TAG_TYPES = new ArrayList<Byte>();
static {
  RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_TAG_TYPE);
  RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE);
  RESERVED_VIS_TAG_TYPES.add(TagType.STRING_VIS_TAG_TYPE);
}
153 
154   @Override
155   public void start(CoprocessorEnvironment env) throws IOException {
156     this.conf = env.getConfiguration();
157 
158     authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
159     if (!authorizationEnabled) {
160       LOG.warn("The VisibilityController has been loaded with authorization checks disabled.");
161     }
162 
163     if (HFile.getFormatVersion(conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
164       throw new RuntimeException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
165         + " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY
166         + " accordingly.");
167     }
168 
169     if (env instanceof RegionServerCoprocessorEnvironment) {
170       throw new RuntimeException("Visibility controller should not be configured as "
171           + "'hbase.coprocessor.regionserver.classes'.");
172     }
173     // Do not create for master CPs
174     if (!(env instanceof MasterCoprocessorEnvironment)) {
175       visibilityLabelService = VisibilityLabelServiceManager.getInstance()
176           .getVisibilityLabelService(this.conf);
177     }
178     Pair<List<String>, List<String>> superUsersAndGroups =
179         VisibilityUtils.getSystemAndSuperUsers(this.conf);
180     this.superUsers = superUsersAndGroups.getFirst();
181     this.superGroups = superUsersAndGroups.getSecond();
182   }
183 
184   @Override
185   public void stop(CoprocessorEnvironment env) throws IOException {
186 
187   }
188 
189   /********************************* Master related hooks **********************************/
190 
191   @Override
192   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
193     // Need to create the new system table for labels here
194     MasterServices master = ctx.getEnvironment().getMasterServices();
195     if (!MetaReader.tableExists(master.getCatalogTracker(), LABELS_TABLE_NAME)) {
196       HTableDescriptor labelsTable = new HTableDescriptor(LABELS_TABLE_NAME);
197       HColumnDescriptor labelsColumn = new HColumnDescriptor(LABELS_TABLE_FAMILY);
198       labelsColumn.setBloomFilterType(BloomType.NONE);
199       labelsColumn.setBlockCacheEnabled(false); // We will cache all the labels. No need of normal
200                                                  // table block cache.
201       labelsTable.addFamily(labelsColumn);
202       // Let the "labels" table having only one region always. We are not expecting too many labels in
203       // the system.
204       labelsTable.setValue(HTableDescriptor.SPLIT_POLICY,
205           DisabledRegionSplitPolicy.class.getName());
206       labelsTable.setValue(Bytes.toBytes(HConstants.DISALLOW_WRITES_IN_RECOVERING),
207           Bytes.toBytes(true));
208       master.createTable(labelsTable, null);
209     }
210   }
211 
212   @Override
213   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
214       TableName tableName, HTableDescriptor htd) throws IOException {
215     if (!authorizationEnabled) {
216       return;
217     }
218     if (LABELS_TABLE_NAME.equals(tableName)) {
219       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
220     }
221   }
222 
223   @Override
224   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName,
225       HColumnDescriptor column) throws IOException {
226     if (!authorizationEnabled) {
227       return;
228     }
229     if (LABELS_TABLE_NAME.equals(tableName)) {
230       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
231     }
232   }
233 
234   @Override
235   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
236       TableName tableName, HColumnDescriptor descriptor) throws IOException {
237     if (!authorizationEnabled) {
238       return;
239     }
240     if (LABELS_TABLE_NAME.equals(tableName)) {
241       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
242     }
243   }
244 
245   @Override
246   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
247       TableName tableName, byte[] c) throws IOException {
248     if (!authorizationEnabled) {
249       return;
250     }
251     if (LABELS_TABLE_NAME.equals(tableName)) {
252       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
253     }
254   }
255 
256   @Override
257   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName)
258       throws IOException {
259     if (!authorizationEnabled) {
260       return;
261     }
262     if (LABELS_TABLE_NAME.equals(tableName)) {
263       throw new ConstraintException("Cannot disable " + LABELS_TABLE_NAME);
264     }
265   }
266 
267   /****************************** Region related hooks ******************************/
268 
269   @Override
270   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
271     // Read the entire labels table and populate the zk
272     if (e.getEnvironment().getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
273       this.labelsRegion = true;
274       this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
275           .contains(AccessController.class.getName());
276       // Defer the init of VisibilityLabelService on labels region until it is in recovering state.
277       if (!e.getEnvironment().getRegion().isRecovering()) {
278         initVisibilityLabelService(e.getEnvironment());
279       }
280     } else {
281       checkAuths = e.getEnvironment().getConfiguration()
282           .getBoolean(VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, false);
283       initVisibilityLabelService(e.getEnvironment());
284     }
285   }
286 
287   @Override
288   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> e) {
289     if (this.labelsRegion) {
290       initVisibilityLabelService(e.getEnvironment());
291       LOG.debug("post labels region log replay");
292     }
293   }
294 
295   private void initVisibilityLabelService(RegionCoprocessorEnvironment env) {
296     try {
297       this.visibilityLabelService.init(env);
298       this.initialized = true;
299     } catch (IOException ioe) {
300       LOG.error("Error while initializing VisibilityLabelService..", ioe);
301       throw new RuntimeException(ioe);
302     }
303   }
304 
305   @Override
306   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
307       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
308     if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
309       return;
310     }
311     // TODO this can be made as a global LRU cache at HRS level?
312     Map<String, List<Tag>> labelCache = new HashMap<String, List<Tag>>();
313     for (int i = 0; i < miniBatchOp.size(); i++) {
314       Mutation m = miniBatchOp.getOperation(i);
315       CellVisibility cellVisibility = null;
316       try {
317         cellVisibility = m.getCellVisibility();
318       } catch (DeserializationException de) {
319         miniBatchOp.setOperationStatus(i,
320             new OperationStatus(SANITY_CHECK_FAILURE, de.getMessage()));
321         continue;
322       }
323       boolean sanityFailure = false;
324       boolean modifiedTagFound = false;
325       Pair<Boolean, Tag> pair = new Pair<Boolean, Tag>(false, null);
326       for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
327         pair = checkForReservedVisibilityTagPresence(cellScanner.current(), pair);
328         if (!pair.getFirst()) {
329           // Don't disallow reserved tags if authorization is disabled
330           if (authorizationEnabled) {
331             miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE,
332               "Mutation contains cell with reserved type tag"));
333             sanityFailure = true;
334           }
335           break;
336         } else {
337           // Indicates that the cell has a the tag which was modified in the src replication cluster
338           Tag tag = pair.getSecond();
339           if (cellVisibility == null && tag != null) {
340             // May need to store only the first one
341             cellVisibility = new CellVisibility(Bytes.toString(tag.getBuffer(), tag.getTagOffset(),
342                 tag.getTagLength()));
343             modifiedTagFound = true;
344           }
345         }
346       }
347       if (!sanityFailure) {
348         if (cellVisibility != null) {
349           String labelsExp = cellVisibility.getExpression();
350           List<Tag> visibilityTags = labelCache.get(labelsExp);
351           if (visibilityTags == null) {
352             // Don't check user auths for labels with Mutations when the user is super user
353             boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
354             try {
355               visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, true,
356                   authCheck);
357             } catch (InvalidLabelException e) {
358               miniBatchOp.setOperationStatus(i,
359                   new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage()));
360             }
361             if (visibilityTags != null) {
362               labelCache.put(labelsExp, visibilityTags);
363             }
364           }
365           if (visibilityTags != null) {
366             List<Cell> updatedCells = new ArrayList<Cell>();
367             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
368               Cell cell = cellScanner.current();
369               List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
370                   cell.getTagsLengthUnsigned());
371               if (modifiedTagFound) {
372                 // Rewrite the tags by removing the modified tags.
373                 removeReplicationVisibilityTag(tags);
374               }
375               tags.addAll(visibilityTags);
376               Cell updatedCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
377                   cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
378                   cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
379                   cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell
380                       .getTypeByte()), cell.getValueArray(), cell.getValueOffset(),
381                   cell.getValueLength(), tags);
382               updatedCells.add(updatedCell);
383             }
384             m.getFamilyCellMap().clear();
385             // Clear and add new Cells to the Mutation.
386             for (Cell cell : updatedCells) {
387               if (m instanceof Put) {
388                 Put p = (Put) m;
389                 p.add(cell);
390               } else if (m instanceof Delete) {
391                 // TODO : Cells without visibility tags would be handled in follow up issue
392                 Delete d = (Delete) m;
393                 d.addDeleteMarker(cell);
394               }
395             }
396           }
397         }
398       }
399     }
400   }
401 
402   @Override
403   public void prePrepareTimeStampForDeleteVersion(
404       ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation delete, Cell cell,
405       byte[] byteNow, Get get) throws IOException {
406     // Nothing to do if we are not filtering by visibility
407     if (!authorizationEnabled) {
408       return;
409     }
410 
411     KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
412     CellVisibility cellVisibility = null;
413     try {
414       cellVisibility = delete.getCellVisibility();
415     } catch (DeserializationException de) {
416       throw new IOException("Invalid cell visibility specified " + delete, de);
417     }
418     // The check for checkForReservedVisibilityTagPresence happens in preBatchMutate happens.
419     // It happens for every mutation and that would be enough.
420     List<Tag> visibilityTags = new ArrayList<Tag>();
421     if (cellVisibility != null) {
422       String labelsExp = cellVisibility.getExpression();
423       try {
424         visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, false,
425             false);
426       } catch (InvalidLabelException e) {
427         throw new IOException("Invalid cell visibility specified " + labelsExp, e);
428       }
429     }
430     get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags,
431         VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT));
432     List<Cell> result = ctx.getEnvironment().getRegion().get(get, false);
433 
434     if (result.size() < get.getMaxVersions()) {
435       // Nothing to delete
436       kv.updateLatestStamp(Bytes.toBytes(Long.MIN_VALUE));
437       return;
438     }
439     if (result.size() > get.getMaxVersions()) {
440       throw new RuntimeException("Unexpected size: " + result.size()
441           + ". Results more than the max versions obtained.");
442     }
443     KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(get.getMaxVersions() - 1));
444     Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
445         getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
446     // We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would
447     // update with the current timestamp after again doing a get. As the hook as already determined
448     // the needed timestamp we need to bypass here.
449     // TODO : See if HRegion.updateDeleteLatestVersionTimeStamp() could be
450     // called only if the hook is not called.
451     ctx.bypass();
452   }
453 
454   /**
455    * Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. This
456    * tag type is reserved and should not be explicitly set by user.
457    *
458    * @param cell
459    *          - the cell under consideration
460    * @param pair - an optional pair of type <Boolean, Tag> which would be reused
461    *               if already set and new one will be created if null is passed
462    * @return a pair<Boolean, Tag> - if the boolean is false then it indicates
463    *         that the cell has a RESERVERD_VIS_TAG and with boolean as true, not
464    *         null tag indicates that a string modified tag was found.
465    */
466   private Pair<Boolean, Tag> checkForReservedVisibilityTagPresence(Cell cell,
467       Pair<Boolean, Tag> pair) throws IOException {
468     if (pair == null) {
469       pair = new Pair<Boolean, Tag>(false, null);
470     } else {
471       pair.setFirst(false);
472       pair.setSecond(null);
473     }
474     // Bypass this check when the operation is done by a system/super user.
475     // This is done because, while Replication, the Cells coming to the peer cluster with reserved
476     // typed tags and this is fine and should get added to the peer cluster table
477     if (isSystemOrSuperUser()) {
478       // Does the cell contain special tag which indicates that the replicated
479       // cell visiblilty tags
480       // have been modified
481       Tag modifiedTag = null;
482       if (cell.getTagsLength() > 0) {
483         Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(),
484             cell.getTagsOffset(), cell.getTagsLength());
485         while (tagsIterator.hasNext()) {
486           Tag tag = tagsIterator.next();
487           if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) {
488             modifiedTag = tag;
489             break;
490           }
491         }
492       }
493       pair.setFirst(true);
494       pair.setSecond(modifiedTag);
495       return pair;
496     }
497     if (cell.getTagsLength() > 0) {
498       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
499           cell.getTagsLength());
500       while (tagsItr.hasNext()) {
501         if (RESERVED_VIS_TAG_TYPES.contains(tagsItr.next().getType())) {
502           return pair;
503         }
504       }
505     }
506     pair.setFirst(true);
507     return pair;
508   }
509 
510   /**
511    * Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. This
512    * tag type is reserved and should not be explicitly set by user. There are
513    * two versions of this method one that accepts pair and other without pair.
514    * In case of preAppend and preIncrement the additional operations are not
515    * needed like checking for STRING_VIS_TAG_TYPE and hence the API without pair
516    * could be used.
517    *
518    * @param cell
519    * @return
520    * @throws IOException
521    */
522   private boolean checkForReservedVisibilityTagPresence(Cell cell) throws IOException {
523     // Bypass this check when the operation is done by a system/super user.
524     // This is done because, while Replication, the Cells coming to the peer
525     // cluster with reserved
526     // typed tags and this is fine and should get added to the peer cluster
527     // table
528     if (isSystemOrSuperUser()) {
529       return true;
530     }
531     if (cell.getTagsLengthUnsigned() > 0) {
532       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
533           cell.getTagsLengthUnsigned());
534       while (tagsItr.hasNext()) {
535         if (RESERVED_VIS_TAG_TYPES.contains(tagsItr.next().getType())) {
536           return false;
537         }
538       }
539     }
540     return true;
541   }
542 
543   private void removeReplicationVisibilityTag(List<Tag> tags) throws IOException {
544     Iterator<Tag> iterator = tags.iterator();
545     while (iterator.hasNext()) {
546       Tag tag = iterator.next();
547       if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) {
548         iterator.remove();
549         break;
550       }
551     }
552   }
553 
554   @Override
555   public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
556       RegionScanner s) throws IOException {
557     if (!initialized) {
558       throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized!");
559     }
560     // Nothing to do if authorization is not enabled
561     if (!authorizationEnabled) {
562       return s;
563     }
564     HRegion region = e.getEnvironment().getRegion();
565     Authorizations authorizations = null;
566     try {
567       authorizations = scan.getAuthorizations();
568     } catch (DeserializationException de) {
569       throw new IOException(de);
570     }
571     if (authorizations == null) {
572       // No Authorizations present for this scan/Get!
573       // In case of system tables other than "labels" just scan with out visibility check and
574       // filtering. Checking visibility labels for META and NAMESPACE table is not needed.
575       TableName table = region.getRegionInfo().getTable();
576       if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
577         return s;
578       }
579     }
580 
581     Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(region,
582         authorizations);
583     if (visibilityLabelFilter != null) {
584       Filter filter = scan.getFilter();
585       if (filter != null) {
586         scan.setFilter(new FilterList(filter, visibilityLabelFilter));
587       } else {
588         scan.setFilter(visibilityLabelFilter);
589       }
590     }
591     return s;
592   }
593 
594   @Override
595   public DeleteTracker postInstantiateDeleteTracker(
596       ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
597       throws IOException {
598     // Nothing to do if we are not filtering by visibility
599     if (!authorizationEnabled) {
600       return delTracker;
601     }
602     HRegion region = ctx.getEnvironment().getRegion();
603     TableName table = region.getRegionInfo().getTable();
604     if (table.isSystemTable()) {
605       return delTracker;
606     }
607     // We are creating a new type of delete tracker here which is able to track
608     // the timestamps and also the
609     // visibility tags per cell. The covering cells are determined not only
610     // based on the delete type and ts
611     // but also on the visibility expression matching.
612     return new VisibilityScanDeleteTracker();
613   }
614 
615   @Override
616   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
617       final Scan scan, final RegionScanner s) throws IOException {
618     User user = VisibilityUtils.getActiveUser();
619     if (user != null && user.getShortName() != null) {
620       scannerOwners.put(s, user.getShortName());
621     }
622     return s;
623   }
624 
625   @Override
626   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
627       final InternalScanner s, final List<Result> result, final int limit, final boolean hasNext)
628       throws IOException {
629     requireScannerOwner(s);
630     return hasNext;
631   }
632 
633   @Override
634   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
635       final InternalScanner s) throws IOException {
636     requireScannerOwner(s);
637   }
638 
639   @Override
640   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
641       final InternalScanner s) throws IOException {
642     // clean up any associated owner mapping
643     scannerOwners.remove(s);
644   }
645 
646   /**
647    * Verify, when servicing an RPC, that the caller is the scanner owner. If so, we assume that
648    * access control is correctly enforced based on the checks performed in preScannerOpen()
649    */
650   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
651     // This is duplicated code!
652     String requestUName = RpcServer.getRequestUserName();
653     String owner = scannerOwners.get(s);
654     if (authorizationEnabled && owner != null && !owner.equals(requestUName)) {
655       throw new AccessDeniedException("User '" + requestUName + "' is not the scanner owner!");
656     }
657   }
658 
659   @Override
660   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
661       List<Cell> results) throws IOException {
662     if (!initialized) {
663       throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized");
664     }
665     // Nothing useful to do if authorization is not enabled
666     if (!authorizationEnabled) {
667       return;
668     }
669     HRegion region = e.getEnvironment().getRegion();
670     Authorizations authorizations = null;
671     try {
672       authorizations = get.getAuthorizations();
673     } catch (DeserializationException de) {
674       throw new IOException(de);
675     }
676     if (authorizations == null) {
677       // No Authorizations present for this scan/Get!
678       // In case of system tables other than "labels" just scan with out visibility check and
679       // filtering. Checking visibility labels for META and NAMESPACE table is not needed.
680       TableName table = region.getRegionInfo().getTable();
681       if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
682         return;
683       }
684     }
685     Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(e.getEnvironment()
686         .getRegion(), authorizations);
687     if (visibilityLabelFilter != null) {
688       Filter filter = get.getFilter();
689       if (filter != null) {
690         get.setFilter(new FilterList(filter, visibilityLabelFilter));
691       } else {
692         get.setFilter(visibilityLabelFilter);
693       }
694     }
695   }
696 
697   private boolean isSystemOrSuperUser() throws IOException {
698     User activeUser = VisibilityUtils.getActiveUser();
699     if (this.superUsers.contains(activeUser.getShortName())) {
700       return true;
701     }
702     String[] groups = activeUser.getGroupNames();
703     if (groups != null && groups.length > 0) {
704       for (String group : groups) {
705         if (this.superGroups.contains(group)) {
706           return true;
707         }
708       }
709     }
710     return false;
711   }
712 
713   @Override
714   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
715       throws IOException {
716     // If authorization is not enabled, we don't care about reserved tags
717     if (!authorizationEnabled) {
718       return null;
719     }
720     for (CellScanner cellScanner = append.cellScanner(); cellScanner.advance();) {
721       if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
722         throw new FailedSanityCheckException("Append contains cell with reserved type tag");
723       }
724     }
725     return null;
726   }
727 
728   @Override
729   public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
730       throws IOException {
731     // If authorization is not enabled, we don't care about reserved tags
732     if (!authorizationEnabled) {
733       return null;
734     }
735     for (CellScanner cellScanner = increment.cellScanner(); cellScanner.advance();) {
736       if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
737         throw new FailedSanityCheckException("Increment contains cell with reserved type tag");
738       }
739     }
740     return null;
741   }
742 
743   @Override
744   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
745       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
746     List<Tag> tags = Lists.newArrayList();
747     CellVisibility cellVisibility = null;
748     try {
749       cellVisibility = mutation.getCellVisibility();
750     } catch (DeserializationException e) {
751       throw new IOException(e);
752     }
753     if (cellVisibility == null) {
754       return newCell;
755     }
756     // Prepend new visibility tags to a new list of tags for the cell
757     // Don't check user auths for labels with Mutations when the user is super user
758     boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
759     tags.addAll(this.visibilityLabelService.createVisibilityExpTags(cellVisibility.getExpression(),
760         true, authCheck));
761     // Save an object allocation where we can
762     if (newCell.getTagsLengthUnsigned() > 0) {
763       // Carry forward all other tags
764       Iterator<Tag> tagsItr = CellUtil.tagsIterator(newCell.getTagsArray(),
765           newCell.getTagsOffset(), newCell.getTagsLengthUnsigned());
766       while (tagsItr.hasNext()) {
767         Tag tag = tagsItr.next();
768         if (tag.getType() != TagType.VISIBILITY_TAG_TYPE
769             && tag.getType() != TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) {
770           tags.add(tag);
771         }
772       }
773     }
774 
775     // We need to create another KV, unfortunately, because the current new KV
776     // has no space for tags
777     KeyValue rewriteKv = new KeyValue(newCell.getRowArray(), newCell.getRowOffset(),
778         newCell.getRowLength(), newCell.getFamilyArray(), newCell.getFamilyOffset(),
779         newCell.getFamilyLength(), newCell.getQualifierArray(), newCell.getQualifierOffset(),
780         newCell.getQualifierLength(), newCell.getTimestamp(), KeyValue.Type.codeToType(newCell
781             .getTypeByte()), newCell.getValueArray(), newCell.getValueOffset(),
782         newCell.getValueLength(), tags);
783     // Preserve mvcc data
784     rewriteKv.setMvccVersion(newCell.getMvccVersion());
785     return rewriteKv;
786   }
787 
788   @Override
789   public Service getService() {
790     return VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this);
791   }
792 
793   /****************************** VisibilityEndpoint service related methods ******************************/
794   @Override
795   public synchronized void addLabels(RpcController controller, VisibilityLabelsRequest request,
796       RpcCallback<VisibilityLabelsResponse> done) {
797     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
798     List<VisibilityLabel> visLabels = request.getVisLabelList();
799     if (!initialized) {
800       setExceptionResults(visLabels.size(),
801         new VisibilityControllerNotReadyException("VisibilityController not yet initialized!"),
802         response);
803     } else {
804       List<byte[]> labels = new ArrayList<byte[]>(visLabels.size());
805       try {
806         if (authorizationEnabled) {
807           checkCallingUserAuth();
808         }
809         RegionActionResult successResult = RegionActionResult.newBuilder().build();
810         for (VisibilityLabel visLabel : visLabels) {
811           byte[] label = visLabel.getLabel().toByteArray();
812           labels.add(label);
813           response.addResult(successResult); // Just mark as success. Later it will get reset
814                                              // based on the result from
815                                              // visibilityLabelService.addLabels ()
816         }
817         if (!labels.isEmpty()) {
818           OperationStatus[] opStatus = this.visibilityLabelService.addLabels(labels);
819           logResult(true, "addLabels", "Adding labels allowed", null, labels, null);
820           int i = 0;
821           for (OperationStatus status : opStatus) {
822             while (response.getResult(i) != successResult)
823               i++;
824             if (status.getOperationStatusCode() != SUCCESS) {
825               RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
826               failureResultBuilder.setException(ResponseConverter
827                   .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
828               response.setResult(i, failureResultBuilder.build());
829             }
830             i++;
831           }
832         }
833       } catch (AccessDeniedException e) {
834         logResult(false, "addLabels", e.getMessage(), null, labels, null);
835         LOG.error("User is not having required permissions to add labels", e);
836         setExceptionResults(visLabels.size(), e, response);
837       } catch (IOException e) {
838         LOG.error(e);
839         setExceptionResults(visLabels.size(), e, response);
840       }
841     }
842     done.run(response.build());
843   }
844 
845   private void setExceptionResults(int size, IOException e,
846       VisibilityLabelsResponse.Builder response) {
847     RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
848     failureResultBuilder.setException(ResponseConverter.buildException(e));
849     RegionActionResult failureResult = failureResultBuilder.build();
850     for (int i = 0; i < size; i++) {
851       response.addResult(i, failureResult);
852     }
853   }
854 
855   @Override
856   public synchronized void setAuths(RpcController controller, SetAuthsRequest request,
857       RpcCallback<VisibilityLabelsResponse> done) {
858     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
859     List<ByteString> auths = request.getAuthList();
860     if (!initialized) {
861       setExceptionResults(auths.size(),
862         new VisibilityControllerNotReadyException("VisibilityController not yet initialized!"),
863         response);
864     } else {
865       byte[] user = request.getUser().toByteArray();
866       List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
867       try {
868         if (authorizationEnabled) {
869           checkCallingUserAuth();
870         }
871         for (ByteString authBS : auths) {
872           labelAuths.add(authBS.toByteArray());
873         }
874         OperationStatus[] opStatus = this.visibilityLabelService.setAuths(user, labelAuths);
875         logResult(true, "setAuths", "Setting authorization for labels allowed", user, labelAuths,
876           null);
877         RegionActionResult successResult = RegionActionResult.newBuilder().build();
878         for (OperationStatus status : opStatus) {
879           if (status.getOperationStatusCode() == SUCCESS) {
880             response.addResult(successResult);
881           } else {
882             RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
883             failureResultBuilder.setException(ResponseConverter
884                 .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
885             response.addResult(failureResultBuilder.build());
886           }
887         }
888       } catch (AccessDeniedException e) {
889         logResult(false, "setAuths", e.getMessage(), user, labelAuths, null);
890         LOG.error("User is not having required permissions to set authorization", e);
891         setExceptionResults(auths.size(), e, response);
892       } catch (IOException e) {
893         LOG.error(e);
894         setExceptionResults(auths.size(), e, response);
895       }
896     }
897     done.run(response.build());
898   }
899 
900   private void logResult(boolean isAllowed, String request, String reason, byte[] user,
901       List<byte[]> labelAuths, String regex) {
902     if (AUDITLOG.isTraceEnabled()) {
903       InetAddress remoteAddr = RpcServer.getRemoteAddress();
904       List<String> labelAuthsStr = new ArrayList<String>();
905       if (labelAuths != null) {
906         int labelAuthsSize = labelAuths.size();
907         labelAuthsStr = new ArrayList<String>(labelAuthsSize);
908         for (int i = 0; i < labelAuthsSize; i++) {
909           labelAuthsStr.add(Bytes.toString(labelAuths.get(i)));
910         }
911       }
912 
913       User requestingUser = null;
914       try {
915         requestingUser = VisibilityUtils.getActiveUser();
916       } catch (IOException e) {
917         LOG.warn("Failed to get active system user.");
918         LOG.debug("Details on failure to get active system user.", e);
919       }
920       AUDITLOG.trace("Access " + (isAllowed ? "allowed" : "denied") + " for user "
921           + (requestingUser != null ? requestingUser.getShortName() : "UNKNOWN") + "; reason: "
922           + reason + "; remote address: " + (remoteAddr != null ? remoteAddr : "") + "; request: "
923           + request + "; user: " + (user != null ? Bytes.toShort(user) : "null") + "; labels: "
924           + labelAuthsStr + "; regex: " + regex);
925     }
926   }
927 
928   @Override
929   public synchronized void getAuths(RpcController controller, GetAuthsRequest request,
930       RpcCallback<GetAuthsResponse> done) {
931     GetAuthsResponse.Builder response = GetAuthsResponse.newBuilder();
932     if (!initialized) {
933       controller.setFailed("VisibilityController not yet initialized");
934     } else {
935       byte[] user = request.getUser().toByteArray();
936       List<String> labels = null;
937       try {
938         // We do ACL check here as we create scanner directly on region. It will not make calls to
939         // AccessController CP methods.
940         if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
941           User requestingUser = VisibilityUtils.getActiveUser();
942           throw new AccessDeniedException("User '"
943               + (requestingUser != null ? requestingUser.getShortName() : "null")
944               + "' is not authorized to perform this action.");
945         }
946         if (AccessControlLists.isGroupPrincipal(Bytes.toString(user))) {
947           // For backward compatibility. Previous custom visibilityLabelService
948           // implementation may not have getGroupAuths
949           try {
950             this.visibilityLabelService.getClass().getDeclaredMethod("getGroupAuths",
951               new Class[] { String[].class, Boolean.TYPE });
952           } catch (SecurityException e) {
953             throw new AccessDeniedException("Failed to obtain getGroupAuths implementation");
954           } catch (NoSuchMethodException e) {
955             throw new AccessDeniedException(
956                 "Get group auth is not supported in this implementation");
957           }
958           String group = AccessControlLists.getGroupName(Bytes.toString(user));
959           labels = this.visibilityLabelService.getGroupAuths(new String[]{group}, false);
960         }
961         else {
962           labels = this.visibilityLabelService.getAuths(user, false);
963         }
964         logResult(true, "getAuths", "Get authorizations for user allowed", user, null, null);
965       } catch (AccessDeniedException e) {
966         logResult(false, "getAuths", e.getMessage(), user, null, null);
967         ResponseConverter.setControllerException(controller, e);
968       } catch (IOException e) {
969         ResponseConverter.setControllerException(controller, e);
970       }
971       response.setUser(request.getUser());
972       if (labels != null) {
973         for (String label : labels) {
974           response.addAuth(ByteStringer.wrap(Bytes.toBytes(label)));
975         }
976       }
977     }
978     done.run(response.build());
979   }
980 
981   @Override
982   public synchronized void clearAuths(RpcController controller, SetAuthsRequest request,
983       RpcCallback<VisibilityLabelsResponse> done) {
984     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
985     List<ByteString> auths = request.getAuthList();
986     if (!initialized) {
987       setExceptionResults(auths.size(), new CoprocessorException(
988           "VisibilityController not yet initialized"), response);
989     } else {
990       byte[] requestUser = request.getUser().toByteArray();
991       List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
992       try {
993         // When AC is ON, do AC based user auth check
994         if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
995           User user = VisibilityUtils.getActiveUser();
996           throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null")
997               + " is not authorized to perform this action.");
998         }
999         if (authorizationEnabled) {
1000           checkCallingUserAuth(); // When AC is not in place the calling user should have
1001                                   // SYSTEM_LABEL auth to do this action.
1002         }
1003         for (ByteString authBS : auths) {
1004           labelAuths.add(authBS.toByteArray());
1005         }
1006 
1007         OperationStatus[] opStatus =
1008             this.visibilityLabelService.clearAuths(requestUser, labelAuths);
1009         logResult(true, "clearAuths", "Removing authorization for labels allowed", requestUser,
1010           labelAuths, null);
1011         RegionActionResult successResult = RegionActionResult.newBuilder().build();
1012         for (OperationStatus status : opStatus) {
1013           if (status.getOperationStatusCode() == SUCCESS) {
1014             response.addResult(successResult);
1015           } else {
1016             RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
1017             failureResultBuilder.setException(ResponseConverter
1018                 .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
1019             response.addResult(failureResultBuilder.build());
1020           }
1021         }
1022       } catch (AccessDeniedException e) {
1023         logResult(false, "clearAuths", e.getMessage(), requestUser, labelAuths, null);
1024         LOG.error("User is not having required permissions to clear authorization", e);
1025         setExceptionResults(auths.size(), e, response);
1026       } catch (IOException e) {
1027         LOG.error(e);
1028         setExceptionResults(auths.size(), e, response);
1029       }
1030     }
1031     done.run(response.build());
1032   }
1033 
1034   @Override
1035   public synchronized void listLabels(RpcController controller, ListLabelsRequest request,
1036       RpcCallback<ListLabelsResponse> done) {
1037     ListLabelsResponse.Builder response = ListLabelsResponse.newBuilder();
1038     if (!initialized) {
1039       controller.setFailed("VisibilityController not yet initialized");
1040     } else {
1041       List<String> labels = null;
1042       String regex = request.hasRegex() ? request.getRegex() : null;
1043       try {
1044         // We do ACL check here as we create scanner directly on region. It will not make calls to
1045         // AccessController CP methods.
1046         if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
1047           User requestingUser = VisibilityUtils.getActiveUser();
1048           throw new AccessDeniedException("User '"
1049               + (requestingUser != null ? requestingUser.getShortName() : "null")
1050               + "' is not authorized to perform this action.");
1051         }
1052         labels = this.visibilityLabelService.listLabels(regex);
1053         logResult(false, "listLabels", "Listing labels allowed", null, null, regex);
1054       } catch (AccessDeniedException e) {
1055         logResult(false, "listLabels", e.getMessage(), null, null, regex);
1056         ResponseConverter.setControllerException(controller, e);
1057       } catch (IOException e) {
1058         ResponseConverter.setControllerException(controller, e);
1059       }
1060       if (labels != null && !labels.isEmpty()) {
1061         for (String label : labels) {
1062           response.addLabel(ByteStringer.wrap(Bytes.toBytes(label)));
1063         }
1064       }
1065     }
1066     done.run(response.build());
1067   }
1068 
1069   private void checkCallingUserAuth() throws IOException {
1070     if (!authorizationEnabled) { // Redundant, but just in case
1071       return;
1072     }
1073     if (!accessControllerAvailable) {
1074       User user = VisibilityUtils.getActiveUser();
1075       if (user == null) {
1076         throw new IOException("Unable to retrieve calling user");
1077       }
1078 
1079       boolean havingSystemAuth = false;
1080       try {
1081         this.visibilityLabelService.getClass().getDeclaredMethod("havingSystemAuth",
1082           new Class[] { User.class });
1083         havingSystemAuth = this.visibilityLabelService.havingSystemAuth(user);
1084       } catch (SecurityException e) {
1085         // Just consider this as AccessDeniedException
1086       } catch (NoSuchMethodException e) {
1087         // VLS not having havingSystemAuth(User) method. Go with deprecated havingSystemAuth(byte[])
1088         // method invoke
1089         havingSystemAuth = this.visibilityLabelService.havingSystemAuth(Bytes.toBytes(user
1090           .getShortName()));
1091       }
1092       if (!havingSystemAuth) {
1093         throw new AccessDeniedException("User '" + user.getShortName()
1094           + "' is not authorized to perform this action.");
1095       }
1096     }
1097   }
1098 
1099   private static class DeleteVersionVisibilityExpressionFilter extends FilterBase {
1100     private List<Tag> deleteCellVisTags;
1101     private Byte deleteCellVisTagsFormat;
1102 
1103     public DeleteVersionVisibilityExpressionFilter(List<Tag> deleteCellVisTags,
1104         Byte deleteCellVisTagsFormat) {
1105       this.deleteCellVisTags = deleteCellVisTags;
1106       this.deleteCellVisTagsFormat = deleteCellVisTagsFormat;
1107     }
1108 
1109     @Override
1110     public ReturnCode filterKeyValue(Cell cell) throws IOException {
1111       List<Tag> putVisTags = new ArrayList<Tag>();
1112       Byte putCellVisTagsFormat = VisibilityUtils.extractVisibilityTags(cell, putVisTags);
1113       boolean matchFound = VisibilityLabelServiceManager
1114           .getInstance().getVisibilityLabelService()
1115           .matchVisibility(putVisTags, putCellVisTagsFormat, deleteCellVisTags,
1116               deleteCellVisTagsFormat);
1117       return matchFound ? ReturnCode.INCLUDE : ReturnCode.SKIP;
1118     }
1119   }
1120 
  /**
   * No-op implementation of the preTruncateTable hook; this class takes no action here.
   */
  @Override
  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }
1125 
  /**
   * No-op implementation of the postTruncateTable hook; this class takes no action here.
   */
  @Override
  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }
1130 
  /**
   * No-op implementation of the preTruncateTableHandler hook; this class takes no action here.
   */
  @Override
  public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }
1135 
  /**
   * No-op implementation of the postTruncateTableHandler hook; this class takes no action here.
   */
  @Override
  public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }
1140 
1141   /**
1142    * A RegionServerObserver impl that provides the custom
1143    * VisibilityReplicationEndpoint. This class should be configured as the
1144    * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be
1145    * replicated as string.  The value for the configuration should be
1146    * 'org.apache.hadoop.hbase.security.visibility.VisibilityController$VisibilityReplication'.
1147    */
1148   public static class VisibilityReplication extends BaseRegionServerObserver {
1149     private Configuration conf;
1150     private VisibilityLabelService visibilityLabelService;
1151 
1152     @Override
1153     public void start(CoprocessorEnvironment env) throws IOException {
1154       this.conf = env.getConfiguration();
1155       visibilityLabelService = VisibilityLabelServiceManager.getInstance()
1156           .getVisibilityLabelService(this.conf);
1157     }
1158 
1159     @Override
1160     public void stop(CoprocessorEnvironment env) throws IOException {
1161     }
1162 
1163     @Override
1164     public ReplicationEndpoint postCreateReplicationEndPoint(
1165         ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
1166       return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
1167     }
1168   }
1169 }