1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.CellUtil;
29 import org.apache.hadoop.hbase.Coprocessor;
30 import org.apache.hadoop.hbase.CoprocessorEnvironment;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
34 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
35 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.regionserver.InternalScanner;
38 import org.apache.hadoop.hbase.util.Bytes;
39
40 import com.google.protobuf.RpcCallback;
41 import com.google.protobuf.RpcController;
42 import com.google.protobuf.Service;
43
44
45
46
47
48 public class ColumnAggregationEndpoint extends ColumnAggregationService
49 implements Coprocessor, CoprocessorService {
50 static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
51 private RegionCoprocessorEnvironment env = null;
52
53 @Override
54 public Service getService() {
55 return this;
56 }
57
58 @Override
59 public void start(CoprocessorEnvironment env) throws IOException {
60 if (env instanceof RegionCoprocessorEnvironment) {
61 this.env = (RegionCoprocessorEnvironment)env;
62 return;
63 }
64 throw new CoprocessorException("Must be loaded on a table region!");
65 }
66
67 @Override
68 public void stop(CoprocessorEnvironment env) throws IOException {
69
70 }
71
72 @Override
73 public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
74
75 Scan scan = new Scan();
76
77 byte [] family = request.getFamily().toByteArray();
78 byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
79 if (request.hasQualifier()) {
80 scan.addColumn(family, qualifier);
81 } else {
82 scan.addFamily(family);
83 }
84 int sumResult = 0;
85 InternalScanner scanner = null;
86 try {
87 scanner = this.env.getRegion().getScanner(scan);
88 List<Cell> curVals = new ArrayList<Cell>();
89 boolean hasMore = false;
90 do {
91 curVals.clear();
92 hasMore = scanner.next(curVals);
93 for (Cell kv : curVals) {
94 if (CellUtil.matchingQualifier(kv, qualifier)) {
95 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
96 }
97 }
98 } while (hasMore);
99 } catch (IOException e) {
100 ResponseConverter.setControllerException(controller, e);
101
102 sumResult = -1;
103 LOG.info("Setting sum result to -1 to indicate error", e);
104 } finally {
105 if (scanner != null) {
106 try {
107 scanner.close();
108 } catch (IOException e) {
109 ResponseConverter.setControllerException(controller, e);
110 sumResult = -1;
111 LOG.info("Setting sum result to -1 to indicate error", e);
112 }
113 }
114 }
115 LOG.info("Returning result " + sumResult);
116 done.run(SumResponse.newBuilder().setSum(sumResult).build());
117 }
118 }