1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.Arrays;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import org.apache.hadoop.hbase.HRegionInfo;
30 import org.apache.hadoop.hbase.KeyValue;
31 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
32 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
33 import org.apache.hadoop.hbase.util.Bytes;
34
35
36
37
38
39
40
41
42
43 public class SampleRegionWALObserver extends BaseRegionObserver
44 implements WALObserver {
45
46 private static final Log LOG = LogFactory.getLog(SampleRegionWALObserver.class);
47
48 private byte[] tableName;
49 private byte[] row;
50 private byte[] ignoredFamily;
51 private byte[] ignoredQualifier;
52 private byte[] addedFamily;
53 private byte[] addedQualifier;
54 private byte[] changedFamily;
55 private byte[] changedQualifier;
56
57 private boolean preWALWriteCalled = false;
58 private boolean postWALWriteCalled = false;
59 private boolean preWALRestoreCalled = false;
60 private boolean postWALRestoreCalled = false;
61
62
63
64
65
66 public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq,
67 byte[] chf, byte[] chq, byte[] addf, byte[] addq) {
68 this.row = row;
69 this.tableName = tableName;
70 this.ignoredFamily = igf;
71 this.ignoredQualifier = igq;
72 this.addedFamily = addf;
73 this.addedQualifier = addq;
74 this.changedFamily = chf;
75 this.changedQualifier = chq;
76 }
77
78
79 @Override
80 public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> env,
81 HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
82 postWALWriteCalled = true;
83 }
84
85 @Override
86 public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> env,
87 HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
88 boolean bypass = false;
89
90 if (!Bytes.equals(info.getTableName(), this.tableName)) {
91 return bypass;
92 }
93 preWALWriteCalled = true;
94
95
96 List<KeyValue> kvs = logEdit.getKeyValues();
97 KeyValue deletedKV = null;
98 for (KeyValue kv : kvs) {
99
100 byte[] family = kv.getFamily();
101 byte[] qulifier = kv.getQualifier();
102
103 if (Arrays.equals(family, ignoredFamily) &&
104 Arrays.equals(qulifier, ignoredQualifier)) {
105 LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
106 deletedKV = kv;
107 }
108 if (Arrays.equals(family, changedFamily) &&
109 Arrays.equals(qulifier, changedQualifier)) {
110 LOG.debug("Found the KeyValue from WALEdit which should be changed.");
111 kv.getBuffer()[kv.getValueOffset()] += 1;
112 }
113 }
114 kvs.add(new KeyValue(row, addedFamily, addedQualifier));
115 if (deletedKV != null) {
116 LOG.debug("About to delete a KeyValue from WALEdit.");
117 kvs.remove(deletedKV);
118 }
119 return bypass;
120 }
121
122
123
124
125
126 @Override
127 public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
128 HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
129 preWALRestoreCalled = true;
130 }
131
132
133
134
135
136 @Override
137 public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
138 HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
139 postWALRestoreCalled = true;
140 }
141
142 public boolean isPreWALWriteCalled() {
143 return preWALWriteCalled;
144 }
145
146 public boolean isPostWALWriteCalled() {
147 return postWALWriteCalled;
148 }
149
150 public boolean isPreWALRestoreCalled() {
151 LOG.debug(SampleRegionWALObserver.class.getName() +
152 ".isPreWALRestoreCalled is called.");
153 return preWALRestoreCalled;
154 }
155
156 public boolean isPostWALRestoreCalled() {
157 LOG.debug(SampleRegionWALObserver.class.getName() +
158 ".isPostWALRestoreCalled is called.");
159 return postWALRestoreCalled;
160 }
161 }