/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.google.common.base.Preconditions;

/**
 * A map/reduce job that counts the cells in a table. The report produced by
 * the job lists:
 * <pre>
 * 1. Total number of rows in the table
 * 2. Total number of column families across all rows
 * 3. Total number of qualifiers across all rows
 * 4. Total occurrence of each column family
 * 5. Total occurrence of each qualifier
 * 6. Total number of versions of each qualifier
 * </pre>
 * The job takes an optional report separator (default ':'), an optional row
 * filter given either as a '^'-prefixed regular expression or as a row key
 * prefix, and optional --starttime/--endtime arguments that restrict the
 * count to a time range.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CellCounter {
  private static final Log LOG =
    LogFactory.getLog(CellCounter.class.getName());

  /**
   * Name of this 'program'.
   */
  static final String NAME = "CellCounter";

  /**
   * Mapper that runs the count.
   */
  static class CellCounterMapper
      extends TableMapper<Text, IntWritable> {

    /**
     * Counter enumeration to count the actual rows.
     */
    public static enum Counters {
      ROWS
    }

    /**
     * Maps the data.
     *
     * @param row     The current table row key.
     * @param values  The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values,
                    Context context)
        throws IOException {
      Preconditions.checkState(values != null,
          "values passed to the map is null");
      String currentFamilyName = null;
      String currentQualifierName = null;
      String currentRowKey = null;
      Configuration config = context.getConfiguration();
      String separator = config.get("ReportSeparator", ":");
      try {
        context.getCounter(Counters.ROWS).increment(1);
        context.write(new Text("Total ROWS"), new IntWritable(1));

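        // Walk this row's cells in sorted order. Changes in column family or in
        // family:qualifier relative to the previous cell mark new families and
        // qualifiers, and every cell counts as one more version of its
        // row/qualifier. Each metric is emitted as a Text key with value 1 so
        // that IntSumReducer can total it; matching job counters ("CF", "CFQL",
        // "QL_VERSIONS") are incremented as well.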
        for (Cell value : values.listCells()) {
          currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
          String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
          if (!thisRowFamilyName.equals(currentFamilyName)) {
            currentFamilyName = thisRowFamilyName;
            context.getCounter("CF", thisRowFamilyName).increment(1);
            if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
              context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
              context.write(new Text(thisRowFamilyName), new IntWritable(1));
            }
          }
          String thisRowQualifierName = thisRowFamilyName + separator
              + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
          if (!thisRowQualifierName.equals(currentQualifierName)) {
            currentQualifierName = thisRowQualifierName;
            context.getCounter("CFQL", thisRowQualifierName).increment(1);
            context.write(new Text("Total Qualifiers across all Rows"),
                new IntWritable(1));
            context.write(new Text(thisRowQualifierName), new IntWritable(1));
            context.getCounter("QL_VERSIONS", currentRowKey + separator +
                thisRowQualifierName).increment(1);
            context.write(new Text(currentRowKey + separator
                + thisRowQualifierName + "_Versions"), new IntWritable(1));
          } else {
            currentQualifierName = thisRowQualifierName;
            context.getCounter("QL_VERSIONS", currentRowKey + separator +
                thisRowQualifierName).increment(1);
            context.write(new Text(currentRowKey + separator
                + thisRowQualifierName + "_Versions"), new IntWritable(1));
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

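  /**
   * Sums the IntWritable values emitted by {@link CellCounterMapper} for each
   * report key, producing one total per key in the final output.
   */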
  static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
      Key, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Key key, Iterable<IntWritable> values,
        Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    String tableName = args[0];
    Path outputDir = new Path(args[1]);
    String reportSeparatorString = (args.length > 2) ? args[2] : ":";
    conf.set("ReportSeparator", reportSeparatorString);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(CellCounter.class);
    Scan scan = getConfiguredScanForJob(conf, args);
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
        CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(1);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setReducerClass(IntSumReducer.class);
    return job;
  }

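  /**
   * Builds the Scan used by the job: all versions, block caching disabled,
   * restricted to a single column family, a row filter, and/or a time range
   * when those are supplied via the configuration and command line.
   */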
  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
    Scan s = new Scan();
    // Count every version of every cell and bypass the block cache.
    s.setMaxVersions(Integer.MAX_VALUE);
    s.setCacheBlocks(false);

    // Restrict the scan to a single column family if one was configured.
    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
    }

    // Apply a row filter (regex or prefix) if one was supplied.
    Filter rowFilter = getRowFilter(args);
    if (rowFilter != null) {
      LOG.info("Setting Row Filter for counter.");
      s.setFilter(rowFilter);
    }

    // Limit the scan to a time range if one was supplied.
    long[] timeRange = getTimeRange(args);
    if (timeRange != null) {
      LOG.info("Setting TimeRange for counter.");
      s.setTimeRange(timeRange[0], timeRange[1]);
    }
    return s;
  }

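  /**
   * Builds the row filter from the fourth command line argument, if present:
   * an argument starting with '^' is treated as a regular expression on the
   * row key, anything else as a row key prefix.
   */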
  private static Filter getRowFilter(String[] args) {
    Filter rowFilter = null;
    String filterCriteria = (args.length > 3) ? args[3] : null;
    if (filterCriteria == null) return null;
    if (filterCriteria.startsWith("^")) {
      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
      rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
    } else {
      rowFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
    }
    return rowFilter;
  }

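  /**
   * Parses the optional --starttime and --endtime arguments. Returns null when
   * neither is given; otherwise returns {start, end}, with a missing end time
   * defaulting to HConstants.LATEST_TIMESTAMP.
   */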
  private static long[] getTimeRange(String[] args) throws IOException {
    final String startTimeArgKey = "--starttime=";
    final String endTimeArgKey = "--endtime=";
    long startTime = 0L;
    long endTime = 0L;

    for (int i = 1; i < args.length; i++) {
      System.out.println("i:" + i + " arg[i]:" + args[i]);
      if (args[i].startsWith(startTimeArgKey)) {
        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
      }
      if (args[i].startsWith(endTimeArgKey)) {
        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
      }
    }

    if (startTime == 0 && endTime == 0)
      return null;

    endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
    return new long[] {startTime, endTime};
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("ERROR: Wrong number of parameters: " + otherArgs.length);
      System.err.println("Usage: CellCounter ");
      System.err.println("       <tablename> <outputDir> <reportSeparator> " +
          "[^[regex pattern] or [Prefix] for row filter] --starttime=[starttime] --endtime=[endtime]");
      System.err.println("  Note: -D properties will be applied to the conf used.");
      System.err.println("  Additionally, the following SCAN properties can be specified");
      System.err.println("  to get fine grained control on what is counted:");
      System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
      System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
          "string : used to separate the rowId/column family name and qualifier name.");
      System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
          "operation to a limited subset of rows from the table based on regex or prefix pattern.");
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}