1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.DataInputStream;
25 import java.io.DataOutputStream;
26 import java.io.FilterInputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.net.ConnectException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.SocketException;
35 import java.net.SocketTimeoutException;
36 import java.net.UnknownHostException;
37 import java.nio.ByteBuffer;
38 import java.security.PrivilegedExceptionAction;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.LinkedList;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Random;
45 import java.util.concurrent.ConcurrentSkipListMap;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48
49 import javax.net.SocketFactory;
50 import javax.security.sasl.SaslException;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.hbase.classification.InterfaceAudience;
55 import org.apache.hadoop.hbase.classification.InterfaceStability;
56 import org.apache.hadoop.conf.Configuration;
57 import org.apache.hadoop.hbase.CellScanner;
58 import org.apache.hadoop.hbase.DoNotRetryIOException;
59 import org.apache.hadoop.hbase.HBaseIOException;
60 import org.apache.hadoop.hbase.HConstants;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.codec.Codec;
63 import org.apache.hadoop.hbase.codec.KeyValueCodec;
64 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
65 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
66 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
67 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
68 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
69 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
70 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
71 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
72 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
73 import org.apache.hadoop.hbase.security.AuthMethod;
74 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
75 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
76 import org.apache.hadoop.hbase.security.SecurityInfo;
77 import org.apache.hadoop.hbase.security.User;
78 import org.apache.hadoop.hbase.security.UserProvider;
79 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
80 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81 import org.apache.hadoop.hbase.util.ExceptionUtil;
82 import org.apache.hadoop.hbase.util.Pair;
83 import org.apache.hadoop.hbase.util.PoolMap;
84 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
85 import org.apache.hadoop.io.IOUtils;
86 import org.apache.hadoop.io.Text;
87 import org.apache.hadoop.io.compress.CompressionCodec;
88 import org.apache.hadoop.ipc.RemoteException;
89 import org.apache.hadoop.net.NetUtils;
90 import org.apache.hadoop.security.SecurityUtil;
91 import org.apache.hadoop.security.UserGroupInformation;
92 import org.apache.hadoop.security.token.Token;
93 import org.apache.hadoop.security.token.TokenIdentifier;
94 import org.apache.hadoop.security.token.TokenSelector;
95 import org.cloudera.htrace.Span;
96 import org.cloudera.htrace.Trace;
97
98 import com.google.common.annotations.VisibleForTesting;
99 import com.google.protobuf.BlockingRpcChannel;
100 import com.google.protobuf.Descriptors.MethodDescriptor;
101 import com.google.protobuf.Message;
102 import com.google.protobuf.Message.Builder;
103 import com.google.protobuf.RpcController;
104 import com.google.protobuf.ServiceException;
105 import com.google.protobuf.TextFormat;
106
107
108
109
110
111
112 @InterfaceAudience.Private
113 public class RpcClient {
114
115
116 public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
117 protected final PoolMap<ConnectionId, Connection> connections;
118
119 protected int counter;
120 protected final AtomicBoolean running = new AtomicBoolean(true);
121 final protected Configuration conf;
122 final protected int maxIdleTime;
123
124 final protected int maxRetries;
125 final protected long failureSleep;
126 protected final boolean tcpNoDelay;
127 protected final boolean tcpKeepAlive;
128 protected int pingInterval;
129 protected FailedServers failedServers;
130 private final Codec codec;
131 private final CompressionCodec compressor;
132 private final IPCUtil ipcUtil;
133
134 protected final SocketFactory socketFactory;
135 protected String clusterId;
136 protected final SocketAddress localAddr;
137
138 private final boolean fallbackAllowed;
139 private UserProvider userProvider;
140
141 final public static String PING_INTERVAL_NAME = "hbase.ipc.ping.interval";
142 final public static String SOCKET_TIMEOUT = "hbase.ipc.socket.timeout";
143 final static int DEFAULT_PING_INTERVAL = 60000;
144 final static int DEFAULT_SOCKET_TIMEOUT = 20000;
145 final static int PING_CALL_ID = -1;
146
147 public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
148 public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
149
150 public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
151 "hbase.ipc.client.fallback-to-simple-auth-allowed";
152 public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
153
154
155
156
157
158
159 private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
160 @Override
161 protected Integer initialValue() {
162 return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
163 }
164 };
165
166
167
168
169 static class FailedServers {
170 private final LinkedList<Pair<Long, String>> failedServers = new
171 LinkedList<Pair<Long, java.lang.String>>();
172 private final int recheckServersTimeout;
173
174 FailedServers(Configuration conf) {
175 this.recheckServersTimeout = conf.getInt(
176 FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
177 }
178
179
180
181
182 public synchronized void addToFailedServers(InetSocketAddress address) {
183 final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout;
184 failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
185 }
186
187
188
189
190
191
192 public synchronized boolean isFailedServer(final InetSocketAddress address) {
193 if (failedServers.isEmpty()) {
194 return false;
195 }
196
197 final String lookup = address.toString();
198 final long now = EnvironmentEdgeManager.currentTimeMillis();
199
200
201 Iterator<Pair<Long, String>> it = failedServers.iterator();
202 while (it.hasNext()) {
203 Pair<Long, String> cur = it.next();
204 if (cur.getFirst() < now) {
205 it.remove();
206 } else {
207 if (lookup.equals(cur.getSecond())) {
208 return true;
209 }
210 }
211 }
212
213 return false;
214 }
215 }
216
217 @SuppressWarnings("serial")
218 @InterfaceAudience.Public
219 @InterfaceStability.Evolving
220
221 public static class FailedServerException extends HBaseIOException {
222 public FailedServerException(String s) {
223 super(s);
224 }
225 }
226
227
228
229
230
231
232
233
234
235 public static void setPingInterval(Configuration conf, int pingInterval) {
236 conf.setInt(PING_INTERVAL_NAME, pingInterval);
237 }
238
239
240
241
242
243
244
245
246 static int getPingInterval(Configuration conf) {
247 return conf.getInt(PING_INTERVAL_NAME, conf.getInt("ipc.ping.interval", DEFAULT_PING_INTERVAL));
248 }
249
250
251
252
253
254
255 public static void setSocketTimeout(Configuration conf, int socketTimeout) {
256 conf.setInt(SOCKET_TIMEOUT, socketTimeout);
257 }
258
259
260
261
262 static int getSocketTimeout(Configuration conf) {
263 return conf.getInt(SOCKET_TIMEOUT, conf.getInt("ipc.socket.timeout", DEFAULT_SOCKET_TIMEOUT));
264 }
265
266
267 protected class Call {
268 final int id;
269 final Message param;
270
271
272
273
274 CellScanner cells;
275 Message response;
276
277 Message responseDefaultType;
278 IOException error;
279 boolean done;
280 long startTime;
281 final MethodDescriptor md;
282
283 protected Call(final MethodDescriptor md, Message param, final CellScanner cells,
284 final Message responseDefaultType) {
285 this.param = param;
286 this.md = md;
287 this.cells = cells;
288 this.startTime = System.currentTimeMillis();
289 this.responseDefaultType = responseDefaultType;
290 synchronized (RpcClient.this) {
291 this.id = counter++;
292 }
293 }
294
295 @Override
296 public String toString() {
297 return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
298 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
299 }
300
301
302
303 protected synchronized void callComplete() {
304 this.done = true;
305 notify();
306 }
307
308
309
310
311
312
313 public void setException(IOException error) {
314 this.error = error;
315 callComplete();
316 }
317
318
319
320
321
322
323
324
325 public void setResponse(Message response, final CellScanner cells) {
326 this.response = response;
327 this.cells = cells;
328 callComplete();
329 }
330
331 public long getStartTime() {
332 return this.startTime;
333 }
334 }
335
336 protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
337 TokenSelector<? extends TokenIdentifier>> tokenHandlers =
338 new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>>();
339 static {
340 tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
341 new AuthenticationTokenSelector());
342 }
343
344
345
346
347
348 protected Connection createConnection(ConnectionId remoteId, final Codec codec,
349 final CompressionCodec compressor)
350 throws IOException {
351 return new Connection(remoteId, codec, compressor);
352 }
353
354
355
356
357 protected class Connection extends Thread {
358 private ConnectionHeader header;
359 protected ConnectionId remoteId;
360 protected Socket socket = null;
361 protected DataInputStream in;
362 protected DataOutputStream out;
363 private InetSocketAddress server;
364 private String serverPrincipal;
365 private AuthMethod authMethod;
366 private boolean useSasl;
367 private Token<? extends TokenIdentifier> token;
368 private HBaseSaslRpcClient saslRpcClient;
369 private int reloginMaxBackoff;
370 private final Codec codec;
371 private final CompressionCodec compressor;
372
373
374 protected final ConcurrentSkipListMap<Integer, Call> calls =
375 new ConcurrentSkipListMap<Integer, Call>();
376 protected final AtomicLong lastActivity =
377 new AtomicLong();
378 protected final AtomicBoolean shouldCloseConnection =
379 new AtomicBoolean();
380 protected IOException closeException;
381
382 Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
383 throws IOException {
384 if (remoteId.getAddress().isUnresolved()) {
385 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
386 }
387 this.server = remoteId.getAddress();
388 this.codec = codec;
389 this.compressor = compressor;
390
391 UserGroupInformation ticket = remoteId.getTicket().getUGI();
392 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
393 this.useSasl = userProvider.isHBaseSecurityEnabled();
394 if (useSasl && securityInfo != null) {
395 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
396 if (tokenKind != null) {
397 TokenSelector<? extends TokenIdentifier> tokenSelector =
398 tokenHandlers.get(tokenKind);
399 if (tokenSelector != null) {
400 token = tokenSelector.selectToken(new Text(clusterId),
401 ticket.getTokens());
402 } else if (LOG.isDebugEnabled()) {
403 LOG.debug("No token selector found for type "+tokenKind);
404 }
405 }
406 String serverKey = securityInfo.getServerPrincipal();
407 if (serverKey == null) {
408 throw new IOException(
409 "Can't obtain server Kerberos config key from SecurityInfo");
410 }
411 serverPrincipal = SecurityUtil.getServerPrincipal(
412 conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
413 if (LOG.isDebugEnabled()) {
414 LOG.debug("RPC Server Kerberos principal name for service="
415 + remoteId.getServiceName() + " is " + serverPrincipal);
416 }
417 }
418
419 if (!useSasl) {
420 authMethod = AuthMethod.SIMPLE;
421 } else if (token != null) {
422 authMethod = AuthMethod.DIGEST;
423 } else {
424 authMethod = AuthMethod.KERBEROS;
425 }
426
427 if (LOG.isDebugEnabled()) {
428 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
429 ", sasl=" + useSasl);
430 }
431 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
432 this.remoteId = remoteId;
433
434 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
435 builder.setServiceName(remoteId.getServiceName());
436 UserInformation userInfoPB;
437 if ((userInfoPB = getUserInfo(ticket)) != null) {
438 builder.setUserInfo(userInfoPB);
439 }
440 if (this.codec != null) {
441 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
442 }
443 if (this.compressor != null) {
444 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
445 }
446 this.header = builder.build();
447
448 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
449 remoteId.getAddress().toString() +
450 ((ticket==null)?" from an unknown user": (" from "
451 + ticket.getUserName())));
452 this.setDaemon(true);
453 }
454
455 private UserInformation getUserInfo(UserGroupInformation ugi) {
456 if (ugi == null || authMethod == AuthMethod.DIGEST) {
457
458 return null;
459 }
460 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
461 if (authMethod == AuthMethod.KERBEROS) {
462
463 userInfoPB.setEffectiveUser(ugi.getUserName());
464 } else if (authMethod == AuthMethod.SIMPLE) {
465
466 userInfoPB.setEffectiveUser(ugi.getUserName());
467 if (ugi.getRealUser() != null) {
468 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
469 }
470 }
471 return userInfoPB.build();
472 }
473
474
475 protected void touch() {
476 lastActivity.set(System.currentTimeMillis());
477 }
478
479
480
481
482
483
484
485
486
487 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
488 justification="Notify because new call available for processing")
489 protected synchronized void addCall(Call call) {
490
491
492
493 if (this.shouldCloseConnection.get()) {
494 if (this.closeException == null) {
495 call.setException(new IOException(
496 "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
497 } else {
498 call.setException(this.closeException);
499 }
500 synchronized (call) {
501 call.notifyAll();
502 }
503 } else {
504 calls.put(call.id, call);
505 synchronized (call) {
506 notify();
507 }
508 }
509 }
510
511
512
513
514
515 protected class PingInputStream extends FilterInputStream {
516
517 protected PingInputStream(InputStream in) {
518 super(in);
519 }
520
521
522
523
524
525 private void handleTimeout(SocketTimeoutException e) throws IOException {
526 if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) {
527 throw e;
528 }
529 sendPing();
530 }
531
532
533
534
535
536
537 @Override
538 public int read() throws IOException {
539 do {
540 try {
541 return super.read();
542 } catch (SocketTimeoutException e) {
543 handleTimeout(e);
544 }
545 } while (true);
546 }
547
548
549
550
551
552
553
554 @Override
555 public int read(byte[] buf, int off, int len) throws IOException {
556 do {
557 try {
558 return super.read(buf, off, len);
559 } catch (SocketTimeoutException e) {
560 handleTimeout(e);
561 }
562 } while (true);
563 }
564 }
565
566 protected synchronized void setupConnection() throws IOException {
567 short ioFailures = 0;
568 short timeoutFailures = 0;
569 while (true) {
570 try {
571 this.socket = socketFactory.createSocket();
572 this.socket.setTcpNoDelay(tcpNoDelay);
573 this.socket.setKeepAlive(tcpKeepAlive);
574 if (localAddr != null) {
575 this.socket.bind(localAddr);
576 }
577
578 NetUtils.connect(this.socket, remoteId.getAddress(),
579 getSocketTimeout(conf));
580 if (remoteId.rpcTimeout > 0) {
581 pingInterval = remoteId.rpcTimeout;
582 }
583 this.socket.setSoTimeout(pingInterval);
584 return;
585 } catch (SocketTimeoutException toe) {
586
587
588
589 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
590 } catch (IOException ie) {
591 handleConnectionFailure(ioFailures++, maxRetries, ie);
592 }
593 }
594 }
595
596 protected void closeConnection() {
597 if (socket == null) {
598 return;
599 }
600
601
602 try {
603 if (socket.getOutputStream() != null) {
604 socket.getOutputStream().close();
605 }
606 } catch (IOException ignored) {
607 }
608 try {
609 if (socket.getInputStream() != null) {
610 socket.getInputStream().close();
611 }
612 } catch (IOException ignored) {
613 }
614 try {
615 if (socket.getChannel() != null) {
616 socket.getChannel().close();
617 }
618 } catch (IOException ignored) {
619 }
620 try {
621 socket.close();
622 } catch (IOException e) {
623 LOG.warn("Not able to close a socket", e);
624 }
625
626
627
628 socket = null;
629 }
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
647 throws IOException {
648 closeConnection();
649
650
651 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
652 throw ioe;
653 }
654
655
656 try {
657 Thread.sleep(failureSleep);
658 } catch (InterruptedException ie) {
659 ExceptionUtil.rethrowIfInterrupt(ie);
660 }
661
662 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
663 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
664 " time(s).");
665 }
666
667
668
669
670
671
672
673 protected synchronized boolean waitForWork() {
674 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
675 long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
676 if (timeout>0) {
677 try {
678 wait(timeout);
679 } catch (InterruptedException ie) {
680 Thread.currentThread().interrupt();
681 }
682 }
683 }
684
685 if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
686 return true;
687 } else if (shouldCloseConnection.get()) {
688 return false;
689 } else if (calls.isEmpty()) {
690 markClosed(null);
691 return false;
692 } else {
693 markClosed((IOException)new IOException().initCause(
694 new InterruptedException()));
695 return false;
696 }
697 }
698
699 public InetSocketAddress getRemoteAddress() {
700 return remoteId.getAddress();
701 }
702
703
704
705
706 protected synchronized void sendPing() throws IOException {
707
708 long curTime = System.currentTimeMillis();
709 if ( curTime - lastActivity.get() >= pingInterval) {
710 lastActivity.set(curTime);
711
712 synchronized (this.out) {
713 out.writeInt(PING_CALL_ID);
714 out.flush();
715 }
716 }
717 }
718
719 @Override
720 public void run() {
721 if (LOG.isDebugEnabled()) {
722 LOG.debug(getName() + ": starting, connections " + connections.size());
723 }
724
725 try {
726 while (waitForWork()) {
727 readResponse();
728 }
729 } catch (Throwable t) {
730 LOG.warn(getName() + ": unexpected exception receiving call responses", t);
731 markClosed(new IOException("Unexpected exception receiving call responses", t));
732 }
733
734 close();
735
736 if (LOG.isDebugEnabled())
737 LOG.debug(getName() + ": stopped, connections " + connections.size());
738 }
739
740 private synchronized void disposeSasl() {
741 if (saslRpcClient != null) {
742 try {
743 saslRpcClient.dispose();
744 saslRpcClient = null;
745 } catch (IOException ioe) {
746 LOG.error("Error disposing of SASL client", ioe);
747 }
748 }
749 }
750
751 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
752 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
753 UserGroupInformation currentUser =
754 UserGroupInformation.getCurrentUser();
755 UserGroupInformation realUser = currentUser.getRealUser();
756 return authMethod == AuthMethod.KERBEROS &&
757 loginUser != null &&
758
759 loginUser.hasKerberosCredentials() &&
760
761
762 (loginUser.equals(currentUser) || loginUser.equals(realUser));
763 }
764
765 private synchronized boolean setupSaslConnection(final InputStream in2,
766 final OutputStream out2) throws IOException {
767 saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
768 conf.get("hbase.rpc.protection",
769 QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
770 return saslRpcClient.saslConnect(in2, out2);
771 }
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
/**
 * Reacts to a failed SASL connection attempt, running as the given user. The socket is
 * closed first. If Kerberos re-authentication applies (see shouldAuthenticateOverKrb()) and
 * retries remain, the login user is re-logged in, the SASL client is disposed of and the
 * method returns after a random back-off so that the caller can retry. In every other case
 * an exception is thrown.
 *
 * @param currRetries number of retries made so far
 * @param maxRetries retry limit
 * @param ex the failure of the SASL connection attempt
 * @param rand source of the random back-off
 * @param user the user to run the handling as
 * @throws IOException when no retry is possible; a RemoteException is rethrown as is
 * @throws InterruptedException if interrupted, e.g. during the back-off sleep
 */
private synchronized void handleSaslConnectionFailure(
    final int currRetries,
    final int maxRetries, final Exception ex, final Random rand,
    final UserGroupInformation user)
    throws IOException, InterruptedException{
  user.doAs(new PrivilegedExceptionAction<Object>() {
    public Object run() throws IOException, InterruptedException {
      // Drop the socket of the failed attempt before deciding what to do next.
      closeConnection();
      if (shouldAuthenticateOverKrb()) {
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to " +
              "the server : " + ex);
          // Refresh the login user's credentials from the keytab or the ticket cache.
          if (UserGroupInformation.isLoginKeytabBased()) {
            UserGroupInformation.getLoginUser().reloginFromKeytab();
          } else {
            UserGroupInformation.getLoginUser().reloginFromTicketCache();
          }
          disposeSasl();
          // Sleep a random 1..reloginMaxBackoff ms before handing control back to the
          // caller, which then retries the connection.
          Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
          return null;
        } else {
          // Retries exhausted: give up, keeping the original failure as the cause.
          String msg = "Couldn't setup connection for " +
              UserGroupInformation.getLoginUser().getUserName() +
              " to " + serverPrincipal;
          LOG.warn(msg);
          throw (IOException) new IOException(msg).initCause(ex);
        }
      } else {
        LOG.warn("Exception encountered while connecting to " +
            "the server : " + ex);
      }
      // No Kerberos relogin applies: translate the failure for the caller.
      if (ex instanceof RemoteException) {
        throw (RemoteException)ex;
      }
      if (ex instanceof SaslException) {
        // NOTE(review): this escapes as an unchecked RuntimeException, not an IOException.
        String msg = "SASL authentication failed." +
            " The most likely cause is missing or invalid credentials." +
            " Consider 'kinit'.";
        LOG.fatal(msg, ex);
        throw new RuntimeException(msg, ex);
      }
      throw new IOException(ex);
    }
  });
}
841
/**
 * Connects to the server and sets up the input and output streams, including the SASL
 * handshake when SASL is in use, then sends the connection header and starts the connection
 * thread. Returns immediately if a socket already exists or the connection is marked closed.
 * On any failure the server is added to the failed-servers list, the connection is marked
 * closed and closed, and the failure is rethrown as an IOException.
 *
 * @throws IOException if the server is on the failed-servers list or the setup fails
 * @throws InterruptedException declared by the SASL failure handling
 */
