1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketException;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.GatheringByteChannel;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.HashMap;
49 import java.util.Iterator;
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Random;
54 import java.util.concurrent.ExecutorService;
55 import java.util.concurrent.Executors;
56 import java.util.concurrent.atomic.AtomicInteger;
57
58 import javax.security.sasl.Sasl;
59 import javax.security.sasl.SaslException;
60 import javax.security.sasl.SaslServer;
61
62 import org.apache.commons.logging.Log;
63 import org.apache.commons.logging.LogFactory;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.classification.InterfaceStability;
66 import org.apache.hadoop.conf.Configuration;
67 import org.apache.hadoop.hbase.CellScanner;
68 import org.apache.hadoop.hbase.DoNotRetryIOException;
69 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
70 import org.apache.hadoop.hbase.HConstants;
71 import org.apache.hadoop.hbase.HRegionInfo;
72 import org.apache.hadoop.hbase.Server;
73 import org.apache.hadoop.hbase.TableName;
74 import org.apache.hadoop.hbase.client.Operation;
75 import org.apache.hadoop.hbase.codec.Codec;
76 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
77 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
78 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
79 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
80 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
82 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
83 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
84 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
85 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
86 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
87 import org.apache.hadoop.hbase.regionserver.HRegionServer;
88 import org.apache.hadoop.hbase.security.AccessDeniedException;
89 import org.apache.hadoop.hbase.security.AuthMethod;
90 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
91 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
92 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
93 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
94 import org.apache.hadoop.hbase.security.SaslStatus;
95 import org.apache.hadoop.hbase.security.SaslUtil;
96 import org.apache.hadoop.hbase.security.User;
97 import org.apache.hadoop.hbase.security.UserProvider;
98 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
99 import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.io.BytesWritable;
102 import org.apache.hadoop.io.IntWritable;
103 import org.apache.hadoop.io.Writable;
104 import org.apache.hadoop.io.WritableUtils;
105 import org.apache.hadoop.io.compress.CompressionCodec;
106 import org.apache.hadoop.security.UserGroupInformation;
107 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
108 import org.apache.hadoop.security.authorize.AuthorizationException;
109 import org.apache.hadoop.security.authorize.PolicyProvider;
110 import org.apache.hadoop.security.authorize.ProxyUsers;
111 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
112 import org.apache.hadoop.security.token.SecretManager;
113 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
114 import org.apache.hadoop.security.token.TokenIdentifier;
115 import org.apache.hadoop.util.StringUtils;
116 import org.cliffc.high_scale_lib.Counter;
117 import org.cloudera.htrace.TraceInfo;
118 import org.codehaus.jackson.map.ObjectMapper;
119
120 import com.google.common.util.concurrent.ThreadFactoryBuilder;
121 import com.google.protobuf.BlockingService;
122 import com.google.protobuf.CodedInputStream;
123 import com.google.protobuf.Descriptors.MethodDescriptor;
124 import com.google.protobuf.Message;
125 import com.google.protobuf.Message.Builder;
126 import com.google.protobuf.ServiceException;
127 import com.google.protobuf.TextFormat;
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
151 @InterfaceStability.Evolving
152 public class RpcServer implements RpcServerInterface {
153
154
155 public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcServer");
156
157 private final boolean authorize;
158 private boolean isSecurityEnabled;
159
160 public static final byte CURRENT_VERSION = 0;
161
162
163
164
165 static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
166
167
168
169
170 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
171
172 static final int BUFFER_INITIAL_SIZE = 1024;
173
174 private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
175
176 private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
177
178 private final int warnDelayedCalls;
179
180 private AtomicInteger delayedCalls;
181 private final IPCUtil ipcUtil;
182
183 private static final String AUTH_FAILED_FOR = "Auth failed for ";
184 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
185 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
186 Server.class.getName());
187 protected SecretManager<TokenIdentifier> secretManager;
188 protected ServiceAuthorizationManager authManager;
189
190
191
192
193 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
194
195
196 static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
197 = new ThreadLocal<MonitoredRPCHandler>();
198
199 protected final InetSocketAddress isa;
200 protected int port;
201 private int readThreads;
202 protected int maxIdleTime;
203
204
205 protected int thresholdIdleConnections;
206
207
208
209 int maxConnectionsToNuke;
210
211
212
213 protected MetricsHBaseServer metrics;
214
215 protected final Configuration conf;
216
217 private int maxQueueSize;
218 protected int socketSendBufferSize;
219 protected final boolean tcpNoDelay;
220 protected final boolean tcpKeepAlive;
221 protected final long purgeTimeout;
222
223
224
225
226
227
228 volatile boolean running = true;
229
230
231
232
233
234 volatile boolean started = false;
235
236
237
238
239 protected final Counter callQueueSize = new Counter();
240
241 protected final List<Connection> connectionList =
242 Collections.synchronizedList(new LinkedList<Connection>());
243
244
245 private Listener listener = null;
246 protected Responder responder = null;
247 protected int numConnections = 0;
248
249 protected HBaseRPCErrorHandler errorHandler = null;
250
251 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
252 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
253
254
255 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
256 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
257
258 private static final ObjectMapper MAPPER = new ObjectMapper();
259
260 private final int warnResponseTime;
261 private final int warnResponseSize;
262 private final Object serverInstance;
263 private final List<BlockingServiceAndInterface> services;
264
265 private final RpcScheduler scheduler;
266
267 private UserProvider userProvider;
268
269 private final BoundedByteBufferPool reservoir;
270
271
272
273
274
275
276 class Call implements RpcCallContext {
277 protected int id;
278 protected BlockingService service;
279 protected MethodDescriptor md;
280 protected RequestHeader header;
281 protected Message param;
282
283 protected CellScanner cellScanner;
284 protected Connection connection;
285 protected long timestamp;
286
287
288
289
290 protected BufferChain response;
291 protected boolean delayResponse;
292 protected Responder responder;
293 protected boolean delayReturnValue;
294
295 protected long size;
296 protected boolean isError;
297 protected TraceInfo tinfo;
298 private ByteBuffer cellBlock = null;
299
300 private User user;
301 private InetAddress remoteAddress;
302
303
304
305
/**
 * Legacy constructor for callers that have no remote address to supply.
 * Forwards every argument to the full constructor and passes {@code null}
 * as the remote address.
 */
@Deprecated
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
    Message param, CellScanner cellScanner, Connection connection, Responder responder,
    long size, TraceInfo tinfo) {
  this(id, service, md, header, param, cellScanner, connection, responder, size, tinfo,
      (InetAddress) null);
}
312
313 Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
314 Message param, CellScanner cellScanner, Connection connection, Responder responder,
315 long size, TraceInfo tinfo, InetAddress remoteAddress) {
316 this.id = id;
317 this.service = service;
318 this.md = md;
319 this.header = header;
320 this.param = param;
321 this.cellScanner = cellScanner;
322 this.connection = connection;
323 this.timestamp = System.currentTimeMillis();
324 this.response = null;
325 this.delayResponse = false;
326 this.responder = responder;
327 this.isError = false;
328 this.size = size;
329 this.tinfo = tinfo;
330 if (connection != null && connection.user != null) {
331 this.user = userProvider.create(connection.user);
332 } else {
333 this.user = null;
334 }
335 this.remoteAddress = remoteAddress;
336 }
337
338
339
340
341
342 void done() {
343 if (this.cellBlock != null) {
344
345 reservoir.putBuffer(this.cellBlock);
346 this.cellBlock = null;
347 }
348 }
349
350 @Override
351 public String toString() {
352 return toShortString() + " param: " +
353 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
354 " connection: " + connection.toString();
355 }
356
357 protected RequestHeader getHeader() {
358 return this.header;
359 }
360
361 @Override
362 public User getRequestUser() {
363 return user;
364 }
365
366 @Override
367 public String getRequestUserName() {
368 User user = getRequestUser();
369 return user == null? null: user.getShortName();
370 }
371
372 @Override
373 public InetAddress getRemoteAddress() {
374 return remoteAddress;
375 }
376
377
378
379
380
381 String toShortString() {
382 String serviceName = this.connection.service != null?
383 this.connection.service.getDescriptorForType().getName() : "null";
384 StringBuilder sb = new StringBuilder();
385 sb.append("callId: ");
386 sb.append(this.id);
387 sb.append(" service: ");
388 sb.append(serviceName);
389 sb.append(" methodName: ");
390 sb.append((this.md != null) ? this.md.getName() : "");
391 sb.append(" size: ");
392 sb.append(StringUtils.humanReadableInt(this.size));
393 sb.append(" connection: ");
394 sb.append(connection.toString());
395 return sb.toString();
396 }
397
398 String toTraceString() {
399 String serviceName = this.connection.service != null ?
400 this.connection.service.getDescriptorForType().getName() : "";
401 String methodName = (this.md != null) ? this.md.getName() : "";
402 String result = serviceName + "." + methodName;
403 return result;
404 }
405
406 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
407 this.response = new BufferChain(response);
408 }
409
410 protected synchronized void setResponse(Object m, final CellScanner cells,
411 Throwable t, String errorMsg) {
412 if (this.isError) return;
413 if (t != null) this.isError = true;
414 BufferChain bc = null;
415 try {
416 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
417
418 Message result = (Message)m;
419
420 headerBuilder.setCallId(this.id);
421 if (t != null) {
422 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
423 exceptionBuilder.setExceptionClassName(t.getClass().getName());
424 exceptionBuilder.setStackTrace(errorMsg);
425 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
426 if (t instanceof RegionMovedException) {
427
428
429
430 RegionMovedException rme = (RegionMovedException)t;
431 exceptionBuilder.setHostname(rme.getHostname());
432 exceptionBuilder.setPort(rme.getPort());
433 }
434
435 headerBuilder.setException(exceptionBuilder.build());
436 }
437
438
439
440 this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
441 this.connection.compressionCodec, cells, reservoir);
442 if (this.cellBlock != null) {
443 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
444
445 cellBlockBuilder.setLength(this.cellBlock.limit());
446 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
447 }
448 Message header = headerBuilder.build();
449
450
451
452 ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
453 ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
454 int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
455 (this.cellBlock == null? 0: this.cellBlock.limit());
456 ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
457 bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
458 if (connection.useWrap) {
459 bc = wrapWithSasl(bc);
460 }
461 } catch (IOException e) {
462 LOG.warn("Exception while creating response " + e);
463 }
464 this.response = bc;
465 }
466
467 private BufferChain wrapWithSasl(BufferChain bc)
468 throws IOException {
469 if (bc == null) return bc;
470 if (!this.connection.useSasl) return bc;
471
472
473 byte [] responseBytes = bc.getBytes();
474 byte [] token;
475
476
477 synchronized (connection.saslServer) {
478 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
479 }
480 if (LOG.isDebugEnabled())
481 LOG.debug("Adding saslServer wrapped token of size " + token.length
482 + " as call response.");
483
484 ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
485 ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
486 return new BufferChain(bbTokenLength, bbTokenBytes);
487 }
488
489 @Override
490 public synchronized void endDelay(Object result) throws IOException {
491 assert this.delayResponse;
492 assert this.delayReturnValue || result == null;
493 this.delayResponse = false;
494 delayedCalls.decrementAndGet();
495 if (this.delayReturnValue) {
496 this.setResponse(result, null, null, null);
497 }
498 this.responder.doRespond(this);
499 }
500
501 @Override
502 public synchronized void endDelay() throws IOException {
503 this.endDelay(null);
504 }
505
506 @Override
507 public synchronized void startDelay(boolean delayReturnValue) {
508 assert !this.delayResponse;
509 this.delayResponse = true;
510 this.delayReturnValue = delayReturnValue;
511 int numDelayed = delayedCalls.incrementAndGet();
512 if (numDelayed > warnDelayedCalls) {
513 LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
514 }
515 }
516
517 @Override
518 public synchronized void endDelayThrowing(Throwable t) throws IOException {
519 this.setResponse(null, null, t, StringUtils.stringifyException(t));
520 this.delayResponse = false;
521 this.sendResponseIfReady();
522 }
523
524 @Override
525 public synchronized boolean isDelayed() {
526 return this.delayResponse;
527 }
528
529 @Override
530 public synchronized boolean isReturnValueDelayed() {
531 return this.delayReturnValue;
532 }
533
534 @Override
535 public boolean isClientCellBlockSupport() {
536 return this.connection != null && this.connection.codec != null;
537 }
538
539 @Override
540 public long disconnectSince() {
541 if (!connection.channel.isOpen()) {
542 return System.currentTimeMillis() - timestamp;
543 } else {
544 return -1L;
545 }
546 }
547
548 public long getSize() {
549 return this.size;
550 }
551
552
553
554
555
556
557 public synchronized void sendResponseIfReady() throws IOException {
558 if (!this.delayResponse) {
559 this.responder.doRespond(this);
560 }
561 }
562 }
563
564
565 private class Listener extends Thread {
566
567 private ServerSocketChannel acceptChannel = null;
568 private Selector selector = null;
569 private Reader[] readers = null;
570 private int currentReader = 0;
571 private Random rand = new Random();
572 private long lastCleanupRunTime = 0;
573
574 private long cleanupInterval = 10000;
575
576 private int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size",
577 conf.getInt("ipc.server.listen.queue.size", 128));
578
579 private ExecutorService readPool;
580
581 public Listener(final String name) throws IOException {
582 super(name);
583
584 acceptChannel = ServerSocketChannel.open();
585 acceptChannel.configureBlocking(false);
586
587
588 bind(acceptChannel.socket(), isa, backlogLength);
589 port = acceptChannel.socket().getLocalPort();
590
591 selector= Selector.open();
592
593 readers = new Reader[readThreads];
594 readPool = Executors.newFixedThreadPool(readThreads,
595 new ThreadFactoryBuilder().setNameFormat(
596 "RpcServer.reader=%d,port=" + port).setDaemon(true).build());
597 for (int i = 0; i < readThreads; ++i) {
598 Reader reader = new Reader();
599 readers[i] = reader;
600 readPool.execute(reader);
601 }
602 LOG.info(getName() + ": started " + readThreads + " reader(s).");
603
604
605 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
606 this.setName("RpcServer.listener,port=" + port);
607 this.setDaemon(true);
608 }
609
610
611 private class Reader implements Runnable {
612 private volatile boolean adding = false;
613 private final Selector readSelector;
614
615 Reader() throws IOException {
616 this.readSelector = Selector.open();
617 }
618 public void run() {
619 try {
620 doRunLoop();
621 } finally {
622 try {
623 readSelector.close();
624 } catch (IOException ioe) {
625 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
626 }
627 }
628 }
629
630 private synchronized void doRunLoop() {
631 while (running) {
632 SelectionKey key = null;
633 try {
634 readSelector.select();
635 while (adding) {
636 this.wait(1000);
637 }
638
639 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
640 while (iter.hasNext()) {
641 key = iter.next();
642 iter.remove();
643 if (key.isValid()) {
644 if (key.isReadable()) {
645 doRead(key);
646 }
647 }
648 key = null;
649 }
650 } catch (InterruptedException e) {
651 if (running) {
652 LOG.info(getName() + ": unexpectedly interrupted: " +
653 StringUtils.stringifyException(e));
654 }
655 } catch (IOException ex) {
656 LOG.error(getName() + ": error in Reader", ex);
657 }
658 }
659 }
660
661
662
663
664
665
666
667
668 public void startAdd() {
669 adding = true;
670 readSelector.wakeup();
671 }
672
673 public synchronized SelectionKey registerChannel(SocketChannel channel)
674 throws IOException {
675 return channel.register(readSelector, SelectionKey.OP_READ);
676 }
677
678 public synchronized void finishAdd() {
679 adding = false;
680 this.notify();
681 }
682 }
683
684
685
686
687
688
689
690
691 private void cleanupConnections(boolean force) {
692 if (force || numConnections > thresholdIdleConnections) {
693 long currentTime = System.currentTimeMillis();
694 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
695 return;
696 }
697 int start = 0;
698 int end = numConnections - 1;
699 if (!force) {
700 start = rand.nextInt() % numConnections;
701 end = rand.nextInt() % numConnections;
702 int temp;
703 if (end < start) {
704 temp = start;
705 start = end;
706 end = temp;
707 }
708 }
709 int i = start;
710 int numNuked = 0;
711 while (i <= end) {
712 Connection c;
713 synchronized (connectionList) {
714 try {
715 c = connectionList.get(i);
716 } catch (Exception e) {return;}
717 }
718 if (c.timedOut(currentTime)) {
719 if (LOG.isDebugEnabled())
720 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
721 closeConnection(c);
722 numNuked++;
723 end--;
724
725 c = null;
726 if (!force && numNuked == maxConnectionsToNuke) break;
727 }
728 else i++;
729 }
730 lastCleanupRunTime = System.currentTimeMillis();
731 }
732 }
733
734 @Override
735 public void run() {
736 LOG.info(getName() + ": starting");
737 while (running) {
738 SelectionKey key = null;
739 try {
740 selector.select();
741 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
742 while (iter.hasNext()) {
743 key = iter.next();
744 iter.remove();
745 try {
746 if (key.isValid()) {
747 if (key.isAcceptable())
748 doAccept(key);
749 }
750 } catch (IOException ignored) {
751 }
752 key = null;
753 }
754 } catch (OutOfMemoryError e) {
755 if (errorHandler != null) {
756 if (errorHandler.checkOOME(e)) {
757 LOG.info(getName() + ": exiting on OutOfMemoryError");
758 closeCurrentConnection(key, e);
759 cleanupConnections(true);
760 return;
761 }
762 } else {
763
764
765
766 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
767 closeCurrentConnection(key, e);
768 cleanupConnections(true);
769 try { Thread.sleep(60000); } catch (Exception ignored) {}
770 }
771 } catch (Exception e) {
772 closeCurrentConnection(key, e);
773 }
774 cleanupConnections(false);
775 }
776 LOG.info(getName() + ": stopping");
777
778 synchronized (this) {
779 try {
780 acceptChannel.close();
781 selector.close();
782 } catch (IOException ignored) { }
783
784 selector= null;
785 acceptChannel= null;
786
787
788 while (!connectionList.isEmpty()) {
789 closeConnection(connectionList.remove(0));
790 }
791 }
792 }
793
794 private void closeCurrentConnection(SelectionKey key, Throwable e) {
795 if (key != null) {
796 Connection c = (Connection)key.attachment();
797 if (c != null) {
798 if (LOG.isDebugEnabled()) {
799 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
800 (e != null ? " on error " + e.getMessage() : ""));
801 }
802 closeConnection(c);
803 key.attach(null);
804 }
805 }
806 }
807
808 InetSocketAddress getAddress() {
809 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
810 }
811
812 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
813 Connection c;
814 ServerSocketChannel server = (ServerSocketChannel) key.channel();
815
816 SocketChannel channel;
817 while ((channel = server.accept()) != null) {
818 try {
819 channel.configureBlocking(false);
820 channel.socket().setTcpNoDelay(tcpNoDelay);
821 channel.socket().setKeepAlive(tcpKeepAlive);
822 } catch (IOException ioe) {
823 channel.close();
824 throw ioe;
825 }
826
827 Reader reader = getReader();
828 try {
829 reader.startAdd();
830 SelectionKey readKey = reader.registerChannel(channel);
831 c = getConnection(channel, System.currentTimeMillis());
832 readKey.attach(c);
833 synchronized (connectionList) {
834 connectionList.add(numConnections, c);
835 numConnections++;
836 }
837 if (LOG.isDebugEnabled())
838 LOG.debug(getName() + ": connection from " + c.toString() +
839 "; # active connections: " + numConnections);
840 } finally {
841 reader.finishAdd();
842 }
843 }
844 }
845
846 void doRead(SelectionKey key) throws InterruptedException {
847 int count = 0;
848 Connection c = (Connection)key.attachment();
849 if (c == null) {
850 return;
851 }
852 c.setLastContact(System.currentTimeMillis());
853 try {
854 count = c.readAndProcess();
855 } catch (InterruptedException ieo) {
856 throw ieo;
857 } catch (Exception e) {
858 LOG.warn(getName() + ": count of bytes read: " + count, e);
859 count = -1;
860 }
861 if (count < 0) {
862 if (LOG.isDebugEnabled()) {
863 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
864 " because read count=" + count +
865 ". Number of active connections: " + numConnections);
866 }
867 closeConnection(c);
868
869 } else {
870 c.setLastContact(System.currentTimeMillis());
871 }
872 }
873
874 synchronized void doStop() {
875 if (selector != null) {
876 selector.wakeup();
877 Thread.yield();
878 }
879 if (acceptChannel != null) {
880 try {
881 acceptChannel.socket().close();
882 } catch (IOException e) {
883 LOG.info(getName() + ": exception in closing listener socket. " + e);
884 }
885 }
886 readPool.shutdownNow();
887 }
888
889
890
891 Reader getReader() {
892 currentReader = (currentReader + 1) % readers.length;
893 return readers[currentReader];
894 }
895 }
896
897
898 protected class Responder extends Thread {
899 private final Selector writeSelector;
900 private int pending;
901
/**
 * Creates the responder as a daemon thread named {@code RpcServer.responder}
 * and opens its write selector.
 *
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  setName("RpcServer.responder");
  setDaemon(true);
  this.writeSelector = Selector.open();
  // pending keeps its default value of 0.
}
908
909 @Override
910 public void run() {
911 LOG.info(getName() + ": starting");
912 try {
913 doRunLoop();
914 } finally {
915 LOG.info(getName() + ": stopping");
916 try {
917 writeSelector.close();
918 } catch (IOException ioe) {
919 LOG.error(getName() + ": couldn't close write selector", ioe);
920 }
921 }
922 }
923
924 private void doRunLoop() {
925 long lastPurgeTime = 0;
926
927 while (running) {
928 try {
929 waitPending();
930 writeSelector.select(purgeTimeout);
931 Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
932 while (iter.hasNext()) {
933 SelectionKey key = iter.next();
934 iter.remove();
935 try {
936 if (key.isValid() && key.isWritable()) {
937 doAsyncWrite(key);
938 }
939 } catch (IOException e) {
940 LOG.info(getName() + ": asyncWrite", e);
941 }
942 }
943 long now = System.currentTimeMillis();
944 if (now < lastPurgeTime + purgeTimeout) {
945 continue;
946 }
947 lastPurgeTime = now;
948
949
950
951
952 if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
953 ArrayList<Call> calls;
954
955
956 synchronized (writeSelector.keys()) {
957 calls = new ArrayList<Call>(writeSelector.keys().size());
958 iter = writeSelector.keys().iterator();
959 while (iter.hasNext()) {
960 SelectionKey key = iter.next();
961 Call call = (Call)key.attachment();
962 if (call != null && key.channel() == call.connection.channel) {
963 calls.add(call);
964 }
965 }
966 }
967
968 for(Call call : calls) {
969 try {
970 doPurge(call, now);
971 } catch (IOException e) {
972 LOG.warn(getName() + ": error in purging old calls " + e);
973 }
974 }
975 } catch (OutOfMemoryError e) {
976 if (errorHandler != null) {
977 if (errorHandler.checkOOME(e)) {
978 LOG.info(getName() + ": exiting on OutOfMemoryError");
979 return;
980 }
981 } else {
982
983
984
985
986
987 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
988 try { Thread.sleep(60000); } catch (Exception ignored) {}
989 }
990 } catch (Exception e) {
991 LOG.warn(getName() + ": exception in Responder " +
992 StringUtils.stringifyException(e));
993 }
994 }
995 LOG.info(getName() + ": stopped");
996 }
997
998 private void doAsyncWrite(SelectionKey key) throws IOException {
999 Call call = (Call)key.attachment();
1000 if (call == null) {
1001 return;
1002 }
1003 if (key.channel() != call.connection.channel) {
1004 throw new IOException("doAsyncWrite: bad channel");
1005 }
1006
1007 synchronized(call.connection.responseQueue) {
1008 if (processResponse(call.connection.responseQueue, false)) {
1009 try {
1010 key.interestOps(0);
1011 } catch (CancelledKeyException e) {
1012
1013
1014
1015
1016
1017 LOG.warn("Exception while changing ops : " + e);
1018 }
1019 }
1020 }
1021 }
1022
1023
1024
1025
1026
1027 private void doPurge(Call call, long now) throws IOException {
1028 synchronized (call.connection.responseQueue) {
1029 Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
1030 while (iter.hasNext()) {
1031 Call nextCall = iter.next();
1032 if (now > nextCall.timestamp + purgeTimeout) {
1033 closeConnection(nextCall.connection);
1034 break;
1035 }
1036 }
1037 }
1038 }
1039
1040
1041
1042
1043 private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
1044 throws IOException {
1045 boolean error = true;
1046 boolean done = false;
1047 int numElements;
1048 Call call = null;
1049 try {
1050
1051 synchronized (responseQueue) {
1052
1053
1054
1055 numElements = responseQueue.size();
1056 if (numElements == 0) {
1057 error = false;
1058 return true;
1059 }
1060
1061
1062
1063 call = responseQueue.removeFirst();
1064 SocketChannel channel = call.connection.channel;
1065
1066
1067
1068 long numBytes = channelWrite(channel, call.response);
1069 if (numBytes < 0) {
1070 return true;
1071 }
1072 if (!call.response.hasRemaining()) {
1073 call.connection.decRpcCount();
1074
1075 if (numElements == 1) {
1076 done = true;
1077 } else {
1078 done = false;
1079 }
1080 if (LOG.isDebugEnabled()) {
1081 LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
1082 }
1083 } else {
1084
1085
1086
1087
1088 call.connection.responseQueue.addFirst(call);
1089
1090 if (inHandler) {
1091
1092 call.timestamp = System.currentTimeMillis();
1093 if (enqueueInSelector(call))
1094 done = true;
1095 }
1096 if (LOG.isDebugEnabled()) {
1097 LOG.debug(getName() + call.toShortString() + " partially sent, wrote " +
1098 numBytes + " bytes.");
1099 }
1100 }
1101 error = false;
1102 }
1103 } finally {
1104 if (error && call != null) {
1105 LOG.warn(getName() + call.toShortString() + ": output error");
1106 done = true;
1107 closeConnection(call.connection);
1108 }
1109 }
1110 if (done) call.done();
1111 return done;
1112 }
1113
1114
1115
1116
1117 private boolean enqueueInSelector(Call call) throws IOException {
1118 boolean done = false;
1119 incPending();
1120 try {
1121
1122
1123 SocketChannel channel = call.connection.channel;
1124 writeSelector.wakeup();
1125 channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1126 } catch (ClosedChannelException e) {
1127
1128 done = true;
1129 } finally {
1130 decPending();
1131 }
1132 return done;
1133 }
1134
1135
1136
1137
1138 void doRespond(Call call) throws IOException {
1139
1140 call.timestamp = System.currentTimeMillis();
1141
1142 boolean doRegister = false;
1143 synchronized (call.connection.responseQueue) {
1144 call.connection.responseQueue.addLast(call);
1145 if (call.connection.responseQueue.size() == 1) {
1146 doRegister = !processResponse(call.connection.responseQueue, false);
1147 }
1148 }
1149 if (doRegister) {
1150 enqueueInSelector(call);
1151 }
1152 }
1153
1154 private synchronized void incPending() {
1155 pending++;
1156 }
1157
1158 private synchronized void decPending() {
1159 pending--;
1160 notify();
1161 }
1162
1163 private synchronized void waitPending() throws InterruptedException {
1164 while (pending > 0) {
1165 wait();
1166 }
1167 }
1168 }
1169
1170 @SuppressWarnings("serial")
1171 public static class CallQueueTooBigException extends IOException {
1172 CallQueueTooBigException() {
1173 super();
1174 }
1175 }
1176
1177
1178 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1179 value="VO_VOLATILE_INCREMENT",
1180 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1181 public class Connection {
1182
// True once the connection preamble (magic, version, auth byte) has been validated.
private boolean connectionPreambleRead = false;
// True once the connection header has been parsed by processOneRpc().
private boolean connectionHeaderRead = false;
protected SocketChannel channel;
// Payload buffer for the frame currently being read; null between frames.
private ByteBuffer data;
// 4-byte buffer: first receives the preamble magic, afterwards each frame length.
private ByteBuffer dataLengthBuffer;
// Responses waiting to be written; callers synchronize on this list.
protected final LinkedList<Call> responseQueue;
// Incremented when a new frame buffer is allocated; the connection is idle at zero.
private Counter rpcCount = new Counter();
private long lastContact;
private InetAddress addr;
protected Socket socket;
// Cached remote host address and port so they stay available for logging;
// hostAddress is "*Unknown*" when the socket reports no address.
protected String hostAddress;
protected int remotePort;
// Parsed by processConnectionHeader().
ConnectionHeader connectionHeader;
// Cell block codec requested by the client in the connection header, if any.
private Codec codec;
// Cell block compression codec requested by the client in the connection header, if any.
private CompressionCodec compressionCodec;
// Service resolved from the connection header's service name.
BlockingService service;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
// Set when the server falls back to SIMPLE auth; the next complete frame is dropped.
private boolean skipInitialSaslHandshake;
private ByteBuffer unwrappedData;
// Length prefix buffer for RPCs carried inside SASL-unwrapped data.
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
boolean useSasl;
SaslServer saslServer;
// True when the negotiated SASL QoP is something other than "auth".
private boolean useWrap = false;
// Fake call used to send an authorization failure back to the client.
// NOTE(review): "AUTHROIZATION" is a typo for "AUTHORIZATION"; kept as-is here.
private static final int AUTHROIZATION_FAILED_CALLID = -1;
// NOTE: this.service has not been assigned yet when this initializer runs, so the
// call is created with a null service.
private final Call authFailedCall =
  new Call(AUTHROIZATION_FAILED_CALLID, this.service, null,
    null, null, null, this, null, 0, null, null);
private ByteArrayOutputStream authFailedResponse =
    new ByteArrayOutputStream();
// Fake call used to carry raw SASL replies; this.service is likewise null here.
private static final int SASL_CALLID = -33;
private final Call saslCall =
  new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);

// Included in the audit log on authentication failure. It is not assigned in this
// class; presumably set by the SASL callback handler -- TODO confirm.
public UserGroupInformation attemptingUser = null;
1231
/**
 * Creates the server-side state for one accepted client channel.
 *
 * @param channel the accepted socket channel
 * @param lastContact timestamp to record as the initial last-contact time
 */
public Connection(SocketChannel channel, long lastContact) {
  this.channel = channel;
  this.lastContact = lastContact;
  this.data = null;
  this.dataLengthBuffer = ByteBuffer.allocate(4);
  this.socket = channel.socket();
  this.addr = socket.getInetAddress();
  // Fall back to a placeholder when the socket reports no remote address.
  this.hostAddress = (addr != null) ? addr.getHostAddress() : "*Unknown*";
  this.remotePort = socket.getPort();
  this.responseQueue = new LinkedList<Call>();
  if (socketSendBufferSize == 0) {
    // Zero means "leave the socket's send buffer size alone".
    return;
  }
  try {
    socket.setSendBufferSize(socketSendBufferSize);
  } catch (IOException e) {
    // Best effort: keep the connection even if the buffer size cannot be applied.
    LOG.warn("Connection: unable to set socket send buffer size to " +
             socketSendBufferSize);
  }
}
1255
1256 @Override
1257 public String toString() {
1258 return getHostAddress() + ":" + remotePort;
1259 }
1260
1261 public String getHostAddress() {
1262 return hostAddress;
1263 }
1264
1265 public InetAddress getHostInetAddress() {
1266 return addr;
1267 }
1268
1269 public int getRemotePort() {
1270 return remotePort;
1271 }
1272
1273 public void setLastContact(long lastContact) {
1274 this.lastContact = lastContact;
1275 }
1276
1277 public long getLastContact() {
1278 return lastContact;
1279 }
1280
1281
1282 private boolean isIdle() {
1283 return rpcCount.get() == 0;
1284 }
1285
1286
1287 protected void decRpcCount() {
1288 rpcCount.decrement();
1289 }
1290
1291
1292 protected void incRpcCount() {
1293 rpcCount.increment();
1294 }
1295
1296 protected boolean timedOut(long currentTime) {
1297 return isIdle() && currentTime - lastContact > maxIdleTime;
1298 }
1299
1300 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1301 throws IOException {
1302 if (authMethod == AuthMethod.DIGEST) {
1303 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1304 secretManager);
1305 UserGroupInformation ugi = tokenId.getUser();
1306 if (ugi == null) {
1307 throw new AccessDeniedException(
1308 "Can't retrieve username from tokenIdentifier.");
1309 }
1310 ugi.addTokenIdentifier(tokenId);
1311 return ugi;
1312 } else {
1313 return UserGroupInformation.createRemoteUser(authorizedId);
1314 }
1315 }
1316
/**
 * Handles one SASL frame read from the client.
 * <p>
 * Once the SASL context is established, the frame is either processed directly as an
 * RPC (no wrapping negotiated) or unwrapped first. Before that, the frame is a
 * handshake token: the SASL server is created lazily for the connection's auth method,
 * the token is evaluated, any reply token is sent back, and on completion the
 * authenticated user and the negotiated QoP are recorded.
 *
 * @param saslToken the raw bytes of the frame
 * @throws IOException if the handshake fails (an error reply is sent to the client and
 *         the failure is audited before rethrowing) or RPC processing fails
 * @throws InterruptedException propagated from RPC processing
 */
