1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.security.visibility;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.UUID;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.Tag;
30 import org.apache.hadoop.hbase.TagType;
31 import org.apache.hadoop.hbase.KeyValue.Type;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
34 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
35 import org.apache.hadoop.hbase.replication.WALEntryFilter;
36 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
37
38 import com.google.common.util.concurrent.ListenableFuture;
39
40 @InterfaceAudience.Private
41 public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
42
43 private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class);
44 private ReplicationEndpoint delegator;
45 private VisibilityLabelService visibilityLabelsService;
46
47 public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
48 VisibilityLabelService visibilityLabelsService) {
49 this.delegator = endpoint;
50 this.visibilityLabelsService = visibilityLabelsService;
51 }
52
53 @Override
54 public void init(Context context) throws IOException {
55 delegator.init(context);
56 }
57
58 @Override
59 public boolean replicate(ReplicateContext replicateContext) {
60 if (!delegator.canReplicateToSameCluster()) {
61
62
63 List<Entry> entries = replicateContext.getEntries();
64 List<Tag> visTags = new ArrayList<Tag>();
65 List<Tag> nonVisTags = new ArrayList<Tag>();
66 List<Entry> newEntries = new ArrayList<Entry>(entries.size());
67 for (Entry entry : entries) {
68 WALEdit newEdit = new WALEdit();
69 ArrayList<Cell> cells = entry.getEdit().getCells();
70 for (Cell cell : cells) {
71 if (cell.getTagsLengthUnsigned() > 0) {
72 visTags.clear();
73 nonVisTags.clear();
74 Byte serializationFormat = VisibilityUtils.extractAndPartitionTags(cell, visTags,
75 nonVisTags);
76 if (!visTags.isEmpty()) {
77 try {
78 byte[] modifiedVisExpression = visibilityLabelsService
79 .encodeVisibilityForReplication(visTags, serializationFormat);
80 if (modifiedVisExpression != null) {
81 nonVisTags.add(new Tag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression));
82 }
83 } catch (Exception ioe) {
84 LOG.error(
85 "Exception while reading the visibility labels from the cell. The replication "
86 + "would happen as per the existing format and not as string type for the cell "
87 + cell + ".", ioe);
88
89 newEdit.add(cell);
90 continue;
91 }
92
93 Cell newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
94 cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
95 cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
96 cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell
97 .getTypeByte()), cell.getValueArray(), cell.getValueOffset(),
98 cell.getValueLength(), nonVisTags);
99 newEdit.add(newCell);
100 } else {
101 newEdit.add(cell);
102 }
103 } else {
104 newEdit.add(cell);
105 }
106 }
107 newEntries.add(new Entry(entry.getKey(), newEdit));
108 }
109 replicateContext.setEntries(newEntries);
110 return delegator.replicate(replicateContext);
111 } else {
112 return delegator.replicate(replicateContext);
113 }
114 }
115
116 @Override
117 public synchronized UUID getPeerUUID() {
118 return delegator.getPeerUUID();
119 }
120
121 @Override
122 public boolean canReplicateToSameCluster() {
123 return delegator.canReplicateToSameCluster();
124 }
125
126 @Override
127 public WALEntryFilter getWALEntryfilter() {
128 return delegator.getWALEntryfilter();
129 }
130
131 @Override
132 public boolean isRunning() {
133 return delegator.isRunning();
134 }
135
136 @Override
137 public ListenableFuture<State> start() {
138 return delegator.start();
139 }
140
141 @Override
142 public State startAndWait() {
143 return delegator.startAndWait();
144 }
145
146 @Override
147 public State state() {
148 return delegator.state();
149 }
150
151 @Override
152 public ListenableFuture<State> stop() {
153 return delegator.stop();
154 }
155
156 @Override
157 public State stopAndWait() {
158 return delegator.stopAndWait();
159 }
160
161 }