View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Collections;
48  import java.util.HashMap;
49  import java.util.Iterator;
50  import java.util.LinkedList;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Random;
54  import java.util.concurrent.ExecutorService;
55  import java.util.concurrent.Executors;
56  import java.util.concurrent.atomic.AtomicInteger;
57  
58  import javax.security.sasl.Sasl;
59  import javax.security.sasl.SaslException;
60  import javax.security.sasl.SaslServer;
61  
62  import org.apache.commons.logging.Log;
63  import org.apache.commons.logging.LogFactory;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.classification.InterfaceStability;
66  import org.apache.hadoop.conf.Configuration;
67  import org.apache.hadoop.hbase.CellScanner;
68  import org.apache.hadoop.hbase.DoNotRetryIOException;
69  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
70  import org.apache.hadoop.hbase.HConstants;
71  import org.apache.hadoop.hbase.HRegionInfo;
72  import org.apache.hadoop.hbase.Server;
73  import org.apache.hadoop.hbase.TableName;
74  import org.apache.hadoop.hbase.client.Operation;
75  import org.apache.hadoop.hbase.codec.Codec;
76  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
77  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
78  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
79  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
82  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
83  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
84  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
85  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
86  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
87  import org.apache.hadoop.hbase.regionserver.HRegionServer;
88  import org.apache.hadoop.hbase.security.AccessDeniedException;
89  import org.apache.hadoop.hbase.security.AuthMethod;
90  import org.apache.hadoop.hbase.security.HBasePolicyProvider;
91  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
92  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
93  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
94  import org.apache.hadoop.hbase.security.SaslStatus;
95  import org.apache.hadoop.hbase.security.SaslUtil;
96  import org.apache.hadoop.hbase.security.User;
97  import org.apache.hadoop.hbase.security.UserProvider;
98  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
99  import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.io.BytesWritable;
102 import org.apache.hadoop.io.IntWritable;
103 import org.apache.hadoop.io.Writable;
104 import org.apache.hadoop.io.WritableUtils;
105 import org.apache.hadoop.io.compress.CompressionCodec;
106 import org.apache.hadoop.security.UserGroupInformation;
107 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
108 import org.apache.hadoop.security.authorize.AuthorizationException;
109 import org.apache.hadoop.security.authorize.PolicyProvider;
110 import org.apache.hadoop.security.authorize.ProxyUsers;
111 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
112 import org.apache.hadoop.security.token.SecretManager;
113 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
114 import org.apache.hadoop.security.token.TokenIdentifier;
115 import org.apache.hadoop.util.StringUtils;
116 import org.cliffc.high_scale_lib.Counter;
117 import org.cloudera.htrace.TraceInfo;
118 import org.codehaus.jackson.map.ObjectMapper;
119 
120 import com.google.common.util.concurrent.ThreadFactoryBuilder;
121 import com.google.protobuf.BlockingService;
122 import com.google.protobuf.CodedInputStream;
123 import com.google.protobuf.Descriptors.MethodDescriptor;
124 import com.google.protobuf.Message;
125 import com.google.protobuf.Message.Builder;
126 import com.google.protobuf.ServiceException;
127 import com.google.protobuf.TextFormat;
128 // Uses Writables doing sasl
129 
130 /**
131  * An RPC server that hosts protobuf described Services.
132  *
133  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
134  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
135  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
136  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
137  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
138  * and loops till done.
139  *
140  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
141  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
142  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
143  * and keep taking while the server is up.
144  *
145  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
146  * queue for {@link Responder} to pull from and return result to client.
147  *
148  * @see RpcClient
149  */
150 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
151 @InterfaceStability.Evolving
152 public class RpcServer implements RpcServerInterface {
153   // The logging package is deliberately outside of standard o.a.h.h package so it is not on
154   // by default.
155   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcServer");
156 
157   private final boolean authorize;
158   private boolean isSecurityEnabled;
159 
160   public static final byte CURRENT_VERSION = 0;
161 
162   /**
163    * How many calls/handler are allowed in the queue.
164    */
165   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
166 
167   /**
168    * The maximum size that we can hold in the RPC queue
169    */
170   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
171 
172   static final int BUFFER_INITIAL_SIZE = 1024;
173 
174   private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
175 
176   private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
177 
178   private final int warnDelayedCalls;
179 
180   private AtomicInteger delayedCalls;
181   private final IPCUtil ipcUtil;
182 
183   private static final String AUTH_FAILED_FOR = "Auth failed for ";
184   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
185   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
186     Server.class.getName());
187   protected SecretManager<TokenIdentifier> secretManager;
188   protected ServiceAuthorizationManager authManager;
189 
190   /** This is set to Call object before Handler invokes an RPC and reset
191    * after the call returns.
192    */
193   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
194 
195   /** Keeps MonitoredRPCHandler per handler thread. */
196   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
197       = new ThreadLocal<MonitoredRPCHandler>();
198 
199   protected final InetSocketAddress isa;
200   protected int port;                             // port we listen on
201   private int readThreads;                        // number of read threads
202   protected int maxIdleTime;                      // the maximum idle time after
203                                                   // which a client may be
204                                                   // disconnected
205   protected int thresholdIdleConnections;         // the number of idle
206                                                   // connections after which we
207                                                   // will start cleaning up idle
208                                                   // connections
209   int maxConnectionsToNuke;                       // the max number of
210                                                   // connections to nuke
211                                                   // during a cleanup
212 
213   protected MetricsHBaseServer metrics;
214 
215   protected final Configuration conf;
216 
217   private int maxQueueSize;
218   protected int socketSendBufferSize;
219   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
220   protected final boolean tcpKeepAlive; // if T then use keepalives
221   protected final long purgeTimeout;    // in milliseconds
222 
223   /**
224    * This flag is used to indicate to sub threads when they should go down.  When we call
225    * {@link #startThreads()}, all threads started will consult this flag on whether they should
226    * keep going.  It is set to false when {@link #stop()} is called.
227    */
228   volatile boolean running = true;
229 
230   /**
231    * This flag is set to true after all threads are up and 'running' and the server is then opened
232    * for business by the call to {@link #openServer()}.
233    */
234   volatile boolean started = false;
235 
236   /**
237    * This is a running count of the total size of all outstanding calls.
238    */
239   protected final Counter callQueueSize = new Counter();
240 
241   protected final List<Connection> connectionList =
242     Collections.synchronizedList(new LinkedList<Connection>());
243   //maintain a list
244   //of client connections
245   private Listener listener = null;
246   protected Responder responder = null;
247   protected int numConnections = 0;
248 
249   protected HBaseRPCErrorHandler errorHandler = null;
250 
251   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
252   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
253 
254   /** Default value for above params */
255   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
256   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
257 
258   private static final ObjectMapper MAPPER = new ObjectMapper();
259 
260   private final int warnResponseTime;
261   private final int warnResponseSize;
262   private final Object serverInstance;
263   private final List<BlockingServiceAndInterface> services;
264 
265   private final RpcScheduler scheduler;
266 
267   private UserProvider userProvider;
268 
269   private final BoundedByteBufferPool reservoir;
270 
271 
272   /**
273    * Datastructure that holds all necessary to a method invocation and then afterward, carries
274    * the result.
275    */
276   class Call implements RpcCallContext {
277     protected int id;                             // the client's call id
278     protected BlockingService service;
279     protected MethodDescriptor md;
280     protected RequestHeader header;
281     protected Message param;                      // the parameter passed
282     // Optional cell data passed outside of protobufs.
283     protected CellScanner cellScanner;
284     protected Connection connection;              // connection to client
285     protected long timestamp;      // the time received when response is null
286                                    // the time served when response is not null
287     /**
288      * Chain of buffers to send as response.
289      */
290     protected BufferChain response;
291     protected boolean delayResponse;
292     protected Responder responder;
293     protected boolean delayReturnValue;           // if the return value should be
294                                                   // set at call completion
295     protected long size;                          // size of current call
296     protected boolean isError;
297     protected TraceInfo tinfo;
298     private ByteBuffer cellBlock = null;
299 
300     private User user;
301     private InetAddress remoteAddress;
302 
303     /**
304      * Deprecated, do not use
305      */
306     @Deprecated
307     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
308       Message param, CellScanner cellScanner, Connection connection, Responder responder,
309       long size, TraceInfo tinfo) {
310       this(id, service, md, header, param, cellScanner, connection, responder, size, tinfo, null);
311     }
312 
313     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
314          Message param, CellScanner cellScanner, Connection connection, Responder responder,
315          long size, TraceInfo tinfo, InetAddress remoteAddress) {
316       this.id = id;
317       this.service = service;
318       this.md = md;
319       this.header = header;
320       this.param = param;
321       this.cellScanner = cellScanner;
322       this.connection = connection;
323       this.timestamp = System.currentTimeMillis();
324       this.response = null;
325       this.delayResponse = false;
326       this.responder = responder;
327       this.isError = false;
328       this.size = size;
329       this.tinfo = tinfo;
330       if (connection != null && connection.user != null) {
331         this.user = userProvider.create(connection.user);
332       } else {
333         this.user = null;
334       }
335       this.remoteAddress = remoteAddress;
336     }
337 
338     /**
339      * Call is done. Execution happened and we returned results to client. It is now safe to
340      * cleanup.
341      */
342     void done() {
343       if (this.cellBlock != null) {
344         // Return buffer to reservoir now we are done with it.
345         reservoir.putBuffer(this.cellBlock);
346         this.cellBlock = null;
347       }
348     }
349 
350     @Override
351     public String toString() {
352       return toShortString() + " param: " +
353         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
354         " connection: " + connection.toString();
355     }
356 
357     protected RequestHeader getHeader() {
358       return this.header;
359     }
360 
361     @Override
362     public User getRequestUser() {
363       return user;
364     }
365 
366     @Override
367     public String getRequestUserName() {
368       User user = getRequestUser();
369       return user == null? null: user.getShortName();
370     }
371 
372     @Override
373     public InetAddress getRemoteAddress() {
374       return remoteAddress;
375     }
376 
377     /*
378      * Short string representation without param info because param itself could be huge depends on
379      * the payload of a command
380      */
381     String toShortString() {
382       String serviceName = this.connection.service != null?
383         this.connection.service.getDescriptorForType().getName() : "null";
384       StringBuilder sb = new StringBuilder();
385       sb.append("callId: ");
386       sb.append(this.id);
387       sb.append(" service: ");
388       sb.append(serviceName);
389       sb.append(" methodName: ");
390       sb.append((this.md != null) ? this.md.getName() : "");
391       sb.append(" size: ");
392       sb.append(StringUtils.humanReadableInt(this.size));
393       sb.append(" connection: ");
394       sb.append(connection.toString());
395       return sb.toString();
396     }
397 
398     String toTraceString() {
399       String serviceName = this.connection.service != null ?
400                            this.connection.service.getDescriptorForType().getName() : "";
401       String methodName = (this.md != null) ? this.md.getName() : "";
402       String result = serviceName + "." + methodName;
403       return result;
404     }
405 
406     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
407       this.response = new BufferChain(response);
408     }
409 
410     protected synchronized void setResponse(Object m, final CellScanner cells,
411         Throwable t, String errorMsg) {
412       if (this.isError) return;
413       if (t != null) this.isError = true;
414       BufferChain bc = null;
415       try {
416         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
417         // Presume it a pb Message.  Could be null.
418         Message result = (Message)m;
419         // Call id.
420         headerBuilder.setCallId(this.id);
421         if (t != null) {
422           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
423           exceptionBuilder.setExceptionClassName(t.getClass().getName());
424           exceptionBuilder.setStackTrace(errorMsg);
425           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
426           if (t instanceof RegionMovedException) {
427             // Special casing for this exception.  This is only one carrying a payload.
428             // Do this instead of build a generic system for allowing exceptions carry
429             // any kind of payload.
430             RegionMovedException rme = (RegionMovedException)t;
431             exceptionBuilder.setHostname(rme.getHostname());
432             exceptionBuilder.setPort(rme.getPort());
433           }
434           // Set the exception as the result of the method invocation.
435           headerBuilder.setException(exceptionBuilder.build());
436         }
437         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
438         // reservoir when finished. This is hacky and the hack is not contained but benefits are
439         // high when we can avoid a big buffer allocation on each rpc.
440         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
441           this.connection.compressionCodec, cells, reservoir);
442         if (this.cellBlock != null) {
443           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
444           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
445           cellBlockBuilder.setLength(this.cellBlock.limit());
446           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
447         }
448         Message header = headerBuilder.build();
449 
450         // Organize the response as a set of bytebuffers rather than collect it all together inside
451         // one big byte array; save on allocations.
452         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
453         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
454         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
455           (this.cellBlock == null? 0: this.cellBlock.limit());
456         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
457         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
458         if (connection.useWrap) {
459           bc = wrapWithSasl(bc);
460         }
461       } catch (IOException e) {
462         LOG.warn("Exception while creating response " + e);
463       }
464       this.response = bc;
465     }
466 
467     private BufferChain wrapWithSasl(BufferChain bc)
468         throws IOException {
469       if (bc == null) return bc;
470       if (!this.connection.useSasl) return bc;
471       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
472       // THIS IS A BIG UGLY COPY.
473       byte [] responseBytes = bc.getBytes();
474       byte [] token;
475       // synchronization may be needed since there can be multiple Handler
476       // threads using saslServer to wrap responses.
477       synchronized (connection.saslServer) {
478         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
479       }
480       if (LOG.isDebugEnabled())
481         LOG.debug("Adding saslServer wrapped token of size " + token.length
482             + " as call response.");
483 
484       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
485       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
486       return new BufferChain(bbTokenLength, bbTokenBytes);
487     }
488 
489     @Override
490     public synchronized void endDelay(Object result) throws IOException {
491       assert this.delayResponse;
492       assert this.delayReturnValue || result == null;
493       this.delayResponse = false;
494       delayedCalls.decrementAndGet();
495       if (this.delayReturnValue) {
496         this.setResponse(result, null, null, null);
497       }
498       this.responder.doRespond(this);
499     }
500 
501     @Override
502     public synchronized void endDelay() throws IOException {
503       this.endDelay(null);
504     }
505 
506     @Override
507     public synchronized void startDelay(boolean delayReturnValue) {
508       assert !this.delayResponse;
509       this.delayResponse = true;
510       this.delayReturnValue = delayReturnValue;
511       int numDelayed = delayedCalls.incrementAndGet();
512       if (numDelayed > warnDelayedCalls) {
513         LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
514       }
515     }
516 
517     @Override
518     public synchronized void endDelayThrowing(Throwable t) throws IOException {
519       this.setResponse(null, null, t, StringUtils.stringifyException(t));
520       this.delayResponse = false;
521       this.sendResponseIfReady();
522     }
523 
524     @Override
525     public synchronized boolean isDelayed() {
526       return this.delayResponse;
527     }
528 
529     @Override
530     public synchronized boolean isReturnValueDelayed() {
531       return this.delayReturnValue;
532     }
533 
534     @Override
535     public boolean isClientCellBlockSupport() {
536       return this.connection != null && this.connection.codec != null;
537     }
538 
539     @Override
540     public long disconnectSince() {
541       if (!connection.channel.isOpen()) {
542         return System.currentTimeMillis() - timestamp;
543       } else {
544         return -1L;
545       }
546     }
547 
548     public long getSize() {
549       return this.size;
550     }
551 
552     /**
553      * If we have a response, and delay is not set, then respond
554      * immediately.  Otherwise, do not respond to client.  This is
555      * called by the RPC code in the context of the Handler thread.
556      */
557     public synchronized void sendResponseIfReady() throws IOException {
558       if (!this.delayResponse) {
559         this.responder.doRespond(this);
560       }
561     }
562   }
563 
564   /** Listens on the socket. Creates jobs for the handler threads*/
565   private class Listener extends Thread {
566 
567     private ServerSocketChannel acceptChannel = null; //the accept channel
568     private Selector selector = null; //the selector that we use for the server
569     private Reader[] readers = null;
570     private int currentReader = 0;
571     private Random rand = new Random();
572     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
573                                          //-tion (for idle connections) ran
574     private long cleanupInterval = 10000; //the minimum interval between
575                                           //two cleanup runs
576     private int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size",
577       conf.getInt("ipc.server.listen.queue.size", 128));
578 
579     private ExecutorService readPool;
580 
581     public Listener(final String name) throws IOException {
582       super(name);
583       // Create a new server socket and set to non blocking mode
584       acceptChannel = ServerSocketChannel.open();
585       acceptChannel.configureBlocking(false);
586 
587       // Bind the server socket to the local host and port
588       bind(acceptChannel.socket(), isa, backlogLength);
589       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
590       // create a selector;
591       selector= Selector.open();
592 
593       readers = new Reader[readThreads];
594       readPool = Executors.newFixedThreadPool(readThreads,
595         new ThreadFactoryBuilder().setNameFormat(
596           "RpcServer.reader=%d,port=" + port).setDaemon(true).build());
597       for (int i = 0; i < readThreads; ++i) {
598         Reader reader = new Reader();
599         readers[i] = reader;
600         readPool.execute(reader);
601       }
602       LOG.info(getName() + ": started " + readThreads + " reader(s).");
603 
604       // Register accepts on the server socket with the selector.
605       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
606       this.setName("RpcServer.listener,port=" + port);
607       this.setDaemon(true);
608     }
609 
610 
611     private class Reader implements Runnable {
612       private volatile boolean adding = false;
613       private final Selector readSelector;
614 
615       Reader() throws IOException {
616         this.readSelector = Selector.open();
617       }
618       public void run() {
619         try {
620           doRunLoop();
621         } finally {
622           try {
623             readSelector.close();
624           } catch (IOException ioe) {
625             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
626           }
627         }
628       }
629 
630       private synchronized void doRunLoop() {
631         while (running) {
632           SelectionKey key = null;
633           try {
634             readSelector.select();
635             while (adding) {
636               this.wait(1000);
637             }
638 
639             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
640             while (iter.hasNext()) {
641               key = iter.next();
642               iter.remove();
643               if (key.isValid()) {
644                 if (key.isReadable()) {
645                   doRead(key);
646                 }
647               }
648               key = null;
649             }
650           } catch (InterruptedException e) {
651             if (running) {                      // unexpected -- log it
652               LOG.info(getName() + ": unexpectedly interrupted: " +
653                 StringUtils.stringifyException(e));
654             }
655           } catch (IOException ex) {
656             LOG.error(getName() + ": error in Reader", ex);
657           }
658         }
659       }
660 
661       /**
662        * This gets reader into the state that waits for the new channel
663        * to be registered with readSelector. If it was waiting in select()
664        * the thread will be woken up, otherwise whenever select() is called
665        * it will return even if there is nothing to read and wait
666        * in while(adding) for finishAdd call
667        */
668       public void startAdd() {
669         adding = true;
670         readSelector.wakeup();
671       }
672 
673       public synchronized SelectionKey registerChannel(SocketChannel channel)
674         throws IOException {
675         return channel.register(readSelector, SelectionKey.OP_READ);
676       }
677 
678       public synchronized void finishAdd() {
679         adding = false;
680         this.notify();
681       }
682     }
683 
684     /** cleanup connections from connectionList. Choose a random range
685      * to scan and also have a limit on the number of the connections
686      * that will be cleanedup per run. The criteria for cleanup is the time
687      * for which the connection was idle. If 'force' is true then all
688      * connections will be looked at for the cleanup.
689      * @param force all connections will be looked at for cleanup
690      */
691     private void cleanupConnections(boolean force) {
692       if (force || numConnections > thresholdIdleConnections) {
693         long currentTime = System.currentTimeMillis();
694         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
695           return;
696         }
697         int start = 0;
698         int end = numConnections - 1;
699         if (!force) {
700           start = rand.nextInt() % numConnections;
701           end = rand.nextInt() % numConnections;
702           int temp;
703           if (end < start) {
704             temp = start;
705             start = end;
706             end = temp;
707           }
708         }
709         int i = start;
710         int numNuked = 0;
711         while (i <= end) {
712           Connection c;
713           synchronized (connectionList) {
714             try {
715               c = connectionList.get(i);
716             } catch (Exception e) {return;}
717           }
718           if (c.timedOut(currentTime)) {
719             if (LOG.isDebugEnabled())
720               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
721             closeConnection(c);
722             numNuked++;
723             end--;
724             //noinspection UnusedAssignment
725             c = null;
726             if (!force && numNuked == maxConnectionsToNuke) break;
727           }
728           else i++;
729         }
730         lastCleanupRunTime = System.currentTimeMillis();
731       }
732     }
733 
734     @Override
735     public void run() {
736       LOG.info(getName() + ": starting");
737       while (running) {
738         SelectionKey key = null;
739         try {
740           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
741           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
742           while (iter.hasNext()) {
743             key = iter.next();
744             iter.remove();
745             try {
746               if (key.isValid()) {
747                 if (key.isAcceptable())
748                   doAccept(key);
749               }
750             } catch (IOException ignored) {
751             }
752             key = null;
753           }
754         } catch (OutOfMemoryError e) {
755           if (errorHandler != null) {
756             if (errorHandler.checkOOME(e)) {
757               LOG.info(getName() + ": exiting on OutOfMemoryError");
758               closeCurrentConnection(key, e);
759               cleanupConnections(true);
760               return;
761             }
762           } else {
763             // we can run out of memory if we have too many threads
764             // log the event and sleep for a minute and give
765             // some thread(s) a chance to finish
766             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
767             closeCurrentConnection(key, e);
768             cleanupConnections(true);
769             try { Thread.sleep(60000); } catch (Exception ignored) {}
770           }
771         } catch (Exception e) {
772           closeCurrentConnection(key, e);
773         }
774         cleanupConnections(false);
775       }
776       LOG.info(getName() + ": stopping");
777 
778       synchronized (this) {
779         try {
780           acceptChannel.close();
781           selector.close();
782         } catch (IOException ignored) { }
783 
784         selector= null;
785         acceptChannel= null;
786 
787         // clean up all connections
788         while (!connectionList.isEmpty()) {
789           closeConnection(connectionList.remove(0));
790         }
791       }
792     }
793 
794     private void closeCurrentConnection(SelectionKey key, Throwable e) {
795       if (key != null) {
796         Connection c = (Connection)key.attachment();
797         if (c != null) {
798           if (LOG.isDebugEnabled()) {
799             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
800                 (e != null ? " on error " + e.getMessage() : ""));
801           }
802           closeConnection(c);
803           key.attach(null);
804         }
805       }
806     }
807 
808     InetSocketAddress getAddress() {
809       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
810     }
811 
812     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
813       Connection c;
814       ServerSocketChannel server = (ServerSocketChannel) key.channel();
815 
816       SocketChannel channel;
817       while ((channel = server.accept()) != null) {
818         try {
819           channel.configureBlocking(false);
820           channel.socket().setTcpNoDelay(tcpNoDelay);
821           channel.socket().setKeepAlive(tcpKeepAlive);
822         } catch (IOException ioe) {
823           channel.close();
824           throw ioe;
825         }
826 
827         Reader reader = getReader();
828         try {
829           reader.startAdd();
830           SelectionKey readKey = reader.registerChannel(channel);
831           c = getConnection(channel, System.currentTimeMillis());
832           readKey.attach(c);
833           synchronized (connectionList) {
834             connectionList.add(numConnections, c);
835             numConnections++;
836           }
837           if (LOG.isDebugEnabled())
838             LOG.debug(getName() + ": connection from " + c.toString() +
839                 "; # active connections: " + numConnections);
840         } finally {
841           reader.finishAdd();
842         }
843       }
844     }
845 
846     void doRead(SelectionKey key) throws InterruptedException {
847       int count = 0;
848       Connection c = (Connection)key.attachment();
849       if (c == null) {
850         return;
851       }
852       c.setLastContact(System.currentTimeMillis());
853       try {
854         count = c.readAndProcess();
855       } catch (InterruptedException ieo) {
856         throw ieo;
857       } catch (Exception e) {
858         LOG.warn(getName() + ": count of bytes read: " + count, e);
859         count = -1; //so that the (count < 0) block is executed
860       }
861       if (count < 0) {
862         if (LOG.isDebugEnabled()) {
863           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
864             " because read count=" + count +
865             ". Number of active connections: " + numConnections);
866         }
867         closeConnection(c);
868         // c = null;
869       } else {
870         c.setLastContact(System.currentTimeMillis());
871       }
872     }
873 
874     synchronized void doStop() {
875       if (selector != null) {
876         selector.wakeup();
877         Thread.yield();
878       }
879       if (acceptChannel != null) {
880         try {
881           acceptChannel.socket().close();
882         } catch (IOException e) {
883           LOG.info(getName() + ": exception in closing listener socket. " + e);
884         }
885       }
886       readPool.shutdownNow();
887     }
888 
889     // The method that will return the next reader to work with
890     // Simplistic implementation of round robin for now
891     Reader getReader() {
892       currentReader = (currentReader + 1) % readers.length;
893       return readers[currentReader];
894     }
895   }
896 
897   // Sends responses of RPC back to clients.
898   protected class Responder extends Thread {
899     private final Selector writeSelector;
900     private int pending;         // connections waiting to register
901 
902     Responder() throws IOException {
903       this.setName("RpcServer.responder");
904       this.setDaemon(true);
905       writeSelector = Selector.open(); // create a selector
906       pending = 0;
907     }
908 
909     @Override
910     public void run() {
911       LOG.info(getName() + ": starting");
912       try {
913         doRunLoop();
914       } finally {
915         LOG.info(getName() + ": stopping");
916         try {
917           writeSelector.close();
918         } catch (IOException ioe) {
919           LOG.error(getName() + ": couldn't close write selector", ioe);
920         }
921       }
922     }
923 
924     private void doRunLoop() {
925       long lastPurgeTime = 0;   // last check for old calls.
926 
927       while (running) {
928         try {
929           waitPending();     // If a channel is being registered, wait.
930           writeSelector.select(purgeTimeout);
931           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
932           while (iter.hasNext()) {
933             SelectionKey key = iter.next();
934             iter.remove();
935             try {
936               if (key.isValid() && key.isWritable()) {
937                   doAsyncWrite(key);
938               }
939             } catch (IOException e) {
940               LOG.info(getName() + ": asyncWrite", e);
941             }
942           }
943           long now = System.currentTimeMillis();
944           if (now < lastPurgeTime + purgeTimeout) {
945             continue;
946           }
947           lastPurgeTime = now;
948           //
949           // If there were some calls that have not been sent out for a
950           // long time, discard them.
951           //
952           if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
953           ArrayList<Call> calls;
954 
955           // get the list of channels from list of keys.
956           synchronized (writeSelector.keys()) {
957             calls = new ArrayList<Call>(writeSelector.keys().size());
958             iter = writeSelector.keys().iterator();
959             while (iter.hasNext()) {
960               SelectionKey key = iter.next();
961               Call call = (Call)key.attachment();
962               if (call != null && key.channel() == call.connection.channel) {
963                 calls.add(call);
964               }
965             }
966           }
967 
968           for(Call call : calls) {
969             try {
970               doPurge(call, now);
971             } catch (IOException e) {
972               LOG.warn(getName() + ": error in purging old calls " + e);
973             }
974           }
975         } catch (OutOfMemoryError e) {
976           if (errorHandler != null) {
977             if (errorHandler.checkOOME(e)) {
978               LOG.info(getName() + ": exiting on OutOfMemoryError");
979               return;
980             }
981           } else {
982             //
983             // we can run out of memory if we have too many threads
984             // log the event and sleep for a minute and give
985             // some thread(s) a chance to finish
986             //
987             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
988             try { Thread.sleep(60000); } catch (Exception ignored) {}
989           }
990         } catch (Exception e) {
991           LOG.warn(getName() + ": exception in Responder " +
992                    StringUtils.stringifyException(e));
993         }
994       }
995       LOG.info(getName() + ": stopped");
996     }
997 
998     private void doAsyncWrite(SelectionKey key) throws IOException {
999       Call call = (Call)key.attachment();
1000       if (call == null) {
1001         return;
1002       }
1003       if (key.channel() != call.connection.channel) {
1004         throw new IOException("doAsyncWrite: bad channel");
1005       }
1006 
1007       synchronized(call.connection.responseQueue) {
1008         if (processResponse(call.connection.responseQueue, false)) {
1009           try {
1010             key.interestOps(0);
1011           } catch (CancelledKeyException e) {
1012             /* The Listener/reader might have closed the socket.
1013              * We don't explicitly cancel the key, so not sure if this will
1014              * ever fire.
1015              * This warning could be removed.
1016              */
1017             LOG.warn("Exception while changing ops : " + e);
1018           }
1019         }
1020       }
1021     }
1022 
1023     //
1024     // Remove calls that have been pending in the responseQueue
1025     // for a long time.
1026     //
1027     private void doPurge(Call call, long now) throws IOException {
1028       synchronized (call.connection.responseQueue) {
1029         Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
1030         while (iter.hasNext()) {
1031           Call nextCall = iter.next();
1032           if (now > nextCall.timestamp + purgeTimeout) {
1033             closeConnection(nextCall.connection);
1034             break;
1035           }
1036         }
1037       }
1038     }
1039 
    /**
     * Processes one response from the head of {@code responseQueue}.
     *
     * <p>An empty queue returns true straight away. Otherwise the first call is removed
     * and as much of its response as possible is written without blocking. A fully
     * written response decrements the connection's rpc count; a partially written one is
     * put back at the head of the queue and, when {@code inHandler} is true, registered
     * with the write selector. If the method leaves with {@code error} still true after
     * a call has been dequeued (an exception, or a negative write count), the call's
     * connection is closed in the {@code finally} block.
     *
     * @param responseQueue queue of calls waiting to be written; also used as the lock
     * @param inHandler when true, a partially written call is re-timestamped and enqueued
     *        in the write selector
     * @return true if there is no more pending data for this channel
     * @throws IOException if writing or registering the channel fails
     */
    private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
    throws IOException {
      // Stays true until a path completes cleanly; checked in the finally block below.
      boolean error = true;
      boolean done = false;       // there is more data for this channel.
      int numElements;
      Call call = null;
      try {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (responseQueue) {
          //
          // If there are no items for this channel, then we are done
          //
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
          //
          // Extract the first call
          //
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
          //
          // Send as much data as we can in the non-blocking fashion
          //
          long numBytes = channelWrite(channel, call.response);
          if (numBytes < 0) {
            // error is still true here, so the finally block closes the connection.
            return true;
          }
          if (!call.response.hasRemaining()) {
            call.connection.decRpcCount();
            //noinspection RedundantIfStatement
            if (numElements == 1) {    // last call fully processed.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
            }
          } else {
            //
            // If we were unable to write the entire response out, then
            // insert in Selector queue.
            //
            call.connection.responseQueue.addFirst(call);

            if (inHandler) {
              // set the serve time when the response has to be sent later
              call.timestamp = System.currentTimeMillis();
              // enqueueInSelector returns true only when the channel was already closed.
              if (enqueueInSelector(call))
                done = true;
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + call.toShortString() + " partially sent, wrote " +
                numBytes + " bytes.");
            }
          }
          error = false;              // everything went off well
        }
      } finally {
        if (error && call != null) {
          LOG.warn(getName() + call.toShortString() + ": output error");
          done = true;               // error. no more data for this channel.
          closeConnection(call.connection);
        }
      }
      // Reached only on normal completion with a dequeued call; early returns and
      // exceptions leave the method before this point.
      if (done) call.done();
      return done;
    }