private void saslReadAndProcess(byte[] saslToken) throws IOException,
    InterruptedException {
  if (saslContextEstablished) {
    if (LOG.isDebugEnabled())
      LOG.debug("Have read input token of size " + saslToken.length
          + " for processing by saslServer.unwrap()");

    if (!useWrap) {
      // QoP is "auth" only: the payload is a plain RPC.
      processOneRpc(saslToken);
    } else {
      byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
      processUnwrappedData(plaintextData);
    }
  } else {
    byte[] replyToken = null;
    try {
      if (saslServer == null) {
        // First handshake frame: create the SASL server for the chosen mechanism.
        switch (authMethod) {
        case DIGEST:
          if (secretManager == null) {
            throw new AccessDeniedException(
                "Server is not configured to do DIGEST authentication.");
          }
          saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
              .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
              SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                  secretManager, this));
          break;
        default:
          // Every non-DIGEST method is treated as Kerberos here.
          UserGroupInformation current = UserGroupInformation
          .getCurrentUser();
          String fullName = current.getUserName();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Kerberos principal name is " + fullName);
          }
          final String names[] = SaslUtil.splitKerberosName(fullName);
          if (names.length != 3) {
            throw new AccessDeniedException(
                "Kerberos principal name does NOT have the expected "
                    + "hostname part: " + fullName);
          }
          // Create the server as the current (server) user.
          current.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws SaslException {
              saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                  .getMechanismName(), names[0], names[1],
                  SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
              return null;
            }
          });
        }
        if (saslServer == null)
          throw new AccessDeniedException(
              "Unable to find SASL server implementation for "
                  + authMethod.getMechanismName());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Have read input token of size " + saslToken.length
            + " for processing by saslServer.evaluateResponse()");
      }
      replyToken = saslServer.evaluateResponse(saslToken);
    } catch (IOException e) {
      // Prefer reporting an InvalidToken found anywhere in the cause chain.
      IOException sendToClient = e;
      Throwable cause = e;
      while (cause != null) {
        if (cause instanceof InvalidToken) {
          sendToClient = (InvalidToken) cause;
          break;
        }
        cause = cause.getCause();
      }
      doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
        sendToClient.getLocalizedMessage());
      metrics.authenticationFailure();
      String clientIP = this.toString();
      // Audit the failure with the remote endpoint and the user that was attempting to log in.
      AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
      throw e;
    }
    if (replyToken != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Will send token of size " + replyToken.length
            + " from saslServer.");
      }
      doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
          null);
    }
    if (saslServer.isComplete()) {
      String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
      // Anything other than plain "auth" means payloads must be unwrapped.
      useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
      user = getAuthorizedUgi(saslServer.getAuthorizationID());
      if (LOG.isDebugEnabled()) {
        LOG.debug("SASL server context established. Authenticated client: "
          + user + ". Negotiated QoP is "
          + saslServer.getNegotiatedProperty(Sasl.QOP));
      }
      metrics.authenticationSuccess();
      AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
      saslContextEstablished = true;
    }
  }
}
1422
1423
1424
1425
1426 private void doRawSaslReply(SaslStatus status, Writable rv,
1427 String errorClass, String error) throws IOException {
1428 ByteBufferOutputStream saslResponse = null;
1429 DataOutputStream out = null;
1430 try {
1431
1432
1433 saslResponse = new ByteBufferOutputStream(256);
1434 out = new DataOutputStream(saslResponse);
1435 out.writeInt(status.state);
1436 if (status == SaslStatus.SUCCESS) {
1437 rv.write(out);
1438 } else {
1439 WritableUtils.writeString(out, errorClass);
1440 WritableUtils.writeString(out, error);
1441 }
1442 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1443 saslCall.responder = responder;
1444 saslCall.sendResponseIfReady();
1445 } finally {
1446 if (saslResponse != null) {
1447 saslResponse.close();
1448 }
1449 if (out != null) {
1450 out.close();
1451 }
1452 }
1453 }
1454
1455 private void disposeSasl() {
1456 if (saslServer != null) {
1457 try {
1458 saslServer.dispose();
1459 saslServer = null;
1460 } catch (SaslException ignored) {
1461 }
1462 }
1463 }
1464
1465
1466
1467
1468
1469
1470
1471
/**
 * Reads whatever is available on the channel and advances the connection's read state:
 * first the preamble (magic, version, auth byte), then length-prefixed frames which are
 * handed to SASL processing or to {@code processOneRpc}.
 *
 * @return the count from the last channel read; a negative value when the read returned
 *         a negative count or the preamble was rejected; 0 when a ping was consumed
 * @throws IOException on protocol or I/O failure, including {@link AccessDeniedException}
 *         when security is enabled and the client asks for SIMPLE auth
 * @throws InterruptedException propagated from RPC processing
 */
