1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.mapred.JobConf;
33 import org.apache.hadoop.mapred.MapReduceBase;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36
37
38
39
40
41 @Deprecated
42 @InterfaceAudience.Public
43 @InterfaceStability.Stable
44 public class GroupingTableMap
45 extends MapReduceBase
46 implements TableMap<ImmutableBytesWritable,Result> {
47
48
49
50
51
52 public static final String GROUP_COLUMNS =
53 "hbase.mapred.groupingtablemap.columns";
54
55 protected byte [][] columns;
56
57
58
59
60
61
62
63
64
65
66
67
68 @SuppressWarnings("unchecked")
69 public static void initJob(String table, String columns, String groupColumns,
70 Class<? extends TableMap> mapper, JobConf job) {
71
72 TableMapReduceUtil.initTableMapJob(table, columns, mapper,
73 ImmutableBytesWritable.class, Result.class, job);
74 job.set(GROUP_COLUMNS, groupColumns);
75 }
76
77 @Override
78 public void configure(JobConf job) {
79 super.configure(job);
80 String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
81 columns = new byte[cols.length][];
82 for(int i = 0; i < cols.length; i++) {
83 columns[i] = Bytes.toBytes(cols[i]);
84 }
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98 public void map(ImmutableBytesWritable key, Result value,
99 OutputCollector<ImmutableBytesWritable,Result> output,
100 Reporter reporter) throws IOException {
101
102 byte[][] keyVals = extractKeyValues(value);
103 if(keyVals != null) {
104 ImmutableBytesWritable tKey = createGroupKey(keyVals);
105 output.collect(tKey, value);
106 }
107 }
108
109
110
111
112
113
114
115
116
117
118 protected byte[][] extractKeyValues(Result r) {
119 byte[][] keyVals = null;
120 ArrayList<byte[]> foundList = new ArrayList<byte[]>();
121 int numCols = columns.length;
122 if (numCols > 0) {
123 for (Cell value: r.listCells()) {
124 byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value),
125 CellUtil.cloneQualifier(value));
126 for (int i = 0; i < numCols; i++) {
127 if (Bytes.equals(column, columns[i])) {
128 foundList.add(CellUtil.cloneValue(value));
129 break;
130 }
131 }
132 }
133 if(foundList.size() == numCols) {
134 keyVals = foundList.toArray(new byte[numCols][]);
135 }
136 }
137 return keyVals;
138 }
139
140
141
142
143
144
145
146
147 protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
148 if(vals == null) {
149 return null;
150 }
151 StringBuilder sb = new StringBuilder();
152 for(int i = 0; i < vals.length; i++) {
153 if(i > 0) {
154 sb.append(" ");
155 }
156 sb.append(Bytes.toString(vals[i]));
157 }
158 return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
159 }
160 }