1113 
1114     //
1115     // Enqueue for background thread to send responses out later.
1116     //
1117     private boolean enqueueInSelector(Call call) throws IOException {
1118       boolean done = false;
1119       incPending();
1120       try {
1121         // Wake up the thread blocked on select, only then can the call
1122         // to channel.register() complete.
1123         SocketChannel channel = call.connection.channel;
1124         writeSelector.wakeup();
1125         channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1126       } catch (ClosedChannelException e) {
1127         //It's OK.  Channel might be closed else where.
1128         done = true;
1129       } finally {
1130         decPending();
1131       }
1132       return done;
1133     }
1134 
1135     //
1136     // Enqueue a response from the application.
1137     //
1138     void doRespond(Call call) throws IOException {
1139       // set the serve time when the response has to be sent later
1140       call.timestamp = System.currentTimeMillis();
1141 
1142       boolean doRegister = false;
1143       synchronized (call.connection.responseQueue) {
1144         call.connection.responseQueue.addLast(call);
1145         if (call.connection.responseQueue.size() == 1) {
1146           doRegister = !processResponse(call.connection.responseQueue, false);
1147         }
1148       }
1149       if (doRegister) {
1150         enqueueInSelector(call);
1151       }
1152     }
1153 
1154     private synchronized void incPending() {   // call waiting to be enqueued.
1155       pending++;
1156     }
1157 
1158     private synchronized void decPending() { // call done enqueueing.
1159       pending--;
1160       notify();
1161     }
1162 
1163     private synchronized void waitPending() throws InterruptedException {
1164       while (pending > 0) {
1165         wait();
1166       }
1167     }
1168   }
1169 
1170   @SuppressWarnings("serial")
1171   public static class CallQueueTooBigException extends IOException {
1172     CallQueueTooBigException() {
1173       super();
1174     }
1175   }
1176 
1177   /** Reads calls from a connection and queues them for handling. */
1178   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1179       value="VO_VOLATILE_INCREMENT",
1180       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1181   public class Connection {
1182     // If initial preamble with version and magic has been read or not.
1183     private boolean connectionPreambleRead = false;
1184     // If the connection header has been read or not.
1185     private boolean connectionHeaderRead = false;
1186     protected SocketChannel channel;
1187     private ByteBuffer data;
1188     private ByteBuffer dataLengthBuffer;
1189     protected final LinkedList<Call> responseQueue;
1190     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1191     private long lastContact;
1192     private InetAddress addr;
1193     protected Socket socket;
1194     // Cache the remote host & port info so that even if the socket is
1195     // disconnected, we can say where it used to connect to.
1196     protected String hostAddress;
1197     protected int remotePort;
1198     ConnectionHeader connectionHeader;
1199     /**
     * Codec the client asked us to use.
1201      */
1202     private Codec codec;
1203     /**
     * Compression codec the client asked us to use.
1205      */
1206     private CompressionCodec compressionCodec;
1207     BlockingService service;
1208     protected UserGroupInformation user = null;
1209     private AuthMethod authMethod;
1210     private boolean saslContextEstablished;
1211     private boolean skipInitialSaslHandshake;
1212     private ByteBuffer unwrappedData;
1213     // When is this set?  FindBugs wants to know!  Says NP
1214     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1215     boolean useSasl;
1216     SaslServer saslServer;
1217     private boolean useWrap = false;
1218     // Fake 'call' for failed authorization response
1219     private static final int AUTHROIZATION_FAILED_CALLID = -1;
1220     private final Call authFailedCall =
1221       new Call(AUTHROIZATION_FAILED_CALLID, this.service, null,
1222         null, null, null, this, null, 0, null, null);
1223     private ByteArrayOutputStream authFailedResponse =
1224         new ByteArrayOutputStream();
1225     // Fake 'call' for SASL context setup
1226     private static final int SASL_CALLID = -33;
1227     private final Call saslCall =
1228       new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
1229 
1230     public UserGroupInformation attemptingUser = null; // user name before auth
1231 
1232     public Connection(SocketChannel channel, long lastContact) {
1233       this.channel = channel;
1234       this.lastContact = lastContact;
1235       this.data = null;
1236       this.dataLengthBuffer = ByteBuffer.allocate(4);
1237       this.socket = channel.socket();
1238       this.addr = socket.getInetAddress();
1239       if (addr == null) {
1240         this.hostAddress = "*Unknown*";
1241       } else {
1242         this.hostAddress = addr.getHostAddress();
1243       }
1244       this.remotePort = socket.getPort();
1245       this.responseQueue = new LinkedList<Call>();
1246       if (socketSendBufferSize != 0) {
1247         try {
1248           socket.setSendBufferSize(socketSendBufferSize);
1249         } catch (IOException e) {
1250           LOG.warn("Connection: unable to set socket send buffer size to " +
1251                    socketSendBufferSize);
1252         }
1253       }
1254     }
1255 
1256     @Override
1257     public String toString() {
1258       return getHostAddress() + ":" + remotePort;
1259     }
1260 
1261     public String getHostAddress() {
1262       return hostAddress;
1263     }
1264 
1265     public InetAddress getHostInetAddress() {
1266       return addr;
1267     }
1268 
1269     public int getRemotePort() {
1270       return remotePort;
1271     }
1272 
1273     public void setLastContact(long lastContact) {
1274       this.lastContact = lastContact;
1275     }
1276 
1277     public long getLastContact() {
1278       return lastContact;
1279     }
1280 
1281     /* Return true if the connection has no outstanding rpc */
1282     private boolean isIdle() {
1283       return rpcCount.get() == 0;
1284     }
1285 
1286     /* Decrement the outstanding RPC count */
1287     protected void decRpcCount() {
1288       rpcCount.decrement();
1289     }
1290 
1291     /* Increment the outstanding RPC count */
1292     protected void incRpcCount() {
1293       rpcCount.increment();
1294     }
1295 
1296     protected boolean timedOut(long currentTime) {
1297       return isIdle() && currentTime - lastContact > maxIdleTime;
1298     }
1299 
1300     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1301         throws IOException {
1302       if (authMethod == AuthMethod.DIGEST) {
1303         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1304             secretManager);
1305         UserGroupInformation ugi = tokenId.getUser();
1306         if (ugi == null) {
1307           throw new AccessDeniedException(
1308               "Can't retrieve username from tokenIdentifier.");
1309         }
1310         ugi.addTokenIdentifier(tokenId);
1311         return ugi;
1312       } else {
1313         return UserGroupInformation.createRemoteUser(authorizedId);
1314       }
1315     }
1316 
    /**
     * Handles one SASL-framed message from the client.
     *
     * <p>Once the SASL context is established, the bytes are processed as an rpc, after
     * unwrapping them first when {@code useWrap} is set. Before that, the bytes are a
     * negotiation token: the SASL server is created on first use (DIGEST, or Kerberos
     * for every other auth method), the token is evaluated, any reply token is sent
     * back, and when negotiation completes the authenticated user is recorded.
     *
     * @param saslToken the bytes read for this message
     * @throws IOException if creating the SASL server or evaluating the token fails; an
     *         error reply is sent to the client before the exception is rethrown
     * @throws InterruptedException declared for the rpc-processing calls made once the
     *         context is established
     */
    private void saslReadAndProcess(byte[] saslToken) throws IOException,
        InterruptedException {
      if (saslContextEstablished) {
        // Note: this message is logged even when useWrap is false and no unwrap happens.
        if (LOG.isDebugEnabled())
          LOG.debug("Have read input token of size " + saslToken.length
              + " for processing by saslServer.unwrap()");

        if (!useWrap) {
          processOneRpc(saslToken);
        } else {
          byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
          processUnwrappedData(plaintextData);
        }
      } else {
        byte[] replyToken = null;
        try {
          // Create the SASL server lazily, on the first negotiation token.
          if (saslServer == null) {
            switch (authMethod) {
            case DIGEST:
              if (secretManager == null) {
                throw new AccessDeniedException(
                    "Server is not configured to do DIGEST authentication.");
              }
              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                  .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                      secretManager, this));
              break;
            default:
              // Every non-DIGEST method is handled as Kerberos, using the current
              // user's principal split into its parts.
              UserGroupInformation current = UserGroupInformation
              .getCurrentUser();
              String fullName = current.getUserName();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Kerberos principal name is " + fullName);
              }
              final String names[] = SaslUtil.splitKerberosName(fullName);
              if (names.length != 3) {
                throw new AccessDeniedException(
                    "Kerberos principal name does NOT have the expected "
                        + "hostname part: " + fullName);
              }
              // Create the server inside doAs so it runs as the current user.
              current.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws SaslException {
                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                      .getMechanismName(), names[0], names[1],
                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
                  return null;
                }
              });
            }
            if (saslServer == null)
              throw new AccessDeniedException(
                  "Unable to find SASL server implementation for "
                      + authMethod.getMechanismName());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Have read input token of size " + saslToken.length
                + " for processing by saslServer.evaluateResponse()");
          }
          replyToken = saslServer.evaluateResponse(saslToken);
        } catch (IOException e) {
          // Report an InvalidToken found anywhere in the cause chain in preference to
          // the outer exception; the original exception is still the one rethrown.
          IOException sendToClient = e;
          Throwable cause = e;
          while (cause != null) {
            if (cause instanceof InvalidToken) {
              sendToClient = (InvalidToken) cause;
              break;
            }
            cause = cause.getCause();
          }
          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
            sendToClient.getLocalizedMessage());
          metrics.authenticationFailure();
          String clientIP = this.toString();
          // attempting user could be null
          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
          throw e;
        }
        if (replyToken != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Will send token of size " + replyToken.length
                + " from saslServer.");
          }
          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
              null);
        }
        if (saslServer.isComplete()) {
          // Wrapping is used for any negotiated QOP other than plain "auth".
          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
          user = getAuthorizedUgi(saslServer.getAuthorizationID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("SASL server context established. Authenticated client: "
              + user + ". Negotiated QoP is "
              + saslServer.getNegotiatedProperty(Sasl.QOP));
          }
          metrics.authenticationSuccess();
          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
          saslContextEstablished = true;
        }
      }
    }
