1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Matchers.anyInt;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.PrintStream;
32
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
37 import org.apache.hadoop.hbase.testclassification.SmallTests;
38 import org.apache.hadoop.mapred.JobConf;
39 import org.apache.hadoop.mapred.OutputCollector;
40 import org.apache.hadoop.mapred.Reporter;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43 import org.mockito.Mockito;
44
45 import com.google.common.base.Joiner;
46
47 @Category(SmallTests.class)
48 public class TestRowCounter {
49
50 @Test
51 @SuppressWarnings("deprecation")
52 public void shouldPrintUsage() throws Exception {
53 String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
54 String result = new OutputReader(System.out) {
55 @Override
56 void doRead() {
57 assertEquals(-1, RowCounter.printUsage());
58 }
59 }.read();
60
61 assertTrue(result.startsWith(expectedOutput));
62 }
63
64 @Test
65 @SuppressWarnings("deprecation")
66 public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
67 throws Exception {
68 final String[] args = new String[] { "one", "two" };
69 String line = "ERROR: Wrong number of parameters: " + args.length;
70 String result = new OutputReader(System.err) {
71 @Override
72 void doRead() throws Exception {
73 assertEquals(-1, new RowCounter().run(args));
74 }
75 }.read();
76
77 assertTrue(result.startsWith(line));
78 }
79
80 @Test
81 @SuppressWarnings({ "deprecation", "unchecked" })
82 public void shouldRegInReportEveryIncomingRow() throws IOException {
83 int iterationNumber = 999;
84 RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
85 Reporter reporter = mock(Reporter.class);
86 for (int i = 0; i < iterationNumber; i++)
87 mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
88 mock(OutputCollector.class), reporter);
89
90 Mockito.verify(reporter, times(iterationNumber)).incrCounter(
91 any(Enum.class), anyInt());
92 }
93
94 @Test
95 @SuppressWarnings({ "deprecation" })
96 public void shouldCreateAndRunSubmittableJob() throws Exception {
97 RowCounter rCounter = new RowCounter();
98 rCounter.setConf(HBaseConfiguration.create());
99 String[] args = new String[] { "\temp", "tableA", "column1", "column2",
100 "column3" };
101 JobConf jobConfig = rCounter.createSubmittableJob(args);
102
103 assertNotNull(jobConfig);
104 assertEquals(0, jobConfig.getNumReduceTasks());
105 assertEquals("rowcounter", jobConfig.getJobName());
106 assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
107 assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
108 assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ')
109 .join("column1", "column2", "column3"));
110 assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
111 }
112
113 enum Outs {
114 OUT, ERR
115 }
116
117 private static abstract class OutputReader {
118 private final PrintStream ps;
119 private PrintStream oldPrintStream;
120 private Outs outs;
121
122 protected OutputReader(PrintStream ps) {
123 this.ps = ps;
124 }
125
126 protected String read() throws Exception {
127 ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
128 if (ps == System.out) {
129 oldPrintStream = System.out;
130 outs = Outs.OUT;
131 System.setOut(new PrintStream(outBytes));
132 } else if (ps == System.err) {
133 oldPrintStream = System.err;
134 outs = Outs.ERR;
135 System.setErr(new PrintStream(outBytes));
136 } else {
137 throw new IllegalStateException("OutputReader: unsupported PrintStream");
138 }
139
140 try {
141 doRead();
142 return new String(outBytes.toByteArray());
143 } finally {
144 switch (outs) {
145 case OUT: {
146 System.setOut(oldPrintStream);
147 break;
148 }
149 case ERR: {
150 System.setErr(oldPrintStream);
151 break;
152 }
153 default:
154 throw new IllegalStateException(
155 "OutputReader: unsupported PrintStream");
156 }
157 }
158 }
159
160 abstract void doRead() throws Exception;
161 }
162 }