1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.KeyValueUtil;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.classification.InterfaceStability;
32 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
33 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.RequestConverter;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
39 import org.apache.hadoop.hbase.util.Bytes;
40
41 import com.google.common.annotations.VisibleForTesting;
42 import com.google.protobuf.ServiceException;
43
44
45
46
47
48
49
50
51 @InterfaceAudience.Public
52 @InterfaceStability.Evolving
53 public class ClientSmallScanner extends ClientScanner {
54 private final Log LOG = LogFactory.getLog(this.getClass());
55 private RegionServerCallable<Result[]> smallScanCallable = null;
56 private SmallScannerCallableFactory callableFactory;
57
58
59
60
61
62
63
64
65
66
67
68 public ClientSmallScanner(final Configuration conf, final Scan scan,
69 final TableName tableName) throws IOException {
70 this(conf, scan, tableName, HConnectionManager.getConnection(conf));
71 }
72
73
74
75
76
77
78
79
80
81
82
83 public ClientSmallScanner(final Configuration conf, final Scan scan,
84 final TableName tableName, HConnection connection) throws IOException {
85 this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf,
86 connection.getStatisticsTracker()),
87 RpcControllerFactory.instantiate(conf));
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
104 HConnection connection, RpcRetryingCallerFactory rpcFactory,
105 RpcControllerFactory controllerFactory) throws IOException {
106 this(conf, scan, tableName, connection, rpcFactory, controllerFactory, new SmallScannerCallableFactory());
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 @VisibleForTesting
131 ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
132 HConnection connection, RpcRetryingCallerFactory rpcFactory,
133 RpcControllerFactory controllerFactory, SmallScannerCallableFactory callableFactory)
134 throws IOException {
135 super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
136 this.callableFactory = callableFactory;
137 }
138
139 @Override
140 protected void initializeScannerInConstruction() throws IOException {
141
142
143 }
144
145
146
147
148
149
150
151
152
153
154 private boolean nextScanner(int nbRows, final boolean done,
155 boolean currentRegionDone) throws IOException {
156
157 byte[] localStartKey;
158 int cacheNum = nbRows;
159 boolean regionChanged = true;
160
161 if (this.currentRegion != null && currentRegionDone) {
162 byte[] endKey = this.currentRegion.getEndKey();
163 if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
164 || checkScanStopRow(endKey) || done) {
165 close();
166 if (LOG.isDebugEnabled()) {
167 LOG.debug("Finished with small scan at " + this.currentRegion);
168 }
169 return false;
170 }
171 localStartKey = endKey;
172 if (LOG.isDebugEnabled()) {
173 LOG.debug("Finished with region " + this.currentRegion);
174 }
175 } else if (this.lastResult != null) {
176 regionChanged = false;
177 localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
178 } else {
179 localStartKey = this.scan.getStartRow();
180 }
181
182 if (LOG.isTraceEnabled()) {
183 LOG.trace("Advancing internal small scanner to startKey at '"
184 + Bytes.toStringBinary(localStartKey) + "'");
185 }
186 smallScanCallable = callableFactory.getCallable(
187 scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory);
188 if (this.scanMetrics != null && regionChanged) {
189 this.scanMetrics.countOfRegions.incrementAndGet();
190 }
191 return true;
192 }
193
194 @Override
195 public Result next() throws IOException {
196
197
198 if (cache.size() == 0 && this.closed) {
199 return null;
200 }
201 if (cache.size() == 0) {
202 loadCache();
203 }
204
205 if (cache.size() > 0) {
206 return cache.poll();
207 }
208
209
210 writeScanMetrics();
211 return null;
212 }
213
214 @Override
215 protected void loadCache() throws IOException {
216 Result[] values = null;
217 long remainingResultSize = maxScannerResultSize;
218 int countdown = this.caching;
219 boolean currentRegionDone = false;
220
221 while (remainingResultSize > 0 && countdown > 0
222 && nextScanner(countdown, values == null, currentRegionDone)) {
223
224
225
226 values = this.caller.callWithRetries(smallScanCallable);
227 this.currentRegion = smallScanCallable.getHRegionInfo();
228 long currentTime = System.currentTimeMillis();
229 if (this.scanMetrics != null) {
230 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
231 - lastNext);
232 }
233 lastNext = currentTime;
234 if (values != null && values.length > 0) {
235 for (int i = 0; i < values.length; i++) {
236 Result rs = values[i];
237 cache.add(rs);
238 for (Cell kv : rs.rawCells()) {
239 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
240 }
241 countdown--;
242 this.lastResult = rs;
243 }
244 }
245 if (smallScanCallable.hasMoreResultsContext()) {
246
247 currentRegionDone = !smallScanCallable.getServerHasMoreResults();
248 } else {
249
250 currentRegionDone = countdown > 0;
251 }
252 }
253 }
254
255 public void close() {
256 if (!scanMetricsPublished) writeScanMetrics();
257 closed = true;
258 }
259
260 @VisibleForTesting
261 protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
262 this.callableFactory = callableFactory;
263 }
264
265 @VisibleForTesting
266 protected void setRpcRetryingCaller(RpcRetryingCaller<Result []> caller) {
267 this.caller = caller;
268 }
269
270 @VisibleForTesting
271 protected void setRpcControllerFactory(RpcControllerFactory rpcControllerFactory) {
272 this.rpcControllerFactory = rpcControllerFactory;
273 }
274
275 @InterfaceAudience.Private
276 protected static class SmallScannerCallableFactory {
277
278 public RegionServerCallable<Result[]> getCallable(final Scan sc, HConnection connection,
279 TableName table, byte[] localStartKey, final int cacheNum,
280 final RpcControllerFactory rpcControllerFactory) throws IOException {
281 sc.setStartRow(localStartKey);
282 RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
283 connection, table, sc.getStartRow()) {
284 public Result[] call() throws IOException {
285 ScanRequest request = RequestConverter.buildScanRequest(getLocation()
286 .getRegionInfo().getRegionName(), sc, cacheNum, true);
287 ScanResponse response = null;
288 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
289 try {
290 controller.setPriority(getTableName());
291 response = getStub().scan(controller, request);
292 if (response.hasMoreResultsInRegion()) {
293 setHasMoreResultsContext(true);
294 setServerHasMoreResults(response.getMoreResultsInRegion());
295 } else {
296 setHasMoreResultsContext(false);
297 }
298 return ResponseConverter.getResults(controller.cellScanner(),
299 response);
300 } catch (ServiceException se) {
301 throw ProtobufUtil.getRemoteException(se);
302 }
303 }
304 };
305 return callable;
306 }
307
308 }
309 }