1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.PrintStream;
28 import java.util.ArrayList;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.testclassification.MediumTests;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
41 import org.apache.hadoop.mapreduce.Counter;
42 import org.apache.hadoop.mapreduce.Job;
43 import org.apache.hadoop.util.GenericOptionsParser;
44 import org.junit.AfterClass;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49
50
51
52 @Category(MediumTests.class)
53 public class TestRowCounter {
54 final Log LOG = LogFactory.getLog(getClass());
55 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
56 private final static String TABLE_NAME = "testRowCounter";
57 private final static String COL_FAM = "col_fam";
58 private final static String COL1 = "c1";
59 private final static String COL2 = "c2";
60 private final static String COMPOSITE_COLUMN = "C:A:A";
61 private final static int TOTAL_ROWS = 10;
62 private final static int ROWS_WITH_ONE_COL = 2;
63
64
65
66
67 @BeforeClass
68 public static void setUpBeforeClass()
69 throws Exception {
70 TEST_UTIL.startMiniCluster();
71 TEST_UTIL.startMiniMapReduceCluster();
72 HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME),
73 Bytes.toBytes(COL_FAM));
74 writeRows(table);
75 table.close();
76 }
77
78
79
80
81 @AfterClass
82 public static void tearDownAfterClass()
83 throws Exception {
84 TEST_UTIL.shutdownMiniCluster();
85 TEST_UTIL.shutdownMiniMapReduceCluster();
86 }
87
88
89
90
91
92
93 @Test
94 public void testRowCounterNoColumn()
95 throws Exception {
96 String[] args = new String[] {TABLE_NAME};
97 runRowCount(args, 10);
98 }
99
100
101
102
103
104
105
106 @Test
107 public void testRowCounterExclusiveColumn()
108 throws Exception {
109 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1};
110 runRowCount(args, 8);
111 }
112
113
114
115
116
117
118
119 @Test
120 public void testRowCounterColumnWithColonInQualifier()
121 throws Exception {
122 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN};
123 runRowCount(args, 8);
124 }
125
126
127
128
129
130
131
132 @Test
133 public void testRowCounterHiddenColumn()
134 throws Exception {
135 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL2};
136 runRowCount(args, 10);
137 }
138
139
140
141
142
143
144 @Test
145 public void testRowCounterTimeRange()
146 throws Exception {
147 final byte[] family = Bytes.toBytes(COL_FAM);
148 final byte[] col1 = Bytes.toBytes(COL1);
149 Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
150 Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
151 Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
152
153 long ts;
154
155
156 HTable table = TEST_UTIL.truncateTable(TableName.valueOf(TABLE_NAME));
157 ts = System.currentTimeMillis();
158 put1.add(family, col1, ts, Bytes.toBytes("val1"));
159 table.put(put1);
160 Thread.sleep(100);
161
162 ts = System.currentTimeMillis();
163 put2.add(family, col1, ts, Bytes.toBytes("val2"));
164 put3.add(family, col1, ts, Bytes.toBytes("val3"));
165 table.put(put2);
166 table.put(put3);
167 table.close();
168
169 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
170 "--endtime=" + ts};
171 runRowCount(args, 1);
172
173 args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
174 "--endtime=" + (ts - 10)};
175 runRowCount(args, 1);
176
177 args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts,
178 "--endtime=" + (ts + 1000)};
179 runRowCount(args, 2);
180
181 args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
182 "--endtime=" + (ts + 30 * 1000),};
183 runRowCount(args, 3);
184 }
185
186
187
188
189
190
191
192
193 private void runRowCount(String[] args, int expectedCount)
194 throws Exception {
195 GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args);
196 Configuration conf = opts.getConfiguration();
197 args = opts.getRemainingArgs();
198 Job job = RowCounter.createSubmittableJob(conf, args);
199 job.waitForCompletion(true);
200 assertTrue(job.isSuccessful());
201 Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
202 assertEquals(expectedCount, counter.getValue());
203 }
204
205
206
207
208
209
210
211
212 private static void writeRows(HTable table) throws IOException {
213 final byte[] family = Bytes.toBytes(COL_FAM);
214 final byte[] value = Bytes.toBytes("abcd");
215 final byte[] col1 = Bytes.toBytes(COL1);
216 final byte[] col2 = Bytes.toBytes(COL2);
217 final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
218 ArrayList<Put> rowsUpdate = new ArrayList<Put>();
219
220 int i = 0;
221 for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
222 byte[] row = Bytes.toBytes("row" + i);
223 Put put = new Put(row);
224 put.add(family, col1, value);
225 put.add(family, col2, value);
226 put.add(family, col3, value);
227 rowsUpdate.add(put);
228 }
229
230
231 for (; i < TOTAL_ROWS; i++) {
232 byte[] row = Bytes.toBytes("row" + i);
233 Put put = new Put(row);
234 put.add(family, col2, value);
235 rowsUpdate.add(put);
236 }
237 table.put(rowsUpdate);
238 }
239
240
241
242
243 @Test
244 public void testImportMain()
245 throws Exception {
246 PrintStream oldPrintStream = System.err;
247 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
248 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
249 System.setSecurityManager(newSecurityManager);
250 ByteArrayOutputStream data = new ByteArrayOutputStream();
251 String[] args = {};
252 System.setErr(new PrintStream(data));
253 try {
254 System.setErr(new PrintStream(data));
255
256 try {
257 RowCounter.main(args);
258 fail("should be SecurityException");
259 } catch (SecurityException e) {
260 assertEquals(-1, newSecurityManager.getExitCode());
261 assertTrue(data.toString().contains("Wrong number of parameters:"));
262 assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
263 "[--starttime=[start] --endtime=[end] " +
264 "[--range=[startKey],[endKey]] " +
265 "[<column1> <column2>...]"));
266 assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
267 assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
268 }
269 data.reset();
270 try {
271 args = new String[2];
272 args[0] = "table";
273 args[1] = "--range=1";
274 RowCounter.main(args);
275 fail("should be SecurityException");
276 } catch (SecurityException e) {
277 assertEquals(-1, newSecurityManager.getExitCode());
278 assertTrue(data.toString().contains("Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
279 " \"--range=,b\" or \"--range=a,\""));
280 assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
281 "[--starttime=[start] --endtime=[end] " +
282 "[--range=[startKey],[endKey]] " +
283 "[<column1> <column2>...]"));
284 }
285
286 } finally {
287 System.setErr(oldPrintStream);
288 System.setSecurityManager(SECURITY_MANAGER);
289 }
290
291 }
292 }