View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.token;
20  
21  import java.io.IOException;
22  import java.lang.reflect.UndeclaredThrowableException;
23  import java.security.PrivilegedExceptionAction;
24  
25  import com.google.protobuf.ServiceException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.client.HConnection;
35  import org.apache.hadoop.hbase.client.HConnectionManager;
36  import org.apache.hadoop.hbase.client.HTableInterface;
37  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
38  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
40  import org.apache.hadoop.hbase.security.User;
41  import org.apache.hadoop.hbase.security.UserProvider;
42  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
43  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44  import org.apache.hadoop.io.Text;
45  import org.apache.hadoop.mapred.JobConf;
46  import org.apache.hadoop.mapreduce.Job;
47  import org.apache.hadoop.security.UserGroupInformation;
48  import org.apache.hadoop.security.token.Token;
49  import org.apache.zookeeper.KeeperException;
50  
51  /**
52   * Utility methods for obtaining authentication tokens.
53   */
54  @InterfaceAudience.Public
55  @InterfaceStability.Evolving
56  public class TokenUtil {
57    // This class is referenced indirectly by User out in common; instances are created by reflection
58    private static Log LOG = LogFactory.getLog(TokenUtil.class);
59  
60    /**
61     * Obtain and return an authentication token for the current user.
62     * @param conf the configuration for connecting to the cluster
63     * @return the authentication token instance
64     * @deprecated Replaced by {@link #obtainToken(HConnection)}
65     */
66    @Deprecated
67    public static Token<AuthenticationTokenIdentifier> obtainToken(
68        Configuration conf) throws IOException {
69      HConnection connection = null;
70      try {
71        connection = HConnectionManager.createConnection(conf);
72        return obtainToken(connection);
73      } finally {
74        if (connection != null) {
75          connection.close();
76        }
77      }
78    }
79  
80    /**
81     * Obtain and return an authentication token for the current user.
82     * @param conn The HBase cluster connection
83     * @return the authentication token instance
84     */
85    public static Token<AuthenticationTokenIdentifier> obtainToken(
86        HConnection conn) throws IOException {
87      HTableInterface meta = null;
88      try {
89        meta = conn.getTable(TableName.META_TABLE_NAME);
90        CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
91        AuthenticationProtos.AuthenticationService.BlockingInterface service =
92            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
93        AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
94            AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
95  
96        return ProtobufUtil.toToken(response.getToken());
97      } catch (ServiceException se) {
98        ProtobufUtil.toIOException(se);
99      } finally {
100       if (meta != null) {
101         meta.close();
102       }
103     }
104     // dummy return for ServiceException block
105     return null;
106   }
107 
108   /**
109    * Obtain and return an authentication token for the current user.
110    * @param conn The HBase cluster connection
111    * @return the authentication token instance
112    */
113   public static Token<AuthenticationTokenIdentifier> obtainToken(
114       final HConnection conn, User user) throws IOException, InterruptedException {
115     return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
116       @Override
117       public Token<AuthenticationTokenIdentifier> run() throws Exception {
118         return obtainToken(conn);
119       }
120     });
121   }
122 
123 
124   private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
125       throws IOException {
126     return token.getService() != null
127         ? token.getService() : new Text("default");
128   }
129 
130   /**
131    * Obtain an authentication token for the given user and add it to the
132    * user's credentials.
133    * @param conf The configuration for connecting to the cluster
134    * @param user The user for whom to obtain the token
135    * @throws IOException If making a remote call to the authentication service fails
136    * @throws InterruptedException If executing as the given user is interrupted
137    * @deprecated Replaced by {@link #obtainAndCacheToken(HConnection,User)}
138    */
139   @Deprecated
140   public static void obtainAndCacheToken(final Configuration conf,
141                                          UserGroupInformation user)
142       throws IOException, InterruptedException {
143     HConnection conn = HConnectionManager.createConnection(conf);
144     try {
145       UserProvider userProvider = UserProvider.instantiate(conf);
146       obtainAndCacheToken(conn, userProvider.create(user));
147     } finally {
148       conn.close();
149     }
150   }
151 
152   /**
153    * Obtain an authentication token for the given user and add it to the
154    * user's credentials.
155    * @param conn The HBase cluster connection
156    * @param user The user for whom to obtain the token
157    * @throws IOException If making a remote call to the authentication service fails
158    * @throws InterruptedException If executing as the given user is interrupted
159    */
160   public static void obtainAndCacheToken(final HConnection conn,
161       User user)
162       throws IOException, InterruptedException {
163     try {
164       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
165 
166       if (token == null) {
167         throw new IOException("No token returned for user " + user.getName());
168       }
169       if (LOG.isDebugEnabled()) {
170         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
171             user.getName());
172       }
173       user.addToken(token);
174     } catch (IOException ioe) {
175       throw ioe;
176     } catch (InterruptedException ie) {
177       throw ie;
178     } catch (RuntimeException re) {
179       throw re;
180     } catch (Exception e) {
181       throw new UndeclaredThrowableException(e,
182           "Unexpected exception obtaining token for user " + user.getName());
183     }
184   }
185 
186   /**
187    * Obtain an authentication token on behalf of the given user and add it to
188    * the credentials for the given map reduce job.
189    * @param conf The configuration for connecting to the cluster
190    * @param user The user for whom to obtain the token
191    * @param job The job instance in which the token should be stored
192    * @throws IOException If making a remote call to the authentication service fails
193    * @throws InterruptedException If executing as the given user is interrupted
194    * @deprecated Replaced by {@link #obtainTokenForJob(HConnection,User,Job)}
195    */
196   @Deprecated
197   public static void obtainTokenForJob(final Configuration conf,
198                                        UserGroupInformation user, Job job)
199       throws IOException, InterruptedException {
200     HConnection conn = HConnectionManager.createConnection(conf);
201     try {
202       UserProvider userProvider = UserProvider.instantiate(conf);
203       obtainTokenForJob(conn, userProvider.create(user), job);
204     } finally {
205       conn.close();
206     }
207   }
208 
209   /**
210    * Obtain an authentication token on behalf of the given user and add it to
211    * the credentials for the given map reduce job.
212    * @param conn The HBase cluster connection
213    * @param user The user for whom to obtain the token
214    * @param job The job instance in which the token should be stored
215    * @throws IOException If making a remote call to the authentication service fails
216    * @throws InterruptedException If executing as the given user is interrupted
217    */
218   public static void obtainTokenForJob(final HConnection conn,
219       User user, Job job)
220       throws IOException, InterruptedException {
221     try {
222       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
223 
224       if (token == null) {
225         throw new IOException("No token returned for user " + user.getName());
226       }
227       Text clusterId = getClusterId(token);
228       if (LOG.isDebugEnabled()) {
229         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
230             user.getName() + " on cluster " + clusterId.toString());
231       }
232       job.getCredentials().addToken(clusterId, token);
233     } catch (IOException ioe) {
234       throw ioe;
235     } catch (InterruptedException ie) {
236       throw ie;
237     } catch (RuntimeException re) {
238       throw re;
239     } catch (Exception e) {
240       throw new UndeclaredThrowableException(e,
241           "Unexpected exception obtaining token for user " + user.getName());
242     }
243   }
244 
245   /**
246    * Obtain an authentication token on behalf of the given user and add it to
247    * the credentials for the given map reduce job.
248    * @param user The user for whom to obtain the token
249    * @param job The job configuration in which the token should be stored
250    * @throws IOException If making a remote call to the authentication service fails
251    * @throws InterruptedException If executing as the given user is interrupted
252    * @deprecated Replaced by {@link #obtainTokenForJob(HConnection,JobConf,User)}
253    */
254   @Deprecated
255   public static void obtainTokenForJob(final JobConf job,
256                                        UserGroupInformation user)
257       throws IOException, InterruptedException {
258     HConnection conn = HConnectionManager.createConnection(job);
259     try {
260       UserProvider userProvider = UserProvider.instantiate(job);
261       obtainTokenForJob(conn, job, userProvider.create(user));
262     } finally {
263       conn.close();
264     }
265   }
266 
267   /**
268    * Obtain an authentication token on behalf of the given user and add it to
269    * the credentials for the given map reduce job.
270    * @param conn The HBase cluster connection
271    * @param user The user for whom to obtain the token
272    * @param job The job configuration in which the token should be stored
273    * @throws IOException If making a remote call to the authentication service fails
274    * @throws InterruptedException If executing as the given user is interrupted
275    */
276   public static void obtainTokenForJob(final HConnection conn, final JobConf job, User user)
277       throws IOException, InterruptedException {
278     try {
279       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
280 
281       if (token == null) {
282         throw new IOException("No token returned for user " + user.getName());
283       }
284       Text clusterId = getClusterId(token);
285       if (LOG.isDebugEnabled()) {
286         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
287             user.getName() + " on cluster " + clusterId.toString());
288       }
289       job.getCredentials().addToken(clusterId, token);
290     } catch (IOException ioe) {
291       throw ioe;
292     } catch (InterruptedException ie) {
293       throw ie;
294     } catch (RuntimeException re) {
295       throw re;
296     } catch (Exception e) {
297       throw new UndeclaredThrowableException(e,
298           "Unexpected exception obtaining token for user "+user.getName());
299     }
300   }
301 
302   /**
303    * Checks for an authentication token for the given user, obtaining a new token if necessary,
304    * and adds it to the credentials for the given map reduce job.
305    *
306    * @param conn The HBase cluster connection
307    * @param user The user for whom to obtain the token
308    * @param job The job configuration in which the token should be stored
309    * @throws IOException If making a remote call to the authentication service fails
310    * @throws InterruptedException If executing as the given user is interrupted
311    */
312   public static void addTokenForJob(final HConnection conn, final JobConf job, User user)
313       throws IOException, InterruptedException {
314 
315     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
316     if (token == null) {
317       token = obtainToken(conn, user);
318     }
319     job.getCredentials().addToken(token.getService(), token);
320   }
321 
322   /**
323    * Checks for an authentication token for the given user, obtaining a new token if necessary,
324    * and adds it to the credentials for the given map reduce job.
325    *
326    * @param conn The HBase cluster connection
327    * @param user The user for whom to obtain the token
328    * @param job The job instance in which the token should be stored
329    * @throws IOException If making a remote call to the authentication service fails
330    * @throws InterruptedException If executing as the given user is interrupted
331    */
332   public static void addTokenForJob(final HConnection conn, User user, Job job)
333       throws IOException, InterruptedException {
334     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
335     if (token == null) {
336       token = obtainToken(conn, user);
337     }
338     job.getCredentials().addToken(token.getService(), token);
339   }
340 
341   /**
342    * Checks if an authentication tokens exists for the connected cluster,
343    * obtaining one if needed and adding it to the user's credentials.
344    *
345    * @param conn The HBase cluster connection
346    * @param user The user for whom to obtain the token
347    * @throws IOException If making a remote call to the authentication service fails
348    * @throws InterruptedException If executing as the given user is interrupted
349    * @return true if the token was added, false if it already existed
350    */
351   public static boolean addTokenIfMissing(HConnection conn, User user)
352       throws IOException, InterruptedException {
353     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
354     if (token == null) {
355       token = obtainToken(conn, user);
356       user.getUGI().addToken(token);
357       return true;
358     }
359     return false;
360   }
361 
362   /**
363    * Get the authentication token of the user for the cluster specified in the configuration
364    * @return null if the user does not have the token, otherwise the auth token for the cluster.
365    */
366   private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
367       throws IOException, InterruptedException {
368     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
369     try {
370       String clusterId = ZKClusterId.readClusterIdZNode(zkw);
371       if (clusterId == null) {
372         throw new IOException("Failed to get cluster ID");
373       }
374       return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
375     } catch (KeeperException e) {
376       throw new IOException(e);
377     } finally {
378       zkw.close();
379     }
380   }
381 }