1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21
22 import org.apache.hadoop.hbase.classification.InterfaceAudience;
23 import org.apache.hadoop.hbase.classification.InterfaceStability;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.CellScanner;
26 import org.apache.hadoop.hbase.CoprocessorEnvironment;
27 import org.apache.hadoop.hbase.ServerName;
28 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
29 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
30 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
31 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
32 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
33 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
35 import org.apache.hadoop.hbase.regionserver.HRegionServer;
36 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
37 import org.apache.hadoop.hbase.security.UserProvider;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39 import org.apache.hadoop.hbase.util.Pair;
40
41 import com.google.protobuf.BlockingRpcChannel;
42 import com.google.protobuf.BlockingService;
43 import com.google.protobuf.Descriptors.MethodDescriptor;
44 import com.google.protobuf.Message;
45 import com.google.protobuf.RpcController;
46 import com.google.protobuf.ServiceException;
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public class CoprocessorHConnection extends HConnectionImplementation {
59
60
61
62
63
64
65
66
67
68 public static HConnection getConnectionForEnvironment(CoprocessorEnvironment env)
69 throws IOException {
70
71 if (env instanceof RegionCoprocessorEnvironment) {
72 RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
73 RegionServerServices services = e.getRegionServerServices();
74 if (services instanceof HRegionServer) {
75 return new CoprocessorHConnection((HRegionServer) services);
76 }
77 }
78 return HConnectionManager.createConnection(env.getConfiguration());
79 }
80
81 private final ServerName serverName;
82 private final HRegionServer server;
83
84
85
86
87
88
89
90
91 @Deprecated
92 public CoprocessorHConnection(HConnection delegate, HRegionServer server)
93 throws IOException {
94 this(server);
95 }
96
97
98
99
100
101
102 public CoprocessorHConnection(HRegionServer server) throws IOException {
103 this(server.getConfiguration(), server);
104 }
105
106
107
108
109
110
111
112 public CoprocessorHConnection(Configuration conf, HRegionServer server) throws IOException {
113 super(conf, false, null, UserProvider.instantiate(conf).getCurrent());
114 this.server = server;
115 this.serverName = server.getServerName();
116 }
117
118 @Override
119 public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface
120 getClient(ServerName serverName) throws IOException {
121
122 if (!this.serverName.equals(serverName)) {
123 return super.getClient(serverName);
124 }
125
126
127 final BlockingService blocking = ClientService.newReflectiveBlockingService(this.server);
128 final RpcServerInterface rpc = this.server.getRpcServer();
129 final MonitoredRPCHandler status = TaskMonitor.get().createRPCStatus(Thread.currentThread()
130 .getName());
131 status.pause("Setting up server-local call");
132 final long timestamp = EnvironmentEdgeManager.currentTimeMillis();
133 BlockingRpcChannel channel = new BlockingRpcChannel() {
134 @Override
135 public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
136 Message request, Message responsePrototype) throws ServiceException {
137 try {
138 Pair<Message, CellScanner> ret = rpc.call(blocking, method, request, null, timestamp,
139 status);
140 if (ret.getSecond() != null) {
141 PayloadCarryingRpcController rpcc = (PayloadCarryingRpcController) controller;
142 rpcc.setCellScanner(ret.getSecond());
143 }
144 return ret.getFirst();
145 } catch (IOException e) {
146 throw new ServiceException(e);
147 }
148 }
149 };
150 return ClientService.newBlockingStub(channel);
151 }
152 }