protected synchronized void setupIOstreams()
    throws IOException, InterruptedException {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }

  // Fail fast for servers that recently failed, without opening a socket.
  if (failedServers.isFailedServer(remoteId.getAddress())) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not trying to connect to " + server +
          " this server is in the failed servers list");
    }
    IOException e = new FailedServerException(
        "This server is in the failed servers list: " + server);
    markClosed(e);
    close();
    throw e;
  }

  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to " + server);
    }
    // Retry budget for SASL failures only; socket-level retries happen in setupConnection().
    short numRetries = 0;
    final short MAX_RETRIES = 5;
    Random rand = null;
    while (true) {
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      // The output stream is created with pingInterval as its write timeout.
      OutputStream outStream = NetUtils.getOutputStream(socket, pingInterval);
      // The preamble goes out on the raw stream, before any SASL negotiation.
      writeConnectionHeaderPreamble(outStream);
      if (useSasl) {
        final InputStream in2 = inStream;
        final OutputStream out2 = outStream;
        UserGroupInformation ticket = remoteId.getTicket().getUGI();
        if (authMethod == AuthMethod.KERBEROS) {
          // With Kerberos, authenticate as the real user when there is one.
          if (ticket != null && ticket.getRealUser() != null) {
            ticket = ticket.getRealUser();
          }
        }
        boolean continueSasl = false;
        if (ticket == null) throw new FatalConnectionException("ticket/user is null");
        try {
          continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
              return setupSaslConnection(in2, out2);
            }
          });
        } catch (Exception ex) {
          if (rand == null) {
            rand = new Random();
          }
          // Either throws, or returns after a relogin and back-off; in the latter case the
          // loop starts over with a fresh socket.
          handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
          continue;
        }
        if (continueSasl) {
          // Handshake succeeded: wrap both streams with the SASL client's streams.
          inStream = saslRpcClient.getInputStream(inStream);
          outStream = saslRpcClient.getOutputStream(outStream);
        } else {
          // saslConnect() returned false: continue with SIMPLE auth and unwrapped streams
          // (presumably the fallback-to-simple-auth path -- confirm in HBaseSaslRpcClient).
          authMethod = AuthMethod.SIMPLE;
          useSasl = false;
        }
      }
      this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
      this.out = new DataOutputStream(new BufferedOutputStream(outStream));

      writeConnectionHeader();

      // Record the activity time now that the connection is usable.
      touch();

      // Start the thread that reads responses.
      start();
      return;
    }
  } catch (Throwable t) {
    // Any failure, whatever its type, puts the server on the failed-servers list.
    failedServers.addToFailedServers(remoteId.address);
    IOException e = null;
    if (t instanceof LinkageError) {
      // Wrapped in DoNotRetryIOException, i.e. signalled as not worth retrying.
      e = new DoNotRetryIOException(t);
      markClosed(e);
    } else if (t instanceof IOException) {
      e = (IOException)t;
      markClosed(e);
    } else {
      e = new IOException("Could not set up IO Streams", t);
      markClosed(e);
    }
    close();
    throw e;
  }
}
941
942
943
944
945 private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
946
947
948
949
950
951 int rpcHeaderLen = HConstants.RPC_HEADER.array().length;
952 byte [] preamble = new byte [rpcHeaderLen + 2];
953 System.arraycopy(HConstants.RPC_HEADER.array(), 0, preamble, 0, rpcHeaderLen);
954 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
955 preamble[rpcHeaderLen + 1] = authMethod.code;
956 outStream.write(preamble);
957 outStream.flush();
958 }
959
960
961
962
963
964 private void writeConnectionHeader() throws IOException {
965 synchronized (this.out) {
966 this.out.writeInt(this.header.getSerializedSize());
967 this.header.writeTo(this.out);
968 this.out.flush();
969 }
970 }
971
972
/**
 * Closes this connection: removes it from the client's connection pool, closes both streams,
 * disposes of the SASL client and fails the calls that are still registered. Must only be
 * called after markClosed(); otherwise an error is logged and nothing else happens.
 */