1422 
1423     /**
1424      * No protobuf encoding of raw sasl messages
1425      */
1426     private void doRawSaslReply(SaslStatus status, Writable rv,
1427         String errorClass, String error) throws IOException {
1428       ByteBufferOutputStream saslResponse = null;
1429       DataOutputStream out = null;
1430       try {
1431         // In my testing, have noticed that sasl messages are usually
1432         // in the ballpark of 100-200. That's why the initial capacity is 256.
1433         saslResponse = new ByteBufferOutputStream(256);
1434         out = new DataOutputStream(saslResponse);
1435         out.writeInt(status.state); // write status
1436         if (status == SaslStatus.SUCCESS) {
1437           rv.write(out);
1438         } else {
1439           WritableUtils.writeString(out, errorClass);
1440           WritableUtils.writeString(out, error);
1441         }
1442         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1443         saslCall.responder = responder;
1444         saslCall.sendResponseIfReady();
1445       } finally {
1446         if (saslResponse != null) {
1447           saslResponse.close();
1448         }
1449         if (out != null) {
1450           out.close();
1451         }
1452       }
1453     }
1454 
1455     private void disposeSasl() {
1456       if (saslServer != null) {
1457         try {
1458           saslServer.dispose();
1459           saslServer = null;
1460         } catch (SaslException ignored) {
1461         }
1462       }
1463     }
1464 
    /**
     * Read off the wire.
     *
     * <p>Each pass of the loop first fills the 4-byte {@code dataLengthBuffer}. Until the
     * connection preamble has been read, those 4 bytes are checked against the rpc header
     * magic and two more bytes (version and auth method) are read and validated. After
     * the preamble, the 4 bytes are a data length (or a ping marker); a buffer of that
     * length is allocated, filled, and handed to SASL processing or rpc processing.
     *
     * @return Returns -1 if failure (and caller will close connection) else return how many
     * bytes were read and processed
     * @throws IOException
     * @throws InterruptedException
     */
    public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
        // does, read in the rest of the connection preamble, the version and the auth method.
        // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
        // length into the 4-byte this.dataLengthBuffer.
        int count;
        if (this.dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, this.dataLengthBuffer);
          // Stop on a failed read, or when the 4 bytes are not all here yet.
          if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
            return count;
          }
        }
        // If we have not read the connection setup preamble, look to see if that is on the wire.
        if (!connectionPreambleRead) {
          // Check for 'HBas' magic.
          this.dataLengthBuffer.flip();
          if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
            return doBadPreambleHandling("Expected HEADER=" +
              Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
              " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
              " from " + toString());
          }
          // Now read the next two bytes, the version and the auth to use.
          // NOTE(review): this buffer is a local. If fewer than 2 bytes are available,
          // the method returns below and the bytes already consumed are not kept for
          // the next call -- confirm whether a split preamble can occur in practice.
          ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
          count = channelRead(channel, versionAndAuthBytes);
          if (count < 0 || versionAndAuthBytes.remaining() > 0) {
            return count;
          }
          int version = versionAndAuthBytes.get(0);
          byte authbyte = versionAndAuthBytes.get(1);
          this.authMethod = AuthMethod.valueOf(authbyte);
          if (version != CURRENT_VERSION) {
            String msg = getFatalConnectionString(version, authbyte);
            return doBadPreambleHandling(msg, new WrongVersionException(msg));
          }
          if (authMethod == null) {
            String msg = getFatalConnectionString(version, authbyte);
            return doBadPreambleHandling(msg, new BadAuthException(msg));
          }
          // Security is on but the client asked for SIMPLE: answer with an
          // authorization failure and abort.
          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
            AccessDeniedException ae = new AccessDeniedException("Authentication is required");
            setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
            responder.doRespond(authFailedCall);
            throw ae;
          }
          // Security is off but the client asked for a SASL method: tell it to fall
          // back to SIMPLE.
          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
            doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
                SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
            authMethod = AuthMethod.SIMPLE;
            // client has already sent the initial Sasl message and we
            // should ignore it. Both client and server should fall back
            // to simple auth from now on.
            skipInitialSaslHandshake = true;
          }
          if (authMethod != AuthMethod.SIMPLE) {
            useSasl = true;
          }
          connectionPreambleRead = true;
          // Preamble checks out. Go around again to read actual connection header.
          dataLengthBuffer.clear();
          continue;
        }
        // We have read a length and we have read the preamble.  It is either the connection header
        // or it is a request.
        if (data == null) {
          dataLengthBuffer.flip();
          int dataLength = dataLengthBuffer.getInt();
          if (dataLength == RpcClient.PING_CALL_ID) {
            if (!useWrap) { //covers the !useSasl too
              dataLengthBuffer.clear();
              return 0;  //ping message
            }
          }
          if (dataLength < 0) {
            throw new IllegalArgumentException("Unexpected data length "
                + dataLength + "!! from " + getHostAddress());
          }
          // NOTE(review): dataLength comes straight off the wire and is only checked
          // for being non-negative before this allocation; no upper bound is applied
          // in this method.
          data = ByteBuffer.allocate(dataLength);
          incRpcCount();  // Increment the rpc count
        }
        count = channelRead(channel, data);
        if (count < 0) {
          return count;
        } else if (data.remaining() == 0) {
          // The whole message has arrived; reset the length buffer for the next one.
          dataLengthBuffer.clear();
          data.flip();
          if (skipInitialSaslHandshake) {
            // Drop the SASL message the client sent before being told to use SIMPLE.
            data = null;
            skipInitialSaslHandshake = false;
            continue;
          }
          // Capture the flag before processing so the check below sees its prior value.
          boolean headerRead = connectionHeaderRead;
          if (useSasl) {
            saslReadAndProcess(data.array());
          } else {
            processOneRpc(data.array());
          }
          this.data = null;
          // If the connection header had not been read before this message, loop
          // again instead of returning.
          if (!headerRead) {
            continue;
          }
        } else if (count > 0) {
          // We got some data and there is more to read still; go around again.
          if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
          continue;
        }
        return count;
      }
    }
