View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.net.UnknownHostException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.RemoteExceptionHandler;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.UnknownScannerException;
38  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
40  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
41  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
42  import org.apache.hadoop.hbase.protobuf.RequestConverter;
43  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
44  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
45  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
46  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
47  import org.apache.hadoop.ipc.RemoteException;
48  import org.apache.hadoop.net.DNS;
49  
50  import com.google.protobuf.ServiceException;
51  import com.google.protobuf.TextFormat;
52  
53  /**
54   * Scanner operations such as create, next, etc.
55   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
56   * {@link RpcRetryingCaller} so fails are retried.
57   */
58  @InterfaceAudience.Private
59  public class ScannerCallable extends RegionServerCallable<Result[]> {
60    public static final String LOG_SCANNER_LATENCY_CUTOFF
61      = "hbase.client.log.scanner.latency.cutoff";
62    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
63  
64    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
65    private long scannerId = -1L;
66    protected boolean instantiated = false;
67    private boolean closed = false;
68    private Scan scan;
69    private int caching = 1;
70    protected ScanMetrics scanMetrics;
71    private boolean logScannerActivity = false;
72    private int logCutOffLatency = 1000;
73    private static String myAddress;
74    static {
75      try {
76        myAddress = DNS.getDefaultHost("default", "default");
77      } catch (UnknownHostException uhe) {
78        LOG.error("cannot determine my address", uhe);
79      }
80    }
81  
82    // indicate if it is a remote server call
83    protected boolean isRegionServerRemote = true;
84    private long nextCallSeq = 0;
85    protected final PayloadCarryingRpcController controller;
86    
87    /**
88     * @param connection which connection
89     * @param tableName table callable is on
90     * @param scan the scan to execute
91     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
92     *          metrics
93     * @param controller to use when writing the rpc
94     */
95    public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
96        ScanMetrics scanMetrics, PayloadCarryingRpcController controller) {
97      super(connection, tableName, scan.getStartRow());
98      this.scan = scan;
99      this.scanMetrics = scanMetrics;
100     Configuration conf = connection.getConfiguration();
101     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
102     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
103     this.controller = controller;
104   }
105 
106   /**
107    * @deprecated Use {@link #ScannerCallable(HConnection, TableName, Scan, 
108    *  ScanMetrics, PayloadCarryingRpcController)}
109    */
110   @Deprecated
111   public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
112       ScanMetrics scanMetrics) {
113     this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
114         .instantiate(connection.getConfiguration()).newController());
115   }
116 
117   /**
118    * @param reload force reload of server location
119    * @throws IOException
120    */
121   @Override
122   public void prepare(boolean reload) throws IOException {
123     if (!instantiated || reload) {
124       super.prepare(reload);
125       checkIfRegionServerIsRemote();
126       instantiated = true;
127     }
128 
129     // check how often we retry.
130     // HConnectionManager will call instantiateServer with reload==true
131     // if and only if for retries.
132     if (reload && this.scanMetrics != null) {
133       this.scanMetrics.countOfRPCRetries.incrementAndGet();
134       if (isRegionServerRemote) {
135         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
136       }
137     }
138   }
139 
140   /**
141    * compare the local machine hostname with region server's hostname
142    * to decide if hbase client connects to a remote region server
143    */
144   protected void checkIfRegionServerIsRemote() {
145     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
146       isRegionServerRemote = false;
147     } else {
148       isRegionServerRemote = true;
149     }
150   }
151 
152   /**
153    * @see java.util.concurrent.Callable#call()
154    */
155   @SuppressWarnings("deprecation")
156   public Result [] call() throws IOException {
157     if (closed) {
158       if (scannerId != -1) {
159         close();
160       }
161     } else {
162       if (scannerId == -1L) {
163         this.scannerId = openScanner();
164       } else {
165         Result [] rrs = null;
166         ScanRequest request = null;
167         try {
168           incRPCcallsMetrics();
169           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
170           ScanResponse response = null;
171           try {
172             controller.setPriority(getTableName());
173             response = getStub().scan(controller, request);
174             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
175             // from client to server will increment this number in both sides. Client passes this
176             // number along with the request and at RS side both the incoming nextCallSeq and its
177             // nextCallSeq will be matched. In case of a timeout this increment at the client side
178             // should not happen. If at the server side fetching of next batch of data was over,
179             // there will be mismatch in the nextCallSeq number. Server will throw
180             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
181             // as the last successfully retrieved row.
182             // See HBASE-5974
183             nextCallSeq++;
184             long timestamp = System.currentTimeMillis();
185             // Results are returned via controller
186             CellScanner cellScanner = controller.cellScanner();
187             rrs = ResponseConverter.getResults(cellScanner, response);
188             if (logScannerActivity) {
189               long now = System.currentTimeMillis();
190               if (now - timestamp > logCutOffLatency) {
191                 int rows = rrs == null ? 0 : rrs.length;
192                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
193                   + rows + " rows from scanner=" + scannerId);
194               }
195             }
196             // moreResults is only used for the case where a filter exhausts all elements
197             if (response.hasMoreResults() && !response.getMoreResults()) {
198               scannerId = -1L;
199               closed = true;
200               // Implied that no results were returned back, either.
201               return null;
202             }
203             // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
204             // to size or quantity of results in the response.
205             if (response.hasMoreResultsInRegion()) {
206               // Set what the RS said
207               setHasMoreResultsContext(true);
208               setServerHasMoreResults(response.getMoreResultsInRegion());
209             } else {
210               // Server didn't respond whether it has more results or not.
211               setHasMoreResultsContext(false);
212             }
213           } catch (ServiceException se) {
214             throw ProtobufUtil.getRemoteException(se);
215           }
216           updateResultsMetrics(rrs);
217         } catch (IOException e) {
218           if (logScannerActivity) {
219             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
220               + " to " + getLocation(), e);
221           }
222           IOException ioe = e;
223           if (e instanceof RemoteException) {
224             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
225           }
226           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
227             try {
228               HRegionLocation location =
229                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
230               LOG.info("Scanner=" + scannerId
231                 + " expired, current region location is " + location.toString());
232             } catch (Throwable t) {
233               LOG.info("Failed to relocate region", t);
234             }
235           }
236           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
237           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
238           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
239           // a scan when doing a next in particular, we want to break out and get the scanner to
240           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
241           // yeah and hard to follow and in need of a refactor).
242           if (ioe instanceof NotServingRegionException) {
243             // Throw a DNRE so that we break out of cycle of calling NSRE
244             // when what we need is to open scanner against new location.
245             // Attach NSRE to signal client that it needs to re-setup scanner.
246             if (this.scanMetrics != null) {
247               this.scanMetrics.countOfNSRE.incrementAndGet();
248             }
249             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
250           } else if (ioe instanceof RegionServerStoppedException) {
251             // Throw a DNRE so that we break out of cycle of the retries and instead go and
252             // open scanner against new location.
253             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
254           } else {
255             // The outer layers will retry
256             throw ioe;
257           }
258         }
259         return rrs;
260       }
261     }
262     return null;
263   }
264 
265   private void incRPCcallsMetrics() {
266     if (this.scanMetrics == null) {
267       return;
268     }
269     this.scanMetrics.countOfRPCcalls.incrementAndGet();
270     if (isRegionServerRemote) {
271       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
272     }
273   }
274 
275   private void updateResultsMetrics(Result[] rrs) {
276     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
277       return;
278     }
279     long resultSize = 0;
280     for (Result rr : rrs) {
281       for (Cell kv : rr.rawCells()) {
282         // TODO add getLength to Cell/use CellUtil#estimatedSizeOf
283         resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
284       }
285     }
286     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
287     if (isRegionServerRemote) {
288       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
289     }
290   }
291 
292   private void close() {
293     if (this.scannerId == -1L) {
294       return;
295     }
296     try {
297       incRPCcallsMetrics();
298       ScanRequest request =
299         RequestConverter.buildScanRequest(this.scannerId, 0, true);
300       try {
301         getStub().scan(null, request);
302       } catch (ServiceException se) {
303         throw ProtobufUtil.getRemoteException(se);
304       }
305     } catch (IOException e) {
306       LOG.warn("Ignore, probably already closed", e);
307     }
308     this.scannerId = -1L;
309   }
310 
311   protected long openScanner() throws IOException {
312     incRPCcallsMetrics();
313     ScanRequest request =
314       RequestConverter.buildScanRequest(
315         getLocation().getRegionInfo().getRegionName(),
316         this.scan, 0, false);
317     try {
318       ScanResponse response = getStub().scan(null, request);
319       long id = response.getScannerId();
320       if (logScannerActivity) {
321         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
322           + " on region " + getLocation().toString());
323       }
324       return id;
325     } catch (ServiceException se) {
326       throw ProtobufUtil.getRemoteException(se);
327     }
328   }
329 
330   protected Scan getScan() {
331     return scan;
332   }
333 
334   /**
335    * Call this when the next invocation of call should close the scanner
336    */
337   public void setClose() {
338     this.closed = true;
339   }
340 
341   /**
342    * @return the HRegionInfo for the current region
343    */
344   public HRegionInfo getHRegionInfo() {
345     if (!instantiated) {
346       return null;
347     }
348     return getLocation().getRegionInfo();
349   }
350 
351   /**
352    * Get the number of rows that will be fetched on next
353    * @return the number of rows for caching
354    */
355   public int getCaching() {
356     return caching;
357   }
358 
359   /**
360    * Set the number of rows that will be fetched on next
361    * @param caching the number of rows for caching
362    */
363   public void setCaching(int caching) {
364     this.caching = caching;
365   }
366 }