protected synchronized void close() {
  if (!shouldCloseConnection.get()) {
    LOG.error(getName() + ": the connection is not in the closed state");
    return;
  }

  // Remove this connection from the pool first, under the pool's monitor, so that it is
  // no longer handed out.
  synchronized (connections) {
    connections.removeValue(remoteId, this);
  }

  // Close the streams; the output stream is closed under its own monitor, the same
  // monitor the writers synchronize on.
  if (this.out != null) {
    synchronized(this.out) {
      IOUtils.closeStream(out);
      this.out = null;
    }
  }
  IOUtils.closeStream(in);
  this.in = null;
  disposeSasl();

  // Fail whatever calls are still registered.
  if (closeException == null) {
    if (!calls.isEmpty()) {
      LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
          "#Calls: " + calls.size());

      // No cause was recorded although calls are pending: give them a generic one.
      closeException = new IOException("Unexpected closed connection");
      cleanupCalls();
    }
  } else {
    // A cause was recorded: log it and fail all calls with it.
    if (LOG.isDebugEnabled()) {
      LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
          closeException.getMessage(), closeException);
    }

    // cleanupCalls() uses a timeout of 0, so every registered call is failed and removed.
    cleanupCalls();
  }
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": closed");
}
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028 protected void writeRequest(Call call, final int priority) {
1029 if (shouldCloseConnection.get()) return;
1030 try {
1031 RequestHeader.Builder builder = RequestHeader.newBuilder();
1032 builder.setCallId(call.id);
1033 if (Trace.isTracing()) {
1034 Span s = Trace.currentSpan();
1035 builder.setTraceInfo(RPCTInfo.newBuilder().
1036 setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
1037 }
1038 builder.setMethodName(call.md.getName());
1039 builder.setRequestParam(call.param != null);
1040 ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
1041 if (cellBlock != null) {
1042 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
1043 cellBlockBuilder.setLength(cellBlock.limit());
1044 builder.setCellBlockMeta(cellBlockBuilder.build());
1045 }
1046
1047 if (priority != 0) builder.setPriority(priority);
1048
1049 RequestHeader header = builder.build();
1050 synchronized (this.out) {
1051 IPCUtil.write(this.out, header, call.param, cellBlock);
1052 }
1053 if (LOG.isDebugEnabled()) {
1054 LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
1055 }
1056 } catch(IOException e) {
1057 synchronized (this) {
1058 if (!shouldCloseConnection.get()) {
1059 markClosed(e);
1060 interrupt();
1061 }
1062 }
1063 }
1064 }
1065
1066
1067
1068
1069 protected void readResponse() {
1070 if (shouldCloseConnection.get()) return;
1071 touch();
1072 int totalSize = -1;
1073 try {
1074
1075
1076 totalSize = in.readInt();
1077
1078
1079 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
1080 int id = responseHeader.getCallId();
1081 if (LOG.isDebugEnabled()) {
1082 LOG.debug(getName() + ": got response header " +
1083 TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
1084 }
1085 Call call = calls.get(id);
1086 if (call == null) {
1087
1088
1089
1090
1091
1092 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
1093 int whatIsLeftToRead = totalSize - readSoFar;
1094 LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
1095 whatIsLeftToRead + " bytes");
1096 IOUtils.skipFully(in, whatIsLeftToRead);
1097 }
1098 if (responseHeader.hasException()) {
1099 ExceptionResponse exceptionResponse = responseHeader.getException();
1100 RemoteException re = createRemoteException(exceptionResponse);
1101 if (isFatalConnectionException(exceptionResponse)) {
1102 markClosed(re);
1103 } else {
1104 if (call != null) call.setException(re);
1105 }
1106 } else {
1107 Message value = null;
1108
1109 if (call != null && call.responseDefaultType != null) {
1110 Builder builder = call.responseDefaultType.newBuilderForType();
1111 builder.mergeDelimitedFrom(in);
1112 value = builder.build();
1113 }
1114 CellScanner cellBlockScanner = null;
1115 if (responseHeader.hasCellBlockMeta()) {
1116 int size = responseHeader.getCellBlockMeta().getLength();
1117 byte [] cellBlock = new byte[size];
1118 IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
1119 cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
1120 }
1121
1122
1123 if (call != null) call.setResponse(value, cellBlockScanner);
1124 }
1125 if (call != null) calls.remove(id);
1126 } catch (IOException e) {
1127 if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
1128
1129
1130
1131 closeException = e;
1132 } else {
1133
1134 markClosed(e);
1135 }
1136 } finally {
1137 if (remoteId.rpcTimeout > 0) {
1138 cleanupCalls(remoteId.rpcTimeout);
1139 }
1140 }
1141 }
1142
1143
1144
1145
1146
1147 private boolean isFatalConnectionException(final ExceptionResponse e) {
1148 return e.getExceptionClassName().
1149 equals(FatalConnectionException.class.getName());
1150 }
1151
1152
1153
1154
1155
1156 private RemoteException createRemoteException(final ExceptionResponse e) {
1157 String innerExceptionClassName = e.getExceptionClassName();
1158 boolean doNotRetry = e.getDoNotRetry();
1159 return e.hasHostname()?
1160
1161 new RemoteWithExtrasException(innerExceptionClassName,
1162 e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1163 new RemoteWithExtrasException(innerExceptionClassName,
1164 e.getStackTrace(), doNotRetry);
1165 }
1166
1167 protected synchronized void markClosed(IOException e) {
1168 if (shouldCloseConnection.compareAndSet(false, true)) {
1169 closeException = e;
1170 notifyAll();
1171 }
1172 }
1173
1174
1175 protected void cleanupCalls() {
1176 cleanupCalls(0);
1177 }
1178
/**
 * Fails and removes the calls that have been waiting for at least rpcTimeout ms, walking
 * the calls in call id order and stopping at the first one that has not timed out. After
 * that, unless the connection is marked closed, clears closeException and sets the socket
 * read timeout to the time left for the oldest remaining call (or to rpcTimeout itself).
 *
 * @param rpcTimeout the timeout in milliseconds; 0 fails every registered call
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Notify because timedout")
protected void cleanupCalls(long rpcTimeout) {
  Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
  while (itor.hasNext()) {
    Call c = itor.next().getValue();
    long waitTime = System.currentTimeMillis() - c.getStartTime();
    if (waitTime >= rpcTimeout) {
      if (this.closeException == null) {
        // No cause recorded so far: this is a plain call timeout. The exception created
        // here is also used for the calls handled later in this loop; it is cleared
        // again below if the connection is not closing.
        this.closeException = new CallTimeoutException("Call id=" + c.id +
            ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
      }
      c.setException(this.closeException);
      synchronized (c) {
        c.notifyAll();
      }
      itor.remove();
    } else {
      // Stops at the first call that has not timed out; assumes calls with higher ids
      // were created later (ids come from an increasing counter).
      break;
    }
  }
  try {
    if (!calls.isEmpty()) {
      // Shorten the timeout by the time the oldest remaining call has already waited.
      Call firstCall = calls.get(calls.firstKey());
      long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
      if (maxWaitTime < rpcTimeout) {
        rpcTimeout -= maxWaitTime;
      }
    }
    if (!shouldCloseConnection.get()) {
      // The connection stays open: forget the timeout cause and adjust the read timeout.
      closeException = null;
      setSocketTimeout(socket, (int) rpcTimeout);
    }
  } catch (SocketException e) {
    LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
  }
}
1222 }
1223
1224 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
1225 justification="Presume sync not needed setting socket timeout")
1226 private static void setSocketTimeout(final Socket socket, final int rpcTimeout)
1227 throws java.net.SocketException {
1228 if (socket == null) return;
1229 socket.setSoTimeout(rpcTimeout);
1230 }
1231
1232
1233
1234
/**
 * Client-side call timeout. Instances are created when a pending call has waited at least
 * as long as the rpc timeout in force.
 */
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static class CallTimeoutException extends IOException {
  /**
   * @param msg detail message describing the timed-out call
   */
  public CallTimeoutException(final String msg) {
    super(msg);
  }
}
1243
1244
1245
1246
1247
1248
1249
/**
 * Constructs a client that uses the given socket factory and no local bind address.
 * Delegates to the four-argument constructor with a null <code>localAddr</code>.
 * @param conf configuration
 * @param clusterId cluster id; null is accepted (the delegate substitutes a default)
 * @param factory socket factory
 */
RpcClient(Configuration conf, String clusterId, SocketFactory factory) {
  this(conf, clusterId, factory, null);
}
1253
1254
1255
1256
1257
1258
1259
1260
1261 RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
1262 this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
1263 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
1264 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1265 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1266 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
1267 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
1268 this.pingInterval = getPingInterval(conf);
1269 this.ipcUtil = new IPCUtil(conf);
1270 this.conf = conf;
1271 this.codec = getCodec();
1272 this.compressor = getCompressor(conf);
1273 this.socketFactory = factory;
1274 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
1275 this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1276 this.failedServers = new FailedServers(conf);
1277 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
1278 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
1279 this.localAddr = localAddr;
1280 this.userProvider = UserProvider.instantiate(conf);
1281
1282 if (LOG.isDebugEnabled()) {
1283 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
1284 ", tcpKeepAlive=" + this.tcpKeepAlive +
1285 ", tcpNoDelay=" + this.tcpNoDelay +
1286 ", maxIdleTime=" + this.maxIdleTime +
1287 ", maxRetries=" + this.maxRetries +
1288 ", fallbackAllowed=" + this.fallbackAllowed +
1289 ", ping interval=" + this.pingInterval + "ms" +
1290 ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
1291 }
1292 }
1293
1294
1295
1296
1297
1298
/**
 * Constructs a client using the socket factory returned by
 * <code>NetUtils.getDefaultSocketFactory(conf)</code> and no local bind address.
 * @param conf configuration
 * @param clusterId cluster id; null is accepted (the delegate substitutes a default)
 */