1582 
1583     private String getFatalConnectionString(final int version, final byte authByte) {
1584       return "serverVersion=" + CURRENT_VERSION +
1585       ", clientVersion=" + version + ", authMethod=" + authByte +
1586       ", authSupported=" + (authMethod != null) + " from " + toString();
1587     }
1588 
1589     private int doBadPreambleHandling(final String msg) throws IOException {
1590       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1591     }
1592 
1593     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1594       LOG.warn(msg);
1595       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1596       setupResponse(null, fakeCall, e, msg);
1597       responder.doRespond(fakeCall);
1598       // Returning -1 closes out the connection.
1599       return -1;
1600     }
1601 
1602     // Reads the connection header following version
1603     private void processConnectionHeader(byte[] buf) throws IOException {
1604       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1605       String serviceName = connectionHeader.getServiceName();
1606       if (serviceName == null) throw new EmptyServiceNameException();
1607       this.service = getService(services, serviceName);
1608       if (this.service == null) throw new UnknownServiceException(serviceName);
1609       setupCellBlockCodecs(this.connectionHeader);
1610       UserGroupInformation protocolUser = createUser(connectionHeader);
1611       if (!useSasl) {
1612         user = protocolUser;
1613         if (user != null) {
1614           user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1615         }
1616       } else {
1617         // user is authenticated
1618         user.setAuthenticationMethod(authMethod.authenticationMethod);
1619         //Now we check if this is a proxy user case. If the protocol user is
1620         //different from the 'user', it is a proxy user scenario. However,
1621         //this is not allowed if user authenticated with DIGEST.
1622         if ((protocolUser != null)
1623             && (!protocolUser.getUserName().equals(user.getUserName()))) {
1624           if (authMethod == AuthMethod.DIGEST) {
1625             // Not allowed to doAs if token authentication is used
1626             throw new AccessDeniedException("Authenticated user (" + user
1627                 + ") doesn't match what the client claims to be ("
1628                 + protocolUser + ")");
1629           } else {
1630             // Effective user can be different from authenticated user
1631             // for simple auth or kerberos auth
1632             // The user is the real user. Now we create a proxy user
1633             UserGroupInformation realUser = user;
1634             user = UserGroupInformation.createProxyUser(protocolUser
1635                 .getUserName(), realUser);
1636             // Now the user is a proxy user, set Authentication method Proxy.
1637             user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1638           }
1639         }
1640       }
1641     }
1642 
1643     /**
1644      * Set up cell block codecs
1645      * @param header
1646      * @throws FatalConnectionException
1647      */
1648     private void setupCellBlockCodecs(final ConnectionHeader header)
1649     throws FatalConnectionException {
1650       // TODO: Plug in other supported decoders.
1651       if (!header.hasCellBlockCodecClass()) return;
1652       String className = header.getCellBlockCodecClass();
1653       if (className == null || className.length() == 0) return;
1654       try {
1655         this.codec = (Codec)Class.forName(className).newInstance();
1656       } catch (Exception e) {
1657         throw new UnsupportedCellCodecException(className, e);
1658       }
1659       if (!header.hasCellBlockCompressorClass()) return;
1660       className = header.getCellBlockCompressorClass();
1661       try {
1662         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1663       } catch (Exception e) {
1664         throw new UnsupportedCompressionCodecException(className, e);
1665       }
1666     }
1667 
1668     private void processUnwrappedData(byte[] inBuf) throws IOException,
1669     InterruptedException {
1670       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1671       // Read all RPCs contained in the inBuf, even partial ones
1672       while (true) {
1673         int count = -1;
1674         if (unwrappedDataLengthBuffer.remaining() > 0) {
1675           count = channelRead(ch, unwrappedDataLengthBuffer);
1676           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1677             return;
1678         }
1679 
1680         if (unwrappedData == null) {
1681           unwrappedDataLengthBuffer.flip();
1682           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1683 
1684           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1685             if (LOG.isDebugEnabled())
1686               LOG.debug("Received ping message");
1687             unwrappedDataLengthBuffer.clear();
1688             continue; // ping message
1689           }
1690           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1691         }
1692 
1693         count = channelRead(ch, unwrappedData);
1694         if (count <= 0 || unwrappedData.remaining() > 0)
1695           return;
1696 
1697         if (unwrappedData.remaining() == 0) {
1698           unwrappedDataLengthBuffer.clear();
1699           unwrappedData.flip();
1700           processOneRpc(unwrappedData.array());
1701           unwrappedData = null;
1702         }
1703       }
1704     }
1705 
1706     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1707       if (connectionHeaderRead) {
1708         processRequest(buf);
1709       } else {
1710         processConnectionHeader(buf);
1711         this.connectionHeaderRead = true;
1712         if (!authorizeConnection()) {
1713           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1714           // down the connection instead of trying to read non-existent retun.
1715           throw new AccessDeniedException("Connection from " + this + " for service " +
1716             connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1717         }
1718       }
1719     }
1720 
1721     /**
1722      * @param buf Has the request header and the request param and optionally encoded data buffer
1723      * all in this one array.
1724      * @throws IOException
1725      * @throws InterruptedException
1726      */
1727     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1728       long totalRequestSize = buf.length;
1729       int offset = 0;
1730       // Here we read in the header.  We avoid having pb
1731       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1732       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1733       int headerSize = cis.readRawVarint32();
1734       offset = cis.getTotalBytesRead();
1735       RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
1736       offset += headerSize;
1737       int id = header.getCallId();
1738       if (LOG.isTraceEnabled()) {
1739         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1740           " totalRequestSize: " + totalRequestSize + " bytes");
1741       }
1742       // Enforcing the call queue size, this triggers a retry in the client
1743       // This is a bit late to be doing this check - we have already read in the total request.
1744       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1745         final Call callTooBig =
1746           new Call(id, this.service, null, null, null, null, this,
1747             responder, totalRequestSize, null, null);
1748         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1749         setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
1750           "Call queue is full on " + getListenerAddress() +
1751           ", is hbase.ipc.server.max.callqueue.size too small?");
1752         responder.doRespond(callTooBig);
1753         return;
1754       }
1755       MethodDescriptor md = null;
1756       Message param = null;
1757       CellScanner cellScanner = null;
1758       try {
1759         if (header.hasRequestParam() && header.getRequestParam()) {
1760           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1761           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1762           Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
1763           // To read the varint, I need an inputstream; might as well be a CIS.
1764           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1765           int paramSize = cis.readRawVarint32();
1766           offset += cis.getTotalBytesRead();
1767           if (builder != null) {
1768             param = builder.mergeFrom(buf, offset, paramSize).build();
1769           }
1770           offset += paramSize;
1771         }
1772         if (header.hasCellBlockMeta()) {
1773           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1774             buf, offset, buf.length);
1775         }
1776       } catch (Throwable t) {
1777         String msg = getListenerAddress() + " is unable to read call parameter from client " +
1778             getHostAddress();
1779         LOG.warn(msg, t);
1780 
1781         // probably the hbase hadoop version does not match the running hadoop version
1782         if (t instanceof LinkageError) {
1783           t = new DoNotRetryIOException(t);
1784         }
1785         // If the method is not present on the server, do not retry.
1786         if (t instanceof UnsupportedOperationException) {
1787           t = new DoNotRetryIOException(t);
1788         }
1789 
1790         final Call readParamsFailedCall =
1791           new Call(id, this.service, null, null, null, null, this,
1792             responder, totalRequestSize, null, null);
1793         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1794         setupResponse(responseBuffer, readParamsFailedCall, t,
1795           msg + "; " + t.getMessage());
1796         responder.doRespond(readParamsFailedCall);
1797         return;
1798       }
1799 
1800       TraceInfo traceInfo = header.hasTraceInfo()
1801           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1802           : null;
1803       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1804               totalRequestSize,
1805               traceInfo, RpcServer.getRemoteIp());
1806       scheduler.dispatch(new CallRunner(RpcServer.this, call));
1807     }
1808 
1809     private boolean authorizeConnection() throws IOException {
1810       try {
1811         // If auth method is DIGEST, the token was obtained by the
1812         // real user for the effective user, therefore not required to
1813         // authorize real user. doAs is allowed only for simple or kerberos
1814         // authentication
1815         if (user != null && user.getRealUser() != null
1816             && (authMethod != AuthMethod.DIGEST)) {
1817           ProxyUsers.authorize(user, this.getHostAddress(), conf);
1818         }
1819         authorize(user, connectionHeader, getHostInetAddress());
1820         if (LOG.isDebugEnabled()) {
1821           LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
1822         }
1823         metrics.authorizationSuccess();
1824       } catch (AuthorizationException ae) {
1825         LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1826         metrics.authorizationFailure();
1827         setupResponse(authFailedResponse, authFailedCall,
1828           new AccessDeniedException(ae), ae.getMessage());
1829         responder.doRespond(authFailedCall);
1830         return false;
1831       }
1832       return true;
1833     }
1834 
1835     protected synchronized void close() {
1836       disposeSasl();
1837       data = null;
1838       this.dataLengthBuffer = null;
1839       if (!channel.isOpen())
1840         return;
1841       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
1842       if (channel.isOpen()) {
1843         try {channel.close();} catch(Exception ignored) {}
1844       }
1845       try {socket.close();} catch(Exception ignored) {}
1846     }
1847 
1848     private UserGroupInformation createUser(ConnectionHeader head) {
1849       UserGroupInformation ugi = null;
1850 
1851       if (!head.hasUserInfo()) {
1852         return null;
1853       }
1854       UserInformation userInfoProto = head.getUserInfo();
1855       String effectiveUser = null;
1856       if (userInfoProto.hasEffectiveUser()) {
1857         effectiveUser = userInfoProto.getEffectiveUser();
1858       }
1859       String realUser = null;
1860       if (userInfoProto.hasRealUser()) {
1861         realUser = userInfoProto.getRealUser();
1862       }
1863       if (effectiveUser != null) {
1864         if (realUser != null) {
1865           UserGroupInformation realUserUgi =
1866               UserGroupInformation.createRemoteUser(realUser);
1867           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1868         } else {
1869           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1870         }
1871       }
1872       return ugi;
1873     }
1874   }
1875 
1876   /**
1877    * Datastructure for passing a {@link BlockingService} and its associated class of
1878    * protobuf service interface.  For example, a server that fielded what is defined
1879    * in the client protobuf service would pass in an implementation of the client blocking service
1880    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1881    */
1882   public static class BlockingServiceAndInterface {
1883     private final BlockingService service;
1884     private final Class<?> serviceInterface;
1885     public BlockingServiceAndInterface(final BlockingService service,
1886         final Class<?> serviceInterface) {
1887       this.service = service;
1888       this.serviceInterface = serviceInterface;
1889     }
1890     public Class<?> getServiceInterface() {
1891       return this.serviceInterface;
1892     }
1893     public BlockingService getBlockingService() {
1894       return this.service;
1895     }
1896   }
1897 
1898   /**
1899    * Constructs a server listening on the named port and address.
        * <p>
        * Reads its tuning values from {@code conf}, creates the {@link Listener}, the metrics
        * object and the {@link Responder}, and initialises the supplied scheduler.
1900    * @param serverInstance hosting instance of {@link Server}. We will do authentications if an
1901    * instance else pass null for no authentication check.
1902    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
1903    * @param services A list of services.
1904    * @param isa Where to listen
1905    * @param conf configuration the settings below are read from
        * @param scheduler scheduler stored on this server and initialised with a
        *   {@link RpcSchedulerContext} built from this server
1906    * @throws IOException
1907    */
1908   public RpcServer(final Server serverInstance, final String name,
1909       final List<BlockingServiceAndInterface> services,
1910       final InetSocketAddress isa, Configuration conf,
1911       RpcScheduler scheduler)
1912       throws IOException {
1913     this.serverInstance = serverInstance;
1914     this.reservoir = new BoundedByteBufferPool(
1915       conf.getInt("hbase.ipc.server.reservoir.max.buffer.size",  1024 * 1024),
1916       conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
1917       // Make the max twice the number of handlers to be safe.
1918       conf.getInt("hbase.ipc.server.reservoir.initial.max",
1919         conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
1920           HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
1921     this.services = services;
1922     this.isa = isa;
1923     this.conf = conf;
1924     this.socketSendBufferSize = 0;
         // Each setting below prefers its "hbase."-prefixed key, falls back to the un-prefixed
         // "ipc." key, and finally to a built-in default.
1925     this.maxQueueSize = conf.getInt("hbase.ipc.server.max.callqueue.size",
1926       conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE));
1927     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size",
1928       conf.getInt("ipc.server.read.threadpool.size", 10));
         // The server-side idle limit is twice the configured client max idle time.
1929     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime",
1930       conf.getInt("ipc.client.connection.maxidletime", 1000));
1931     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max",
1932       conf.getInt("ipc.client.kill.max", 10));
1933     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold",
1934       conf.getInt("ipc.client.idlethreshold", 4000));
1935     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
1936       conf.getLong("ipc.client.call.purge.timeout",
1937         2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
1938     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1939     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1940 
1941     // Start the listener here and let it bind to the port
1942     listener = new Listener(name);
         // The port is taken from the address the listener reports, not from isa.
1943     this.port = listener.getAddress().getPort();
1944 
1945     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
1946     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
1947     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive",
1948       conf.getBoolean("ipc.server.tcpkeepalive", true));
1949 
1950     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
1951     this.delayedCalls = new AtomicInteger(0);
1952     this.ipcUtil = new IPCUtil(conf);
1953 
1954 
1955     // Create the responder here
1956     responder = new Responder();
1957     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
1958     this.userProvider = UserProvider.instantiate(conf);
1959     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
         // The SASL server side is only initialised when HBase security is enabled.
