1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import java.io.IOException;
22 import java.lang.reflect.UndeclaredThrowableException;
23 import java.security.PrivilegedExceptionAction;
24
25 import com.google.protobuf.ServiceException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.client.HConnection;
35 import org.apache.hadoop.hbase.client.HConnectionManager;
36 import org.apache.hadoop.hbase.client.HTableInterface;
37 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
38 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.security.UserProvider;
42 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
43 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44 import org.apache.hadoop.io.Text;
45 import org.apache.hadoop.mapred.JobConf;
46 import org.apache.hadoop.mapreduce.Job;
47 import org.apache.hadoop.security.UserGroupInformation;
48 import org.apache.hadoop.security.token.Token;
49 import org.apache.zookeeper.KeeperException;
50
51
52
53
54 @InterfaceAudience.Public
55 @InterfaceStability.Evolving
56 public class TokenUtil {
57
58 private static Log LOG = LogFactory.getLog(TokenUtil.class);
59
60
61
62
63
64
65
66 @Deprecated
67 public static Token<AuthenticationTokenIdentifier> obtainToken(
68 Configuration conf) throws IOException {
69 HConnection connection = null;
70 try {
71 connection = HConnectionManager.createConnection(conf);
72 return obtainToken(connection);
73 } finally {
74 if (connection != null) {
75 connection.close();
76 }
77 }
78 }
79
80
81
82
83
84
85 public static Token<AuthenticationTokenIdentifier> obtainToken(
86 HConnection conn) throws IOException {
87 HTableInterface meta = null;
88 try {
89 meta = conn.getTable(TableName.META_TABLE_NAME);
90 CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
91 AuthenticationProtos.AuthenticationService.BlockingInterface service =
92 AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
93 AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
94 AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
95
96 return ProtobufUtil.toToken(response.getToken());
97 } catch (ServiceException se) {
98 ProtobufUtil.toIOException(se);
99 } finally {
100 if (meta != null) {
101 meta.close();
102 }
103 }
104
105 return null;
106 }
107
108
109
110
111
112
113 public static Token<AuthenticationTokenIdentifier> obtainToken(
114 final HConnection conn, User user) throws IOException, InterruptedException {
115 return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
116 @Override
117 public Token<AuthenticationTokenIdentifier> run() throws Exception {
118 return obtainToken(conn);
119 }
120 });
121 }
122
123
124 private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
125 throws IOException {
126 return token.getService() != null
127 ? token.getService() : new Text("default");
128 }
129
130
131
132
133
134
135
136
137
138
139 @Deprecated
140 public static void obtainAndCacheToken(final Configuration conf,
141 UserGroupInformation user)
142 throws IOException, InterruptedException {
143 HConnection conn = HConnectionManager.createConnection(conf);
144 try {
145 UserProvider userProvider = UserProvider.instantiate(conf);
146 obtainAndCacheToken(conn, userProvider.create(user));
147 } finally {
148 conn.close();
149 }
150 }
151
152
153
154
155
156
157
158
159
160 public static void obtainAndCacheToken(final HConnection conn,
161 User user)
162 throws IOException, InterruptedException {
163 try {
164 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
165
166 if (token == null) {
167 throw new IOException("No token returned for user " + user.getName());
168 }
169 if (LOG.isDebugEnabled()) {
170 LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
171 user.getName());
172 }
173 user.addToken(token);
174 } catch (IOException ioe) {
175 throw ioe;
176 } catch (InterruptedException ie) {
177 throw ie;
178 } catch (RuntimeException re) {
179 throw re;
180 } catch (Exception e) {
181 throw new UndeclaredThrowableException(e,
182 "Unexpected exception obtaining token for user " + user.getName());
183 }
184 }
185
186
187
188
189
190
191
192
193
194
195
196 @Deprecated
197 public static void obtainTokenForJob(final Configuration conf,
198 UserGroupInformation user, Job job)
199 throws IOException, InterruptedException {
200 HConnection conn = HConnectionManager.createConnection(conf);
201 try {
202 UserProvider userProvider = UserProvider.instantiate(conf);
203 obtainTokenForJob(conn, userProvider.create(user), job);
204 } finally {
205 conn.close();
206 }
207 }
208
209
210
211
212
213
214
215
216
217
218 public static void obtainTokenForJob(final HConnection conn,
219 User user, Job job)
220 throws IOException, InterruptedException {
221 try {
222 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
223
224 if (token == null) {
225 throw new IOException("No token returned for user " + user.getName());
226 }
227 Text clusterId = getClusterId(token);
228 if (LOG.isDebugEnabled()) {
229 LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
230 user.getName() + " on cluster " + clusterId.toString());
231 }
232 job.getCredentials().addToken(clusterId, token);
233 } catch (IOException ioe) {
234 throw ioe;
235 } catch (InterruptedException ie) {
236 throw ie;
237 } catch (RuntimeException re) {
238 throw re;
239 } catch (Exception e) {
240 throw new UndeclaredThrowableException(e,
241 "Unexpected exception obtaining token for user " + user.getName());
242 }
243 }
244
245
246
247
248
249
250
251
252
253
254 @Deprecated
255 public static void obtainTokenForJob(final JobConf job,
256 UserGroupInformation user)
257 throws IOException, InterruptedException {
258 HConnection conn = HConnectionManager.createConnection(job);
259 try {
260 UserProvider userProvider = UserProvider.instantiate(job);
261 obtainTokenForJob(conn, job, userProvider.create(user));
262 } finally {
263 conn.close();
264 }
265 }
266
267
268
269
270
271
272
273
274
275
276 public static void obtainTokenForJob(final HConnection conn, final JobConf job, User user)
277 throws IOException, InterruptedException {
278 try {
279 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
280
281 if (token == null) {
282 throw new IOException("No token returned for user " + user.getName());
283 }
284 Text clusterId = getClusterId(token);
285 if (LOG.isDebugEnabled()) {
286 LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
287 user.getName() + " on cluster " + clusterId.toString());
288 }
289 job.getCredentials().addToken(clusterId, token);
290 } catch (IOException ioe) {
291 throw ioe;
292 } catch (InterruptedException ie) {
293 throw ie;
294 } catch (RuntimeException re) {
295 throw re;
296 } catch (Exception e) {
297 throw new UndeclaredThrowableException(e,
298 "Unexpected exception obtaining token for user "+user.getName());
299 }
300 }
301
302
303
304
305
306
307
308
309
310
311
312 public static void addTokenForJob(final HConnection conn, final JobConf job, User user)
313 throws IOException, InterruptedException {
314
315 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
316 if (token == null) {
317 token = obtainToken(conn, user);
318 }
319 job.getCredentials().addToken(token.getService(), token);
320 }
321
322
323
324
325
326
327
328
329
330
331
332 public static void addTokenForJob(final HConnection conn, User user, Job job)
333 throws IOException, InterruptedException {
334 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
335 if (token == null) {
336 token = obtainToken(conn, user);
337 }
338 job.getCredentials().addToken(token.getService(), token);
339 }
340
341
342
343
344
345
346
347
348
349
350
351 public static boolean addTokenIfMissing(HConnection conn, User user)
352 throws IOException, InterruptedException {
353 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
354 if (token == null) {
355 token = obtainToken(conn, user);
356 user.getUGI().addToken(token);
357 return true;
358 }
359 return false;
360 }
361
362
363
364
365
366 private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
367 throws IOException, InterruptedException {
368 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
369 try {
370 String clusterId = ZKClusterId.readClusterIdZNode(zkw);
371 if (clusterId == null) {
372 throw new IOException("Failed to get cluster ID");
373 }
374 return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
375 } catch (KeeperException e) {
376 throw new IOException(e);
377 } finally {
378 zkw.close();
379 }
380 }
381 }