public RpcClient(Configuration conf, String clusterId) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
}
1302
1303
1304
1305
1306
1307
1308
/**
 * Constructs a client using the socket factory returned by
 * <code>NetUtils.getDefaultSocketFactory(conf)</code> and the given local address.
 * @param conf configuration
 * @param clusterId cluster id; null is accepted (the delegate substitutes a default)
 * @param localAddr local address to record as the bind address, may be null
 */
public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
}
1312
1313
1314
1315
1316
1317 Codec getCodec() {
1318
1319
1320 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
1321 if (className == null || className.length() == 0) return null;
1322 try {
1323 return (Codec)Class.forName(className).newInstance();
1324 } catch (Exception e) {
1325 throw new RuntimeException("Failed getting codec " + className, e);
1326 }
1327 }
1328
1329 @VisibleForTesting
1330 public static String getDefaultCodec(final Configuration c) {
1331
1332
1333
1334 return c.get("hbase.client.default.rpc.codec", KeyValueCodec.class.getCanonicalName());
1335 }
1336
1337
1338
1339
1340
1341
1342 private static CompressionCodec getCompressor(final Configuration conf) {
1343 String className = conf.get("hbase.client.rpc.compressor", null);
1344 if (className == null || className.isEmpty()) return null;
1345 try {
1346 return (CompressionCodec)Class.forName(className).newInstance();
1347 } catch (Exception e) {
1348 throw new RuntimeException("Failed getting compressor " + className, e);
1349 }
1350 }
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367 protected static PoolType getPoolType(Configuration config) {
1368 return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
1369 PoolType.RoundRobin, PoolType.ThreadLocal);
1370 }
1371
1372
1373
1374
1375
1376
1377
1378
/**
 * Reads the connection pool size from the configuration.
 * @param config configuration to read
 * @return the value of {@link HConstants#HBASE_CLIENT_IPC_POOL_SIZE}, or 1 when it is not set
 */
