1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.apache.hadoop.hbase.util.ByteStringer;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.Stoppable;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.ResultScanner;
43 import org.apache.hadoop.hbase.client.Scan;
44 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
45 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
46 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.junit.AfterClass;
49 import org.junit.Before;
50 import org.junit.BeforeClass;
51 import org.junit.Test;
52 import org.junit.experimental.categories.Category;
53
54 @Category(MediumTests.class)
55 public class TestReplicationSink {
56 private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
57 private static final int BATCH_SIZE = 10;
58
59 private final static HBaseTestingUtility TEST_UTIL =
60 new HBaseTestingUtility();
61
62 private static ReplicationSink SINK;
63
64 private static final byte[] TABLE_NAME1 =
65 Bytes.toBytes("table1");
66 private static final byte[] TABLE_NAME2 =
67 Bytes.toBytes("table2");
68
69 private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
70 private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
71
72 private static HTable table1;
73 private static Stoppable STOPPABLE = new Stoppable() {
74 final AtomicBoolean stop = new AtomicBoolean(false);
75
76 @Override
77 public boolean isStopped() {
78 return this.stop.get();
79 }
80
81 @Override
82 public void stop(String why) {
83 LOG.info("STOPPING BECAUSE: " + why);
84 this.stop.set(true);
85 }
86
87 };
88
89 private static HTable table2;
90
91
92
93
94 @BeforeClass
95 public static void setUpBeforeClass() throws Exception {
96 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
97 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
98 HConstants.REPLICATION_ENABLE_DEFAULT);
99 TEST_UTIL.startMiniCluster(3);
100 SINK =
101 new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
102 table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
103 table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
104 }
105
106
107
108
109 @AfterClass
110 public static void tearDownAfterClass() throws Exception {
111 STOPPABLE.stop("Shutting down");
112 TEST_UTIL.shutdownMiniCluster();
113 }
114
115
116
117
118 @Before
119 public void setUp() throws Exception {
120 table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
121 table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
122 }
123
124
125
126
127
128 @Test
129 public void testBatchSink() throws Exception {
130 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
131 List<Cell> cells = new ArrayList<Cell>();
132 for(int i = 0; i < BATCH_SIZE; i++) {
133 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
134 }
135 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
136 Scan scan = new Scan();
137 ResultScanner scanRes = table1.getScanner(scan);
138 assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
139 }
140
141
142
143
144
145 @Test
146 public void testMixedPutDelete() throws Exception {
147 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
148 List<Cell> cells = new ArrayList<Cell>();
149 for(int i = 0; i < BATCH_SIZE/2; i++) {
150 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
151 }
152 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
153
154 entries = new ArrayList<WALEntry>(BATCH_SIZE);
155 cells = new ArrayList<Cell>();
156 for(int i = 0; i < BATCH_SIZE; i++) {
157 entries.add(createEntry(TABLE_NAME1, i,
158 i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
159 }
160
161 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
162 Scan scan = new Scan();
163 ResultScanner scanRes = table1.getScanner(scan);
164 assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
165 }
166
167
168
169
170
171 @Test
172 public void testMixedPutTables() throws Exception {
173 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
174 List<Cell> cells = new ArrayList<Cell>();
175 for(int i = 0; i < BATCH_SIZE; i++) {
176 entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
177 i, KeyValue.Type.Put, cells));
178 }
179
180 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
181 Scan scan = new Scan();
182 ResultScanner scanRes = table2.getScanner(scan);
183 for(Result res : scanRes) {
184 assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
185 }
186 }
187
188
189
190
191
192 @Test
193 public void testMixedDeletes() throws Exception {
194 List<WALEntry> entries = new ArrayList<WALEntry>(3);
195 List<Cell> cells = new ArrayList<Cell>();
196 for(int i = 0; i < 3; i++) {
197 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
198 }
199 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
200 entries = new ArrayList<WALEntry>(3);
201 cells = new ArrayList<Cell>();
202 entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
203 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
204 entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
205
206 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
207
208 Scan scan = new Scan();
209 ResultScanner scanRes = table1.getScanner(scan);
210 assertEquals(0, scanRes.next(3).length);
211 }
212
213
214
215
216
217
218 @Test
219 public void testApplyDeleteBeforePut() throws Exception {
220 List<WALEntry> entries = new ArrayList<WALEntry>(5);
221 List<Cell> cells = new ArrayList<Cell>();
222 for(int i = 0; i < 2; i++) {
223 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
224 }
225 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
226 for(int i = 3; i < 5; i++) {
227 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
228 }
229 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
230 Get get = new Get(Bytes.toBytes(1));
231 Result res = table1.get(get);
232 assertEquals(0, res.size());
233 }
234
235 private WALEntry createEntry(byte [] table, int row, KeyValue.Type type, List<Cell> cells) {
236 byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
237 byte[] rowBytes = Bytes.toBytes(row);
238
239
240 try {
241 Thread.sleep(1);
242 } catch (InterruptedException e) {
243 LOG.info("Was interrupted while sleep, meh", e);
244 }
245 final long now = System.currentTimeMillis();
246 KeyValue kv = null;
247 if(type.getCode() == KeyValue.Type.Put.getCode()) {
248 kv = new KeyValue(rowBytes, fam, fam, now,
249 KeyValue.Type.Put, Bytes.toBytes(row));
250 } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
251 kv = new KeyValue(rowBytes, fam, fam,
252 now, KeyValue.Type.DeleteColumn);
253 } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
254 kv = new KeyValue(rowBytes, fam, null,
255 now, KeyValue.Type.DeleteFamily);
256 }
257 WALEntry.Builder builder = WALEntry.newBuilder();
258 builder.setAssociatedCellCount(1);
259 WALKey.Builder keyBuilder = WALKey.newBuilder();
260 UUID.Builder uuidBuilder = UUID.newBuilder();
261 uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
262 uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
263 keyBuilder.setClusterId(uuidBuilder.build());
264 keyBuilder.setTableName(ByteStringer.wrap(table));
265 keyBuilder.setWriteTime(now);
266 keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
267 keyBuilder.setLogSequenceNumber(-1);
268 builder.setKey(keyBuilder.build());
269 cells.add(kv);
270
271 return builder.build();
272 }
273
274 }