public int readAndProcess() throws IOException, InterruptedException {
  while (true) {
    // Step 1: fill the 4-byte buffer (preamble magic, or the next frame's length).
    int count;
    if (this.dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, this.dataLengthBuffer);
      if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
        return count;
      }
    }

    if (!connectionPreambleRead) {
      // The first four bytes must equal the RPC header magic.
      this.dataLengthBuffer.flip();
      if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
        return doBadPreambleHandling("Expected HEADER=" +
          Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
          " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
          " from " + toString());
      }
      // Next two bytes: protocol version and auth method.
      // NOTE(review): versionAndAuthBytes is a local, and dataLengthBuffer has already
      // been flipped; if this read is partial the bytes read so far appear to be lost
      // on the next invocation -- confirm whether short reads can happen here.
      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
      count = channelRead(channel, versionAndAuthBytes);
      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
        return count;
      }
      int version = versionAndAuthBytes.get(0);
      byte authbyte = versionAndAuthBytes.get(1);
      this.authMethod = AuthMethod.valueOf(authbyte);
      if (version != CURRENT_VERSION) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new WrongVersionException(msg));
      }
      if (authMethod == null) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new BadAuthException(msg));
      }
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        // Secure server, insecure client: reply with the failure, then abort.
        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
        setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
        responder.doRespond(authFailedCall);
        throw ae;
      }
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        // Insecure server, secure client: tell the client to switch to SIMPLE auth.
        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;
        // The next complete frame is read and discarded (see skipInitialSaslHandshake
        // below); presumably it is the client's initial SASL message.
        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }
      connectionPreambleRead = true;
      // Reuse the 4-byte buffer for the first frame length.
      dataLengthBuffer.clear();
      continue;
    }

    // Step 2: the 4-byte buffer now holds a frame length; allocate the frame buffer.
    if (data == null) {
      dataLengthBuffer.flip();
      int dataLength = dataLengthBuffer.getInt();
      if (dataLength == RpcClient.PING_CALL_ID) {
        if (!useWrap) {
          // Pings are only consumed here when payloads are not SASL-wrapped.
          dataLengthBuffer.clear();
          return 0;
        }
      }
      if (dataLength < 0) {
        throw new IllegalArgumentException("Unexpected data length "
            + dataLength + "!! from " + getHostAddress());
      }
      // NOTE(review): no upper bound is applied to the client-supplied length before
      // allocating -- confirm whether a limit is enforced elsewhere.
      data = ByteBuffer.allocate(dataLength);
      incRpcCount();
    }
    // Step 3: read the frame body and dispatch it once complete.
    count = channelRead(channel, data);
    if (count < 0) {
      return count;
    } else if (data.remaining() == 0) {
      dataLengthBuffer.clear();
      data.flip();
      if (skipInitialSaslHandshake) {
        // Drop this frame after having fallen back to SIMPLE auth.
        data = null;
        skipInitialSaslHandshake = false;
        continue;
      }
      // Remember whether the header had been read before processing this frame.
      boolean headerRead = connectionHeaderRead;
      if (useSasl) {
        saslReadAndProcess(data.array());
      } else {
        processOneRpc(data.array());
      }
      this.data = null;
      if (!headerRead) {
        // This frame was not a request (the header was still unread); keep reading.
        continue;
      }
    } else if (count > 0) {
      // Frame incomplete but bytes were read: try to read the rest right away.
      if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
      continue;
    }
    return count;
  }
}
1582
1583 private String getFatalConnectionString(final int version, final byte authByte) {
1584 return "serverVersion=" + CURRENT_VERSION +
1585 ", clientVersion=" + version + ", authMethod=" + authByte +
1586 ", authSupported=" + (authMethod != null) + " from " + toString();
1587 }
1588
1589 private int doBadPreambleHandling(final String msg) throws IOException {
1590 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1591 }
1592
1593 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1594 LOG.warn(msg);
1595 Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1596 setupResponse(null, fakeCall, e, msg);
1597 responder.doRespond(fakeCall);
1598
1599 return -1;
1600 }
1601
1602
1603 private void processConnectionHeader(byte[] buf) throws IOException {
1604 this.connectionHeader = ConnectionHeader.parseFrom(buf);
1605 String serviceName = connectionHeader.getServiceName();
1606 if (serviceName == null) throw new EmptyServiceNameException();
1607 this.service = getService(services, serviceName);
1608 if (this.service == null) throw new UnknownServiceException(serviceName);
1609 setupCellBlockCodecs(this.connectionHeader);
1610 UserGroupInformation protocolUser = createUser(connectionHeader);
1611 if (!useSasl) {
1612 user = protocolUser;
1613 if (user != null) {
1614 user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1615 }
1616 } else {
1617
1618 user.setAuthenticationMethod(authMethod.authenticationMethod);
1619
1620
1621
1622 if ((protocolUser != null)
1623 && (!protocolUser.getUserName().equals(user.getUserName()))) {
1624 if (authMethod == AuthMethod.DIGEST) {
1625
1626 throw new AccessDeniedException("Authenticated user (" + user
1627 + ") doesn't match what the client claims to be ("
1628 + protocolUser + ")");
1629 } else {
1630
1631
1632
1633 UserGroupInformation realUser = user;
1634 user = UserGroupInformation.createProxyUser(protocolUser
1635 .getUserName(), realUser);
1636
1637 user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1638 }
1639 }
1640 }
1641 }
1642
1643
1644
1645
1646
1647
1648 private void setupCellBlockCodecs(final ConnectionHeader header)
1649 throws FatalConnectionException {
1650
1651 if (!header.hasCellBlockCodecClass()) return;
1652 String className = header.getCellBlockCodecClass();
1653 if (className == null || className.length() == 0) return;
1654 try {
1655 this.codec = (Codec)Class.forName(className).newInstance();
1656 } catch (Exception e) {
1657 throw new UnsupportedCellCodecException(className, e);
1658 }
1659 if (!header.hasCellBlockCompressorClass()) return;
1660 className = header.getCellBlockCompressorClass();
1661 try {
1662 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1663 } catch (Exception e) {
1664 throw new UnsupportedCompressionCodecException(className, e);
1665 }
1666 }
1667
1668 private void processUnwrappedData(byte[] inBuf) throws IOException,
1669 InterruptedException {
1670 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1671
1672 while (true) {
1673 int count = -1;
1674 if (unwrappedDataLengthBuffer.remaining() > 0) {
1675 count = channelRead(ch, unwrappedDataLengthBuffer);
1676 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1677 return;
1678 }
1679
1680 if (unwrappedData == null) {
1681 unwrappedDataLengthBuffer.flip();
1682 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1683
1684 if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1685 if (LOG.isDebugEnabled())
1686 LOG.debug("Received ping message");
1687 unwrappedDataLengthBuffer.clear();
1688 continue;
1689 }
1690 unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1691 }
1692
1693 count = channelRead(ch, unwrappedData);
1694 if (count <= 0 || unwrappedData.remaining() > 0)
1695 return;
1696
1697 if (unwrappedData.remaining() == 0) {
1698 unwrappedDataLengthBuffer.clear();
1699 unwrappedData.flip();
1700 processOneRpc(unwrappedData.array());
1701 unwrappedData = null;
1702 }
1703 }
1704 }
1705
1706 private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1707 if (connectionHeaderRead) {
1708 processRequest(buf);
1709 } else {
1710 processConnectionHeader(buf);
1711 this.connectionHeaderRead = true;
1712 if (!authorizeConnection()) {
1713
1714
1715 throw new AccessDeniedException("Connection from " + this + " for service " +
1716 connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1717 }
1718 }
1719 }
1720
1721
1722
1723
1724
1725
1726
1727 protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1728 long totalRequestSize = buf.length;
1729 int offset = 0;
1730
1731
1732 CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1733 int headerSize = cis.readRawVarint32();
1734 offset = cis.getTotalBytesRead();
1735 RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
1736 offset += headerSize;
1737 int id = header.getCallId();
1738 if (LOG.isTraceEnabled()) {
1739 LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1740 " totalRequestSize: " + totalRequestSize + " bytes");
1741 }
1742
1743
1744 if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1745 final Call callTooBig =
1746 new Call(id, this.service, null, null, null, null, this,
1747 responder, totalRequestSize, null, null);
1748 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1749 setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
1750 "Call queue is full on " + getListenerAddress() +
1751 ", is hbase.ipc.server.max.callqueue.size too small?");
1752 responder.doRespond(callTooBig);
1753 return;
1754 }
1755 MethodDescriptor md = null;
1756 Message param = null;
1757 CellScanner cellScanner = null;
1758 try {
1759 if (header.hasRequestParam() && header.getRequestParam()) {
1760 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1761 if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1762 Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
1763
1764 cis = CodedInputStream.newInstance(buf, offset, buf.length);
1765 int paramSize = cis.readRawVarint32();
1766 offset += cis.getTotalBytesRead();
1767 if (builder != null) {
1768 param = builder.mergeFrom(buf, offset, paramSize).build();
1769 }
1770 offset += paramSize;
1771 }
1772 if (header.hasCellBlockMeta()) {
1773 cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1774 buf, offset, buf.length);
1775 }
1776 } catch (Throwable t) {
1777 String msg = getListenerAddress() + " is unable to read call parameter from client " +
1778 getHostAddress();
1779 LOG.warn(msg, t);
1780
1781
1782 if (t instanceof LinkageError) {
1783 t = new DoNotRetryIOException(t);
1784 }
1785
1786 if (t instanceof UnsupportedOperationException) {
1787 t = new DoNotRetryIOException(t);
1788 }
1789
1790 final Call readParamsFailedCall =
1791 new Call(id, this.service, null, null, null, null, this,
1792 responder, totalRequestSize, null, null);
1793 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1794 setupResponse(responseBuffer, readParamsFailedCall, t,
1795 msg + "; " + t.getMessage());
1796 responder.doRespond(readParamsFailedCall);
1797 return;
1798 }
1799
1800 TraceInfo traceInfo = header.hasTraceInfo()
1801 ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1802 : null;
1803 Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1804 totalRequestSize,
1805 traceInfo, RpcServer.getRemoteIp());
1806 scheduler.dispatch(new CallRunner(RpcServer.this, call));
1807 }
1808
1809 private boolean authorizeConnection() throws IOException {
1810 try {
1811
1812
1813
1814
1815 if (user != null && user.getRealUser() != null
1816 && (authMethod != AuthMethod.DIGEST)) {
1817 ProxyUsers.authorize(user, this.getHostAddress(), conf);
1818 }
1819 authorize(user, connectionHeader, getHostInetAddress());
1820 if (LOG.isDebugEnabled()) {
1821 LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
1822 }
1823 metrics.authorizationSuccess();
1824 } catch (AuthorizationException ae) {
1825 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1826 metrics.authorizationFailure();
1827 setupResponse(authFailedResponse, authFailedCall,
1828 new AccessDeniedException(ae), ae.getMessage());
1829 responder.doRespond(authFailedCall);
1830 return false;
1831 }
1832 return true;
1833 }
1834
1835 protected synchronized void close() {
1836 disposeSasl();
1837 data = null;
1838 this.dataLengthBuffer = null;
1839 if (!channel.isOpen())
1840 return;
1841 try {socket.shutdownOutput();} catch(Exception ignored) {}
1842 if (channel.isOpen()) {
1843 try {channel.close();} catch(Exception ignored) {}
1844 }
1845 try {socket.close();} catch(Exception ignored) {}
1846 }
1847
1848 private UserGroupInformation createUser(ConnectionHeader head) {
1849 UserGroupInformation ugi = null;
1850
1851 if (!head.hasUserInfo()) {
1852 return null;
1853 }
1854 UserInformation userInfoProto = head.getUserInfo();
1855 String effectiveUser = null;
1856 if (userInfoProto.hasEffectiveUser()) {
1857 effectiveUser = userInfoProto.getEffectiveUser();
1858 }
1859 String realUser = null;
1860 if (userInfoProto.hasRealUser()) {
1861 realUser = userInfoProto.getRealUser();
1862 }
1863 if (effectiveUser != null) {
1864 if (realUser != null) {
1865 UserGroupInformation realUserUgi =
1866 UserGroupInformation.createRemoteUser(realUser);
1867 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1868 } else {
1869 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1870 }
1871 }
1872 return ugi;
1873 }
1874 }
1875
1876
1877
1878
1879
1880
1881
1882 public static class BlockingServiceAndInterface {
1883 private final BlockingService service;
1884 private final Class<?> serviceInterface;
1885 public BlockingServiceAndInterface(final BlockingService service,
1886 final Class<?> serviceInterface) {
1887 this.service = service;
1888 this.serviceInterface = serviceInterface;
1889 }
1890 public Class<?> getServiceInterface() {
1891 return this.serviceInterface;
1892 }
1893 public BlockingService getBlockingService() {
1894 return this.service;
1895 }
1896 }
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908 public RpcServer(final Server serverInstance, final String name,
1909 final List<BlockingServiceAndInterface> services,
1910 final InetSocketAddress isa, Configuration conf,
1911 RpcScheduler scheduler)
1912 throws IOException {
1913 this.serverInstance = serverInstance;
1914 this.reservoir = new BoundedByteBufferPool(
1915 conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
1916 conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
1917
1918 conf.getInt("hbase.ipc.server.reservoir.initial.max",
1919 conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
1920 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
1921 this.services = services;
1922 this.isa = isa;
1923 this.conf = conf;
1924 this.socketSendBufferSize = 0;
1925 this.maxQueueSize = conf.getInt("hbase.ipc.server.max.callqueue.size",
1926 conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE));
1927 this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size",
1928 conf.getInt("ipc.server.read.threadpool.size", 10));
1929 this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime",
1930 conf.getInt("ipc.client.connection.maxidletime", 1000));
1931 this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max",
1932 conf.getInt("ipc.client.kill.max", 10));
1933 this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold",
1934 conf.getInt("ipc.client.idlethreshold", 4000));
1935 this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
1936 conf.getLong("ipc.client.call.purge.timeout",
1937 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
1938 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1939 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1940
1941
1942 listener = new Listener(name);
1943 this.port = listener.getAddress().getPort();
1944
1945 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
1946 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
1947 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive",
1948 conf.getBoolean("ipc.server.tcpkeepalive", true));
1949
1950 this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
1951 this.delayedCalls = new AtomicInteger(0);
1952 this.ipcUtil = new IPCUtil(conf);
1953
1954
1955
1956 responder = new Responder();
1957 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
1958 this.userProvider = UserProvider.instantiate(conf);
1959 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
1960 if (isSecurityEnabled) {
1961 HBaseSaslRpcServer.init(conf);
1962 }
1963 this.scheduler = scheduler;
1964 this.scheduler.init(new RpcSchedulerContext(this));
1965 }
1966
1967
1968
1969
1970
1971 protected Connection getConnection(SocketChannel channel, long time) {
1972 return new Connection(channel, time);
1973 }
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1985 throws IOException {
1986 if (response != null) response.reset();
1987 call.setResponse(null, null, t, error);
1988 }
1989
1990 protected void closeConnection(Connection connection) {
1991 synchronized (connectionList) {
1992 if (connectionList.remove(connection)) {
1993 numConnections--;
1994 }
1995 }
1996 connection.close();
1997 }
1998
1999 Configuration getConf() {
2000 return conf;
2001 }
2002
2003
2004
2005
2006 @Override
2007 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2008
2009
2010 @Override
2011 public void start() {
2012 startThreads();
2013 openServer();
2014 }
2015
2016
2017
2018
2019 @Override
2020 public void openServer() {
2021 this.started = true;
2022 }
2023
2024 @Override
2025 public boolean isStarted() {
2026 return this.started;
2027 }
2028
2029
2030
2031
2032
2033 @Override
2034 public synchronized void startThreads() {
2035 AuthenticationTokenSecretManager mgr = createSecretManager();
2036 if (mgr != null) {
2037 setSecretManager(mgr);
2038 mgr.start();
2039 }
2040 this.authManager = new ServiceAuthorizationManager();
2041 HBasePolicyProvider.init(conf, authManager);
2042 responder.start();
2043 listener.start();
2044 scheduler.start();
2045 }
2046
2047 @Override
2048 public void refreshAuthManager(PolicyProvider pp) {
2049
2050
2051 this.authManager.refresh(this.conf, pp);
2052 }
2053
2054 private AuthenticationTokenSecretManager createSecretManager() {
2055 if (!isSecurityEnabled) return null;
2056 if (serverInstance == null) return null;
2057 if (!(serverInstance instanceof org.apache.hadoop.hbase.Server)) return null;
2058 org.apache.hadoop.hbase.Server server = (org.apache.hadoop.hbase.Server)serverInstance;
2059 Configuration conf = server.getConfiguration();
2060 long keyUpdateInterval =
2061 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2062 long maxAge =
2063 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2064 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2065 server.getServerName().toString(), keyUpdateInterval, maxAge);
2066 }
2067
2068 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2069 return this.secretManager;
2070 }
2071
2072 @SuppressWarnings("unchecked")
2073 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2074 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2075 }
2076
2077
2078
2079
2080
2081
2082 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2083 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2084 throws IOException {
2085 try {
2086 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2087
2088 status.setRPCPacket(param);
2089 status.resume("Servicing call");
2090
2091 long startTime = System.currentTimeMillis();
2092 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2093 Message result = service.callBlockingMethod(md, controller, param);
2094 int processingTime = (int) (System.currentTimeMillis() - startTime);
2095 int qTime = (int) (startTime - receiveTime);
2096 if (LOG.isTraceEnabled()) {
2097 LOG.trace(CurCall.get().toString() +
2098 ", response " + TextFormat.shortDebugString(result) +
2099 " queueTime: " + qTime +
2100 " processingTime: " + processingTime);
2101 }
2102 metrics.dequeuedCall(qTime);
2103 metrics.processedCall(processingTime);
2104 long responseSize = result.getSerializedSize();
2105
2106
2107 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2108 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2109 if (tooSlow || tooLarge) {
2110
2111
2112 StringBuilder buffer = new StringBuilder(256);
2113 buffer.append(md.getName());
2114 buffer.append("(");
2115 buffer.append(param.getClass().getName());
2116 buffer.append(")");
2117 logResponse(new Object[]{param},
2118 md.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
2119 status.getClient(), startTime, processingTime, qTime,
2120 responseSize);
2121 }
2122 return new Pair<Message, CellScanner>(result,
2123 controller != null? controller.cellScanner(): null);
2124 } catch (Throwable e) {
2125
2126
2127
2128 if (e instanceof ServiceException) e = e.getCause();
2129 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2130 if (e instanceof IOException) throw (IOException)e;
2131 LOG.error("Unexpected throwable object ", e);
2132 throw new IOException(e.getMessage(), e);
2133 }
2134 }
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150 void logResponse(Object[] params, String methodName, String call, String tag,
2151 String clientAddress, long startTime, int processingTime, int qTime,
2152 long responseSize)
2153 throws IOException {
2154
2155 Map<String, Object> responseInfo = new HashMap<String, Object>();
2156 responseInfo.put("starttimems", startTime);
2157 responseInfo.put("processingtimems", processingTime);
2158 responseInfo.put("queuetimems", qTime);
2159 responseInfo.put("responsesize", responseSize);
2160 responseInfo.put("client", clientAddress);
2161 responseInfo.put("class", serverInstance == null? "": serverInstance.getClass().getSimpleName());
2162 responseInfo.put("method", methodName);
2163 if (params.length == 2 && serverInstance instanceof HRegionServer &&
2164 params[0] instanceof byte[] &&
2165 params[1] instanceof Operation) {
2166
2167
2168 TableName tableName = TableName.valueOf(
2169 HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2170 responseInfo.put("table", tableName.getNameAsString());
2171
2172 responseInfo.putAll(((Operation) params[1]).toMap());
2173
2174 LOG.warn("(operation" + tag + "): " +
2175 MAPPER.writeValueAsString(responseInfo));
2176 } else if (params.length == 1 && serverInstance instanceof HRegionServer &&
2177 params[0] instanceof Operation) {
2178
2179 responseInfo.putAll(((Operation) params[0]).toMap());
2180
2181 LOG.warn("(operation" + tag + "): " +
2182 MAPPER.writeValueAsString(responseInfo));
2183 } else {
2184
2185
2186 responseInfo.put("call", call);
2187 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2188 }
2189 }
2190
2191
2192 @Override
2193 public synchronized void stop() {
2194 LOG.info("Stopping server on " + port);
2195 running = false;
2196 listener.interrupt();
2197 listener.doStop();
2198 responder.interrupt();
2199 scheduler.stop();
2200 notifyAll();
2201 }
2202
2203
2204
2205
2206
2207
2208 @Override
2209 public synchronized void join() throws InterruptedException {
2210 while (running) {
2211 wait();
2212 }
2213 }
2214
2215
2216
2217
2218
2219 @Override
2220 public synchronized InetSocketAddress getListenerAddress() {
2221 return listener.getAddress();
2222 }
2223
2224
2225
2226
2227
2228 @Override
2229 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2230 this.errorHandler = handler;
2231 }
2232
2233 @Override
2234 public HBaseRPCErrorHandler getErrorHandler() {
2235 return this.errorHandler;
2236 }
2237
2238
2239
2240
2241 public MetricsHBaseServer getMetrics() {
2242 return metrics;
2243 }
2244
2245 @Override
2246 public void addCallSize(final long diff) {
2247 this.callQueueSize.add(diff);
2248 }
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258 @SuppressWarnings("static-access")
2259 public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2260 throws AuthorizationException {
2261 if (authorize) {
2262 Class<?> c = getServiceInterface(services, connection.getServiceName());
2263 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2264 }
2265 }
2266
2267
2268
2269
2270
2271
2272 private static int NIO_BUFFER_LIMIT = 64 * 1024;
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2289 throws IOException {
2290 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
2291 if (count > 0) this.metrics.sentBytes(count);
2292 return count;
2293 }
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307 protected int channelRead(ReadableByteChannel channel,
2308 ByteBuffer buffer) throws IOException {
2309
2310 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2311 channel.read(buffer) : channelIO(channel, null, buffer);
2312 if (count > 0) {
2313 metrics.receivedBytes(count);
2314 }
2315 return count;
2316 }
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331 private static int channelIO(ReadableByteChannel readCh,
2332 WritableByteChannel writeCh,
2333 ByteBuffer buf) throws IOException {
2334
2335 int originalLimit = buf.limit();
2336 int initialRemaining = buf.remaining();
2337 int ret = 0;
2338
2339 while (buf.remaining() > 0) {
2340 try {
2341 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2342 buf.limit(buf.position() + ioSize);
2343
2344 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2345
2346 if (ret < ioSize) {
2347 break;
2348 }
2349
2350 } finally {
2351 buf.limit(originalLimit);
2352 }
2353 }
2354
2355 int nBytes = initialRemaining - buf.remaining();
2356 return (nBytes > 0) ? nBytes : ret;
2357 }
2358
2359
2360
2361
2362
2363
2364
2365 public static RpcCallContext getCurrentCall() {
2366 return CurCall.get();
2367 }
2368
2369
2370
2371
2372
2373
2374 public static User getRequestUser() {
2375 RpcCallContext ctx = getCurrentCall();
2376 return ctx == null? null: ctx.getRequestUser();
2377 }
2378
2379
2380
2381
2382
2383 public static String getRequestUserName() {
2384 User user = getRequestUser();
2385 return user == null? null: user.getShortName();
2386 }
2387
2388
2389
2390
2391 public static InetAddress getRemoteAddress() {
2392 RpcCallContext ctx = getCurrentCall();
2393 return ctx == null? null: ctx.getRemoteAddress();
2394 }
2395
2396
2397
2398
2399
2400
2401 static BlockingServiceAndInterface getServiceAndInterface(
2402 final List<BlockingServiceAndInterface> services, final String serviceName) {
2403 for (BlockingServiceAndInterface bs : services) {
2404 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2405 return bs;
2406 }
2407 }
2408 return null;
2409 }
2410
2411
2412
2413
2414
2415
2416 static Class<?> getServiceInterface(
2417 final List<BlockingServiceAndInterface> services,
2418 final String serviceName) {
2419 BlockingServiceAndInterface bsasi =
2420 getServiceAndInterface(services, serviceName);
2421 return bsasi == null? null: bsasi.getServiceInterface();
2422 }
2423
2424
2425
2426
2427
2428
2429 static BlockingService getService(
2430 final List<BlockingServiceAndInterface> services,
2431 final String serviceName) {
2432 BlockingServiceAndInterface bsasi =
2433 getServiceAndInterface(services, serviceName);
2434 return bsasi == null? null: bsasi.getBlockingService();
2435 }
2436
2437
2438
2439
2440
2441 public static InetAddress getRemoteIp() {
2442 Call call = CurCall.get();
2443 if (call != null && call.connection.socket != null) {
2444 return call.connection.socket.getInetAddress();
2445 }
2446 return null;
2447 }
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459 public static void bind(ServerSocket socket, InetSocketAddress address,
2460 int backlog) throws IOException {
2461 try {
2462 socket.bind(address, backlog);
2463 } catch (BindException e) {
2464 BindException bindException =
2465 new BindException("Problem binding to " + address + " : " +
2466 e.getMessage());
2467 bindException.initCause(e);
2468 throw bindException;
2469 } catch (SocketException e) {
2470
2471
2472 if ("Unresolved address".equals(e.getMessage())) {
2473 throw new UnknownHostException("Invalid hostname for server: " +
2474 address.getHostName());
2475 }
2476 throw e;
2477 }
2478 }
2479
2480 public RpcScheduler getScheduler() {
2481 return scheduler;
2482 }
2483 }