1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.PrintStream;
26 import java.util.ArrayList;
27
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.testclassification.LargeTests;
36 import org.apache.hadoop.hbase.MiniHBaseCluster;
37 import org.apache.hadoop.hbase.client.Delete;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43 import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
44 import org.apache.hadoop.hbase.regionserver.wal.HLog;
45 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
46 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
49 import org.apache.hadoop.mapreduce.Mapper;
50 import org.apache.hadoop.mapreduce.Mapper.Context;
51 import org.junit.AfterClass;
52 import org.junit.BeforeClass;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55 import org.mockito.invocation.InvocationOnMock;
56 import org.mockito.stubbing.Answer;
57
58 import static org.mockito.Matchers.any;
59 import static org.mockito.Mockito.*;
60
61
62
63
64 @Category(LargeTests.class)
65 public class TestWALPlayer {
66 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67 private static MiniHBaseCluster cluster;
68
69 @BeforeClass
70 public static void beforeClass() throws Exception {
71 cluster = TEST_UTIL.startMiniCluster();
72 TEST_UTIL.startMiniMapReduceCluster();
73 }
74
75 @AfterClass
76 public static void afterClass() throws Exception {
77 TEST_UTIL.shutdownMiniMapReduceCluster();
78 TEST_UTIL.shutdownMiniCluster();
79 }
80
81
82
83
84
85 @Test
86 public void testWALPlayer() throws Exception {
87 final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1");
88 final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2");
89 final byte[] FAMILY = Bytes.toBytes("family");
90 final byte[] COLUMN1 = Bytes.toBytes("c1");
91 final byte[] COLUMN2 = Bytes.toBytes("c2");
92 final byte[] ROW = Bytes.toBytes("row");
93 HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
94 HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
95
96
97 Put p = new Put(ROW);
98 p.add(FAMILY, COLUMN1, COLUMN1);
99 p.add(FAMILY, COLUMN2, COLUMN2);
100 t1.put(p);
101
102 Delete d = new Delete(ROW);
103 d.deleteColumns(FAMILY, COLUMN1);
104 t1.delete(d);
105
106
107 HLog log = cluster.getRegionServer(0).getWAL();
108 log.rollWriter();
109 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
110 .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
111
112 Configuration configuration= TEST_UTIL.getConfiguration();
113 WALPlayer player = new WALPlayer(configuration);
114 String optionName="_test_.name";
115 configuration.set(optionName, "1000");
116 player.setupTime(configuration, optionName);
117 assertEquals(1000,configuration.getLong(optionName,0));
118 assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
119 Bytes.toString(TABLENAME2) }));
120
121
122
123 Get g = new Get(ROW);
124 Result r = t2.get(g);
125 assertEquals(1, r.size());
126 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
127 }
128
129
130
131
132 @Test
133 public void testHLogKeyValueMapper() throws Exception {
134 Configuration configuration = new Configuration();
135 configuration.set(WALPlayer.TABLES_KEY, "table");
136 HLogKeyValueMapper mapper = new HLogKeyValueMapper();
137 HLogKey key = mock(HLogKey.class);
138 when(key.getTablename()).thenReturn(TableName.valueOf("table"));
139 @SuppressWarnings("unchecked")
140 Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
141 mock(Context.class);
142 when(context.getConfiguration()).thenReturn(configuration);
143
144 WALEdit value = mock(WALEdit.class);
145 ArrayList<KeyValue> values = new ArrayList<KeyValue>();
146 KeyValue kv1 = mock(KeyValue.class);
147 when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
148 when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
149 values.add(kv1);
150 when(value.getKeyValues()).thenReturn(values);
151 mapper.setup(context);
152
153 doAnswer(new Answer<Void>() {
154
155 @Override
156 public Void answer(InvocationOnMock invocation) throws Throwable {
157 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
158 KeyValue key = (KeyValue) invocation.getArguments()[1];
159 assertEquals("row", Bytes.toString(writer.get()));
160 assertEquals("row", Bytes.toString(key.getRow()));
161 return null;
162 }
163 }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
164
165 mapper.map(key, value, context);
166
167 }
168
169
170
171
172 @Test
173 public void testMainMethod() throws Exception {
174
175 PrintStream oldPrintStream = System.err;
176 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
177 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
178 System.setSecurityManager(newSecurityManager);
179 ByteArrayOutputStream data = new ByteArrayOutputStream();
180 String[] args = {};
181 System.setErr(new PrintStream(data));
182 try {
183 System.setErr(new PrintStream(data));
184 try {
185 WALPlayer.main(args);
186 fail("should be SecurityException");
187 } catch (SecurityException e) {
188 assertEquals(-1, newSecurityManager.getExitCode());
189 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
190 assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
191 " <tables> [<tableMappings>]"));
192 assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
193 }
194
195 } finally {
196 System.setErr(oldPrintStream);
197 System.setSecurityManager(SECURITY_MANAGER);
198 }
199
200 }
201
202 }