1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.net.UnknownHostException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellScanner;
30 import org.apache.hadoop.hbase.DoNotRetryIOException;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HRegionLocation;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.NotServingRegionException;
35 import org.apache.hadoop.hbase.RemoteExceptionHandler;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.UnknownScannerException;
38 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
40 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
41 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
42 import org.apache.hadoop.hbase.protobuf.RequestConverter;
43 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
44 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
45 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
46 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
47 import org.apache.hadoop.ipc.RemoteException;
48 import org.apache.hadoop.net.DNS;
49
50 import com.google.protobuf.ServiceException;
51 import com.google.protobuf.TextFormat;
52
53
54
55
56
57
58 @InterfaceAudience.Private
59 public class ScannerCallable extends RegionServerCallable<Result[]> {
60 public static final String LOG_SCANNER_LATENCY_CUTOFF
61 = "hbase.client.log.scanner.latency.cutoff";
62 public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
63
64 public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
65 private long scannerId = -1L;
66 protected boolean instantiated = false;
67 private boolean closed = false;
68 private Scan scan;
69 private int caching = 1;
70 protected ScanMetrics scanMetrics;
71 private boolean logScannerActivity = false;
72 private int logCutOffLatency = 1000;
73 private static String myAddress;
74 static {
75 try {
76 myAddress = DNS.getDefaultHost("default", "default");
77 } catch (UnknownHostException uhe) {
78 LOG.error("cannot determine my address", uhe);
79 }
80 }
81
82
83 protected boolean isRegionServerRemote = true;
84 private long nextCallSeq = 0;
85 protected final PayloadCarryingRpcController controller;
86
87
88
89
90
91
92
93
94
95 public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
96 ScanMetrics scanMetrics, PayloadCarryingRpcController controller) {
97 super(connection, tableName, scan.getStartRow());
98 this.scan = scan;
99 this.scanMetrics = scanMetrics;
100 Configuration conf = connection.getConfiguration();
101 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
102 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
103 this.controller = controller;
104 }
105
106
107
108
109
110 @Deprecated
111 public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
112 ScanMetrics scanMetrics) {
113 this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
114 .instantiate(connection.getConfiguration()).newController());
115 }
116
117
118
119
120
121 @Override
122 public void prepare(boolean reload) throws IOException {
123 if (!instantiated || reload) {
124 super.prepare(reload);
125 checkIfRegionServerIsRemote();
126 instantiated = true;
127 }
128
129
130
131
132 if (reload && this.scanMetrics != null) {
133 this.scanMetrics.countOfRPCRetries.incrementAndGet();
134 if (isRegionServerRemote) {
135 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
136 }
137 }
138 }
139
140
141
142
143
144 protected void checkIfRegionServerIsRemote() {
145 if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
146 isRegionServerRemote = false;
147 } else {
148 isRegionServerRemote = true;
149 }
150 }
151
152
153
154
155 @SuppressWarnings("deprecation")
156 public Result [] call() throws IOException {
157 if (closed) {
158 if (scannerId != -1) {
159 close();
160 }
161 } else {
162 if (scannerId == -1L) {
163 this.scannerId = openScanner();
164 } else {
165 Result [] rrs = null;
166 ScanRequest request = null;
167 try {
168 incRPCcallsMetrics();
169 request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
170 ScanResponse response = null;
171 try {
172 controller.setPriority(getTableName());
173 response = getStub().scan(controller, request);
174
175
176
177
178
179
180
181
182
183 nextCallSeq++;
184 long timestamp = System.currentTimeMillis();
185
186 CellScanner cellScanner = controller.cellScanner();
187 rrs = ResponseConverter.getResults(cellScanner, response);
188 if (logScannerActivity) {
189 long now = System.currentTimeMillis();
190 if (now - timestamp > logCutOffLatency) {
191 int rows = rrs == null ? 0 : rrs.length;
192 LOG.info("Took " + (now-timestamp) + "ms to fetch "
193 + rows + " rows from scanner=" + scannerId);
194 }
195 }
196
197 if (response.hasMoreResults() && !response.getMoreResults()) {
198 scannerId = -1L;
199 closed = true;
200
201 return null;
202 }
203
204
205 if (response.hasMoreResultsInRegion()) {
206
207 setHasMoreResultsContext(true);
208 setServerHasMoreResults(response.getMoreResultsInRegion());
209 } else {
210
211 setHasMoreResultsContext(false);
212 }
213 } catch (ServiceException se) {
214 throw ProtobufUtil.getRemoteException(se);
215 }
216 updateResultsMetrics(rrs);
217 } catch (IOException e) {
218 if (logScannerActivity) {
219 LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
220 + " to " + getLocation(), e);
221 }
222 IOException ioe = e;
223 if (e instanceof RemoteException) {
224 ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
225 }
226 if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
227 try {
228 HRegionLocation location =
229 getConnection().relocateRegion(getTableName(), scan.getStartRow());
230 LOG.info("Scanner=" + scannerId
231 + " expired, current region location is " + location.toString());
232 } catch (Throwable t) {
233 LOG.info("Failed to relocate region", t);
234 }
235 }
236
237
238
239
240
241
242 if (ioe instanceof NotServingRegionException) {
243
244
245
246 if (this.scanMetrics != null) {
247 this.scanMetrics.countOfNSRE.incrementAndGet();
248 }
249 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
250 } else if (ioe instanceof RegionServerStoppedException) {
251
252
253 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
254 } else {
255
256 throw ioe;
257 }
258 }
259 return rrs;
260 }
261 }
262 return null;
263 }
264
265 private void incRPCcallsMetrics() {
266 if (this.scanMetrics == null) {
267 return;
268 }
269 this.scanMetrics.countOfRPCcalls.incrementAndGet();
270 if (isRegionServerRemote) {
271 this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
272 }
273 }
274
275 private void updateResultsMetrics(Result[] rrs) {
276 if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
277 return;
278 }
279 long resultSize = 0;
280 for (Result rr : rrs) {
281 for (Cell kv : rr.rawCells()) {
282
283 resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
284 }
285 }
286 this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
287 if (isRegionServerRemote) {
288 this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
289 }
290 }
291
292 private void close() {
293 if (this.scannerId == -1L) {
294 return;
295 }
296 try {
297 incRPCcallsMetrics();
298 ScanRequest request =
299 RequestConverter.buildScanRequest(this.scannerId, 0, true);
300 try {
301 getStub().scan(null, request);
302 } catch (ServiceException se) {
303 throw ProtobufUtil.getRemoteException(se);
304 }
305 } catch (IOException e) {
306 LOG.warn("Ignore, probably already closed", e);
307 }
308 this.scannerId = -1L;
309 }
310
311 protected long openScanner() throws IOException {
312 incRPCcallsMetrics();
313 ScanRequest request =
314 RequestConverter.buildScanRequest(
315 getLocation().getRegionInfo().getRegionName(),
316 this.scan, 0, false);
317 try {
318 ScanResponse response = getStub().scan(null, request);
319 long id = response.getScannerId();
320 if (logScannerActivity) {
321 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
322 + " on region " + getLocation().toString());
323 }
324 return id;
325 } catch (ServiceException se) {
326 throw ProtobufUtil.getRemoteException(se);
327 }
328 }
329
330 protected Scan getScan() {
331 return scan;
332 }
333
334
335
336
337 public void setClose() {
338 this.closed = true;
339 }
340
341
342
343
344 public HRegionInfo getHRegionInfo() {
345 if (!instantiated) {
346 return null;
347 }
348 return getLocation().getRegionInfo();
349 }
350
351
352
353
354
355 public int getCaching() {
356 return caching;
357 }
358
359
360
361
362
363 public void setCaching(int caching) {
364 this.caching = caching;
365 }
366 }