1960     if (isSecurityEnabled) {
1961       HBaseSaslRpcServer.init(conf);
1962     }
1963     this.scheduler = scheduler;
1964     this.scheduler.init(new RpcSchedulerContext(this));
1965   }
1966 
1967   /**
1968    * Subclasses of HBaseServer can override this to provide their own
1969    * Connection implementations.
1970    */
1971   protected Connection getConnection(SocketChannel channel, long time) {
1972     return new Connection(channel, time);
1973   }
1974 
1975   /**
1976    * Setup response for the RPC Call.
1977    *
1978    * @param response buffer to serialize the response into
1979    * @param call {@link Call} to which we are setting up the response
1980    * @param error error message, if the call failed
1981    * @param t
1982    * @throws IOException
1983    */
1984   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1985   throws IOException {
1986     if (response != null) response.reset();
1987     call.setResponse(null, null, t, error);
1988   }
1989 
1990   protected void closeConnection(Connection connection) {
1991     synchronized (connectionList) {
1992       if (connectionList.remove(connection)) {
1993         numConnections--;
1994       }
1995     }
1996     connection.close();
1997   }
1998 
1999   Configuration getConf() {
2000     return conf;
2001   }
2002 
2003   /** Sets the socket buffer size used for responding to RPCs.
2004    * @param size send size
2005    */
2006   @Override
2007   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2008 
2009   /** Starts the service.  Must be called before any calls will be handled. */
2010   @Override
2011   public void start() {
2012     startThreads();
2013     openServer();
2014   }
2015 
2016   /**
2017    * Open a previously started server.
2018    */
2019   @Override
2020   public void openServer() {
2021     this.started = true;
2022   }
2023 
2024   @Override
2025   public boolean isStarted() {
2026     return this.started;
2027   }
2028 
2029   /**
2030    * Starts the service threads but does not allow requests to be responded yet.
2031    * Client will get {@link ServerNotRunningYetException} instead.
2032    */
2033   @Override
2034   public synchronized void startThreads() {
2035     AuthenticationTokenSecretManager mgr = createSecretManager();
2036     if (mgr != null) {
2037       setSecretManager(mgr);
2038       mgr.start();
2039     }
2040     this.authManager = new ServiceAuthorizationManager();
2041     HBasePolicyProvider.init(conf, authManager);
2042     responder.start();
2043     listener.start();
2044     scheduler.start();
2045   }
2046 
2047   @Override
2048   public void refreshAuthManager(PolicyProvider pp) {
2049     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2050     // it'll break if you go via static route.
2051     this.authManager.refresh(this.conf, pp);
2052   }
2053 
2054   private AuthenticationTokenSecretManager createSecretManager() {
2055     if (!isSecurityEnabled) return null;
2056     if (serverInstance == null) return null;
2057     if (!(serverInstance instanceof org.apache.hadoop.hbase.Server)) return null;
2058     org.apache.hadoop.hbase.Server server = (org.apache.hadoop.hbase.Server)serverInstance;
2059     Configuration conf = server.getConfiguration();
2060     long keyUpdateInterval =
2061         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2062     long maxAge =
2063         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2064     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2065         server.getServerName().toString(), keyUpdateInterval, maxAge);
2066   }
2067 
2068   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2069     return this.secretManager;
2070   }
2071 
2072   @SuppressWarnings("unchecked")
2073   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2074     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2075   }
2076 
2077   /**
2078    * This is a server side method, which is invoked over RPC. On success
2079    * the return response has protobuf response payload. On failure, the
2080    * exception name and the stack trace are returned in the protobuf response.
2081    */
2082   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2083       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2084   throws IOException {
2085     try {
2086       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2087       // TODO: Review after we add in encoded data blocks.
2088       status.setRPCPacket(param);
2089       status.resume("Servicing call");
2090       //get an instance of the method arg type
2091       long startTime = System.currentTimeMillis();
2092       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2093       Message result = service.callBlockingMethod(md, controller, param);
2094       int processingTime = (int) (System.currentTimeMillis() - startTime);
2095       int qTime = (int) (startTime - receiveTime);
2096       if (LOG.isTraceEnabled()) {
2097         LOG.trace(CurCall.get().toString() +
2098             ", response " + TextFormat.shortDebugString(result) +
2099             " queueTime: " + qTime +
2100             " processingTime: " + processingTime);
2101       }
2102       metrics.dequeuedCall(qTime);
2103       metrics.processedCall(processingTime);
2104       long responseSize = result.getSerializedSize();
2105       // log any RPC responses that are slower than the configured warn
2106       // response time or larger than configured warning size
2107       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2108       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2109       if (tooSlow || tooLarge) {
2110         // when tagging, we let TooLarge trump TooSmall to keep output simple
2111         // note that large responses will often also be slow.
2112         StringBuilder buffer = new StringBuilder(256);
2113         buffer.append(md.getName());
2114         buffer.append("(");
2115         buffer.append(param.getClass().getName());
2116         buffer.append(")");
2117         logResponse(new Object[]{param},
2118             md.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
2119             status.getClient(), startTime, processingTime, qTime,
2120             responseSize);
2121       }
2122       return new Pair<Message, CellScanner>(result,
2123         controller != null? controller.cellScanner(): null);
2124     } catch (Throwable e) {
2125       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2126       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2127       // need to pass it over the wire.
2128       if (e instanceof ServiceException) e = e.getCause();
2129       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2130       if (e instanceof IOException) throw (IOException)e;
2131       LOG.error("Unexpected throwable object ", e);
2132       throw new IOException(e.getMessage(), e);
2133     }
2134   }
2135 
2136   /**
2137    * Logs an RPC response to the LOG file, producing valid JSON objects for
2138    * client Operations.
2139    * @param params The parameters received in the call.
2140    * @param methodName The name of the method invoked
2141    * @param call The string representation of the call
2142    * @param tag  The tag that will be used to indicate this event in the log.
2143    * @param clientAddress   The address of the client who made this call.
2144    * @param startTime       The time that the call was initiated, in ms.
2145    * @param processingTime  The duration that the call took to run, in ms.
2146    * @param qTime           The duration that the call spent on the queue
2147    *                        prior to being initiated, in ms.
2148    * @param responseSize    The size in bytes of the response buffer.
2149    */
2150   void logResponse(Object[] params, String methodName, String call, String tag,
2151       String clientAddress, long startTime, int processingTime, int qTime,
2152       long responseSize)
2153           throws IOException {
2154     // base information that is reported regardless of type of call
2155     Map<String, Object> responseInfo = new HashMap<String, Object>();
2156     responseInfo.put("starttimems", startTime);
2157     responseInfo.put("processingtimems", processingTime);
2158     responseInfo.put("queuetimems", qTime);
2159     responseInfo.put("responsesize", responseSize);
2160     responseInfo.put("client", clientAddress);
2161     responseInfo.put("class", serverInstance == null? "": serverInstance.getClass().getSimpleName());
2162     responseInfo.put("method", methodName);
2163     if (params.length == 2 && serverInstance instanceof HRegionServer &&
2164         params[0] instanceof byte[] &&
2165         params[1] instanceof Operation) {
2166       // if the slow process is a query, we want to log its table as well
2167       // as its own fingerprint
2168       TableName tableName = TableName.valueOf(
2169           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2170       responseInfo.put("table", tableName.getNameAsString());
2171       // annotate the response map with operation details
2172       responseInfo.putAll(((Operation) params[1]).toMap());
2173       // report to the log file
2174       LOG.warn("(operation" + tag + "): " +
2175                MAPPER.writeValueAsString(responseInfo));
2176     } else if (params.length == 1 && serverInstance instanceof HRegionServer &&
2177         params[0] instanceof Operation) {
2178       // annotate the response map with operation details
2179       responseInfo.putAll(((Operation) params[0]).toMap());
2180       // report to the log file
2181       LOG.warn("(operation" + tag + "): " +
2182                MAPPER.writeValueAsString(responseInfo));
2183     } else {
2184       // can't get JSON details, so just report call.toString() along with
2185       // a more generic tag.
2186       responseInfo.put("call", call);
2187       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2188     }
2189   }
2190 
2191   /** Stops the service.  No new calls will be handled after this is called. */
2192   @Override
2193   public synchronized void stop() {
2194     LOG.info("Stopping server on " + port);
2195     running = false;
2196     listener.interrupt();
2197     listener.doStop();
2198     responder.interrupt();
2199     scheduler.stop();
2200     notifyAll();
2201   }
2202 
2203   /** Wait for the server to be stopped.
2204    * Does not wait for all subthreads to finish.
2205    *  See {@link #stop()}.
2206    * @throws InterruptedException e
2207    */
2208   @Override
2209   public synchronized void join() throws InterruptedException {
2210     while (running) {
2211       wait();
2212     }
2213   }
2214 
2215   /**
2216    * Return the socket (ip+port) on which the RPC server is listening to.
2217    * @return the socket (ip+port) on which the RPC server is listening to.
2218    */
2219   @Override
2220   public synchronized InetSocketAddress getListenerAddress() {
2221     return listener.getAddress();
2222   }
2223 
2224   /**
2225    * Set the handler for calling out of RPC for error conditions.
2226    * @param handler the handler implementation
2227    */
2228   @Override
2229   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2230     this.errorHandler = handler;
2231   }
2232 
2233   @Override
2234   public HBaseRPCErrorHandler getErrorHandler() {
2235     return this.errorHandler;
2236   }
2237 
2238   /**
2239    * Returns the metrics instance for reporting RPC call statistics
2240    */
2241   public MetricsHBaseServer getMetrics() {
2242     return metrics;
2243   }
2244 
2245   @Override
2246   public void addCallSize(final long diff) {
2247     this.callQueueSize.add(diff);
2248   }
2249 
2250   /**
2251    * Authorize the incoming client connection.
2252    *
2253    * @param user client user
2254    * @param connection incoming connection
2255    * @param addr InetAddress of incoming connection
2256    * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
2257    */
2258   @SuppressWarnings("static-access")
2259   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2260   throws AuthorizationException {
2261     if (authorize) {
2262       Class<?> c = getServiceInterface(services, connection.getServiceName());
2263       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2264     }
2265   }
2266 
2267   /**
2268    * When the read or write buffer size is larger than this limit, i/o will be
2269    * done in chunks of this size. Most RPC requests and responses would be
2270    * be smaller.
2271    */
2272   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2273 
2274   /**
2275    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2276    * If the amount of data is large, it writes to channel in smaller chunks.
2277    * This is to avoid jdk from creating many direct buffers as the size of
2278    * buffer increases. This also minimizes extra copies in NIO layer
2279    * as a result of multiple write operations required to write a large
2280    * buffer.
2281    *
2282    * @param channel writable byte channel to write to
2283    * @param bufferChain Chain of buffers to write
2284    * @return number of bytes written
2285    * @throws java.io.IOException e
2286    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2287    */
2288   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2289   throws IOException {
2290     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2291     if (count > 0) this.metrics.sentBytes(count);
2292     return count;
2293   }
2294 
2295   /**
2296    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2297    * If the amount of data is large, it writes to channel in smaller chunks.
2298    * This is to avoid jdk from creating many direct buffers as the size of
2299    * ByteBuffer increases. There should not be any performance degredation.
2300    *
2301    * @param channel writable byte channel to write on
2302    * @param buffer buffer to write
2303    * @return number of bytes written
2304    * @throws java.io.IOException e
2305    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2306    */
2307   protected int channelRead(ReadableByteChannel channel,
2308                                    ByteBuffer buffer) throws IOException {
2309 
2310     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2311            channel.read(buffer) : channelIO(channel, null, buffer);
2312     if (count > 0) {
2313       metrics.receivedBytes(count);
2314     }
2315     return count;
2316   }
2317 
2318   /**
2319    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2320    * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only
2321    * one of readCh or writeCh should be non-null.
2322    *
2323    * @param readCh read channel
2324    * @param writeCh write channel
2325    * @param buf buffer to read or write into/out of
2326    * @return bytes written
2327    * @throws java.io.IOException e
2328    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2329    * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)
2330    */
2331   private static int channelIO(ReadableByteChannel readCh,
2332                                WritableByteChannel writeCh,
2333                                ByteBuffer buf) throws IOException {
2334 
2335     int originalLimit = buf.limit();
2336     int initialRemaining = buf.remaining();
2337     int ret = 0;
2338 
2339     while (buf.remaining() > 0) {
2340       try {
2341         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2342         buf.limit(buf.position() + ioSize);
2343 
2344         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2345 
2346         if (ret < ioSize) {
2347           break;
2348         }
2349 
2350       } finally {
2351         buf.limit(originalLimit);
2352       }
2353     }
2354 
2355     int nBytes = initialRemaining - buf.remaining();
2356     return (nBytes > 0) ? nBytes : ret;
2357   }
2358 
2359   /**
2360    * Needed for features such as delayed calls.  We need to be able to store the current call
2361    * so that we can complete it later or ask questions of what is supported by the current ongoing
2362    * call.
2363    * @return An RpcCallConext backed by the currently ongoing call (gotten from a thread local)
2364    */
2365   public static RpcCallContext getCurrentCall() {
2366     return CurCall.get();
2367   }
2368 
2369   /**
2370    * Returns the user credentials associated with the current RPC request or
2371    * <code>null</code> if no credentials were provided.
2372    * @return A User
2373    */
2374   public static User getRequestUser() {
2375     RpcCallContext ctx = getCurrentCall();
2376     return ctx == null? null: ctx.getRequestUser();
2377   }
2378 
2379   /**
2380    * Returns the username for any user associated with the current RPC
2381    * request or <code>null</code> if no user is set.
2382    */
2383   public static String getRequestUserName() {
2384     User user = getRequestUser();
2385     return user == null? null: user.getShortName();
2386   }
2387 
2388   /**
2389    * @return Address of remote client if a request is ongoing, else null
2390    */
2391   public static InetAddress getRemoteAddress() {
2392     RpcCallContext ctx = getCurrentCall();
2393     return ctx == null? null: ctx.getRemoteAddress();
2394   }
2395 
2396   /**
2397    * @param serviceName Some arbitrary string that represents a 'service'.
2398    * @param services Available service instances
2399    * @return Matching BlockingServiceAndInterface pair
2400    */
2401   static BlockingServiceAndInterface getServiceAndInterface(
2402       final List<BlockingServiceAndInterface> services, final String serviceName) {
2403     for (BlockingServiceAndInterface bs : services) {
2404       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2405         return bs;
2406       }
2407     }
2408     return null;
2409   }
2410 
2411   /**
2412    * @param serviceName Some arbitrary string that represents a 'service'.
2413    * @param services Available services and their service interfaces.
2414    * @return Service interface class for <code>serviceName</code>
2415    */
2416   static Class<?> getServiceInterface(
2417       final List<BlockingServiceAndInterface> services,
2418       final String serviceName) {
2419     BlockingServiceAndInterface bsasi =
2420         getServiceAndInterface(services, serviceName);
2421     return bsasi == null? null: bsasi.getServiceInterface();
2422   }
2423 
2424   /**
2425    * @param serviceName Some arbitrary string that represents a 'service'.
2426    * @param services Available services and their service interfaces.
2427    * @return BlockingService that goes with the passed <code>serviceName</code>
2428    */
2429   static BlockingService getService(
2430       final List<BlockingServiceAndInterface> services,
2431       final String serviceName) {
2432     BlockingServiceAndInterface bsasi =
2433         getServiceAndInterface(services, serviceName);
2434     return bsasi == null? null: bsasi.getBlockingService();
2435   }
2436 
2437   /** Returns the remote side ip address when invoked inside an RPC
2438    *  Returns null incase of an error.
2439    *  @return InetAddress
2440    */
2441   public static InetAddress getRemoteIp() {
2442     Call call = CurCall.get();
2443     if (call != null && call.connection.socket != null) {
2444       return call.connection.socket.getInetAddress();
2445     }
2446     return null;
2447   }
2448 
2449   /**
2450    * A convenience method to bind to a given address and report
2451    * better exceptions if the address is not a valid host.
2452    * @param socket the socket to bind
2453    * @param address the address to bind to
2454    * @param backlog the number of connections allowed in the queue
2455    * @throws BindException if the address can't be bound
2456    * @throws UnknownHostException if the address isn't a valid host name
2457    * @throws IOException other random errors from bind
2458    */
2459   public static void bind(ServerSocket socket, InetSocketAddress address,
2460                           int backlog) throws IOException {
2461     try {
2462       socket.bind(address, backlog);
2463     } catch (BindException e) {
2464       BindException bindException =
2465         new BindException("Problem binding to " + address + " : " +
2466             e.getMessage());
2467       bindException.initCause(e);
2468       throw bindException;
2469     } catch (SocketException e) {
2470       // If they try to bind to a different host's address, give a better
2471       // error message.
2472       if ("Unresolved address".equals(e.getMessage())) {
2473         throw new UnknownHostException("Invalid hostname for server: " +
2474                                        address.getHostName());
2475       }
2476       throw e;
2477     }
2478   }
2479 
2480   public RpcScheduler getScheduler() {
2481     return scheduler;
2482   }
2483 }