protected static int getPoolSize(Configuration config) {
  return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
1382
1383
1384
1385
1386
/**
 * @return the socket factory this client was constructed with
 */
SocketFactory getSocketFactory() {
  return socketFactory;
}
1390
1391
1392
1393 public void stop() {
1394 if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1395 if (!running.compareAndSet(true, false)) return;
1396
1397
1398 synchronized (connections) {
1399 for (Connection conn : connections.values()) {
1400 conn.interrupt();
1401 }
1402 }
1403
1404
1405 while (!connections.isEmpty()) {
1406 try {
1407 Thread.sleep(100);
1408 } catch (InterruptedException ignored) {
1409 }
1410 }
1411 }
1412
/**
 * Makes a call at {@link HConstants#NORMAL_QOS} priority. Delegates to the overload that
 * takes an explicit priority; see it for the blocking and error behaviour.
 * @param md method descriptor of the remote method
 * @param param request message
 * @param cells cells to send with the request, may be null
 * @param returnType passed through to the delegate
 * @param ticket user the call is made for
 * @param addr address of the remote server
 * @param rpcTimeout rpc timeout passed through to the delegate
 * @return the response message paired with any returned cells
 * @throws InterruptedException if interrupted while waiting for the call to complete
 * @throws IOException if the call fails
 */
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
    Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
throws InterruptedException, IOException {
  return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
}
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437 Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
1438 Message returnType, User ticket, InetSocketAddress addr,
1439 int rpcTimeout, int priority)
1440 throws InterruptedException, IOException {
1441 Call call = new Call(md, param, cells, returnType);
1442 Connection connection =
1443 getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
1444 connection.writeRequest(call, priority);
1445
1446
1447 synchronized (call) {
1448 while (!call.done) {
1449 if (connection.shouldCloseConnection.get()) {
1450 throw new IOException("Unexpected closed connection");
1451 }
1452 call.wait(1000);
1453 }
1454
1455 if (call.error != null) {
1456 if (call.error instanceof RemoteException) {
1457 call.error.fillInStackTrace();
1458 throw call.error;
1459 }
1460
1461 throw wrapException(addr, call.error);
1462 }
1463 return new Pair<Message, CellScanner>(call.response, call.cells);
1464 }
1465 }
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479 protected IOException wrapException(InetSocketAddress addr,
1480 IOException exception) {
1481 if (exception instanceof ConnectException) {
1482
1483 return (ConnectException)new ConnectException(
1484 "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
1485 } else if (exception instanceof SocketTimeoutException) {
1486 return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
1487 " failed because " + exception).initCause(exception);
1488 } else {
1489 return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
1490 exception).initCause(exception);
1491 }
1492 }
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502 public void cancelConnections(String hostname, int port, IOException ioe) {
1503 synchronized (connections) {
1504 for (Connection connection : connections.values()) {
1505 if (connection.isAlive() &&
1506 connection.getRemoteAddress().getPort() == port &&
1507 connection.getRemoteAddress().getHostName().equals(hostname)) {
1508 LOG.info("The server on " + hostname + ":" + port +
1509 " is dead - stopping the connection " + connection.remoteId);
1510 connection.closeConnection();
1511
1512
1513 }
1514 }
1515 }
1516 }
1517
1518
1519
1520 protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
1521 int rpcTimeout, final Codec codec, final CompressionCodec compressor)
1522 throws IOException, InterruptedException {
1523 if (!running.get()) throw new StoppedRpcClientException();
1524 Connection connection;
1525 ConnectionId remoteId =
1526 new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout);
1527 synchronized (connections) {
1528 connection = connections.get(remoteId);
1529 if (connection == null) {
1530 connection = createConnection(remoteId, this.codec, this.compressor);
1531 connections.put(remoteId, connection);
1532 }
1533 }
1534 connection.addCall(call);
1535
1536
1537
1538
1539
1540
1541
1542
1543 connection.setupIOstreams();
1544 return connection;
1545 }
1546
1547
1548
1549
1550
1551 protected static class ConnectionId {
1552 final InetSocketAddress address;
1553 final User ticket;
1554 final int rpcTimeout;
1555 private static final int PRIME = 16777619;
1556 final String serviceName;
1557
1558 ConnectionId(User ticket,
1559 String serviceName,
1560 InetSocketAddress address,
1561 int rpcTimeout) {
1562 this.address = address;
1563 this.ticket = ticket;
1564 this.rpcTimeout = rpcTimeout;
1565 this.serviceName = serviceName;
1566 }
1567
1568 String getServiceName() {
1569 return this.serviceName;
1570 }
1571
1572 InetSocketAddress getAddress() {
1573 return address;
1574 }
1575
1576 User getTicket() {
1577 return ticket;
1578 }
1579
1580 @Override
1581 public String toString() {
1582 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" +
1583 this.rpcTimeout;
1584 }
1585
1586 @Override
1587 public boolean equals(Object obj) {
1588 if (obj instanceof ConnectionId) {
1589 ConnectionId id = (ConnectionId) obj;
1590 return address.equals(id.address) &&
1591 ((ticket != null && ticket.equals(id.ticket)) ||
1592 (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout &&
1593 this.serviceName == id.serviceName;
1594 }
1595 return false;
1596 }
1597
1598 @Override
1599 public int hashCode() {
1600 int hashcode = (address.hashCode() +
1601 PRIME * (PRIME * this.serviceName.hashCode() ^
1602 (ticket == null ? 0 : ticket.hashCode()) )) ^
1603 rpcTimeout;
1604 return hashcode;
1605 }
1606 }
1607
/**
 * Stores <code>t</code> in the static <code>rpcTimeout</code> holder (presumably a
 * thread-local, given its set/get/remove usage -- confirm at its declaration).
 * @param t the rpc timeout value to store
 */
public static void setRpcTimeout(int t) {
  rpcTimeout.set(t);
}
1611
/**
 * @return the value currently held by the static <code>rpcTimeout</code> holder
 */
public static int getRpcTimeout() {
  return rpcTimeout.get();
}
1615
1616
1617
1618
1619
1620 public static int getRpcTimeout(int defaultTimeout) {
1621 return Math.min(defaultTimeout, rpcTimeout.get());
1622 }
1623
/**
 * Removes the value held by the static <code>rpcTimeout</code> holder.
 */
public static void resetRpcTimeout() {
  rpcTimeout.remove();
}
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644 Message callBlockingMethod(MethodDescriptor md, RpcController controller,
1645 Message param, Message returnType, final User ticket, final InetSocketAddress isa,
1646 final int rpcTimeout)
1647 throws ServiceException {
1648 long startTime = 0;
1649 if (LOG.isTraceEnabled()) {
1650 startTime = System.currentTimeMillis();
1651 }
1652 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
1653 CellScanner cells = null;
1654 if (pcrc != null) {
1655 cells = pcrc.cellScanner();
1656
1657 pcrc.setCellScanner(null);
1658 }
1659 Pair<Message, CellScanner> val = null;
1660 try {
1661 val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
1662 pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
1663 if (pcrc != null) {
1664
1665 if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
1666 } else if (val.getSecond() != null) {
1667 throw new ServiceException("Client dropping data on the floor!");
1668 }
1669
1670 if (LOG.isTraceEnabled()) {
1671 long callTime = System.currentTimeMillis() - startTime;
1672 if (LOG.isTraceEnabled()) {
1673 LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
1674 }
1675 }
1676 return val.getFirst();
1677 } catch (Throwable e) {
1678 throw new ServiceException(e);
1679 }
1680 }
1681
1682
1683
1684
1685
1686
1687
1688
1689
/**
 * Creates a blocking rpc channel that makes its calls through this client.
 * @param sn server the channel talks to
 * @param ticket user the channel's calls are made for
 * @param rpcTimeout rpc timeout handed to the channel
 * @return a new {@link BlockingRpcChannelImplementation}
 */
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
    final User ticket, final int rpcTimeout) {
  return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout);
}
1694
1695
1696
1697
1698
1699 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
1700 private final InetSocketAddress isa;
1701 private volatile RpcClient rpcClient;
1702 private final int rpcTimeout;
1703 private final User ticket;
1704
1705 protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn,
1706 final User ticket, final int rpcTimeout) {
1707 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
1708 this.rpcClient = rpcClient;
1709
1710
1711 this.rpcTimeout = getRpcTimeout(rpcTimeout);
1712 this.ticket = ticket;
1713 }
1714
1715 @Override
1716 public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
1717 Message param, Message returnType)
1718 throws ServiceException {
1719 return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
1720 this.isa, this.rpcTimeout);
1721 }
1722 }
1723 }