/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances.
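 * <p/>
 * Illustrative usage only, assuming a running Oozie {@code Services} instance (host names and the user
 * are placeholders):
 * <pre>{@code
 * HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 * JobConf jobConf = has.createJobConf("jt-host.example.com:8032");
 * FileSystem fs = has.createFileSystem("someUser", new URI("hdfs://nn-host.example.com:8020"), jobConf);
 * }</pre>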
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker.*/
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager.*/
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private UserGroupInformationService ugiService;

    /**
     * Supported filesystem schemes for namespace federation
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs","hftp","webhdfs"};
    private Set<String> supportedSchemes;
    private boolean allSchemesSupported;

    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        if (ugiService == null) { //for testing purposes, see XFsTestCase
            this.ugiService = new UserGroupInformationService();
        }

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
        if(schemesFromConf != null) {
            for (String scheme: schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", allSchemesSupported is set so the scheme check is skipped, i.e. all schemes are allowed
                if(scheme.equals("*")) {
                    if(schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                    allSchemesSupported = true;
                }
                supportedSchemes.add(scheme);
            }
        }
    }

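    // Illustrative oozie-site.xml settings read by kerberosInit(); the key names are the KERBEROS_*
    // constants declared above, the values below are placeholders:
    //   <CONF_PREFIX>kerberos.enabled    = true
    //   <CONF_PREFIX>keytab.file         = /home/oozie/oozie.keytab
    //   <CONF_PREFIX>kerberos.principal  = oozie/oozie-host.example.com@EXAMPLE.COM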
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
            try {
                String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                    System.getProperty("user.home") + "/oozie.keytab").trim();
                if (keytabFile.length() == 0) {
                    throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
                }
                String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
                if (principal.length() == 0) {
                    throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
                }
                Configuration conf = new Configuration();
                conf.set("hadoop.security.authentication", "kerberos");
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
                XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                             keytabFile, principal);
            }
            catch (ServiceException ex) {
                throw ex;
            }
            catch (Exception ex) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
            }
    }

    private static final String[] HADOOP_CONF_FILES =
        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};


    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

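    // Each configuration definition has the form "<host:port>=<confDir>"; for example (placeholder
    // values): "*=hadoop-conf,jt-host.example.com:8032=cluster1-conf". Relative directories are
    // resolved against the Oozie configuration directory, and "*" is the catch-all entry used when
    // no host:port specific entry matches.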
    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        return ugiService.getProxyUser(user);
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
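     * <p/>
     * Illustrative usage only (the host:port value is a placeholder); the returned JobConf carries the
     * marker required by {@link #createJobClient(String, JobConf)} and
     * {@link #createFileSystem(String, URI, Configuration)}:
     * <pre>{@code
     * JobConf jobConf = hadoopAccessorService.createJobConf("jt-host.example.com:8032");
     * }</pre>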
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
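     * <p/>
     * Illustrative usage only (host:port and action name are placeholders); the defaults would be read
     * from an {@code <action>.xml} file (e.g. {@code pig.xml}) in the matching action configuration directory:
     * <pre>{@code
     * XConfiguration pigDefaults =
     *         hadoopAccessorService.createActionDefaultConf("jt-host.example.com:8032", "pig");
     * }</pre>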
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know all actions upfront; no need to synchronize
            // as it is a read operation and, in case of a race condition, loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
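     * <p/>
     * Illustrative usage only (user name and host:port are placeholders); the JobConf must come from
     * {@link #createJobConf(String)}:
     * <pre>{@code
     * JobConf jobConf = hadoopAccessorService.createJobConf("jt-host.example.com:8032");
     * jobConf.set(JavaActionExecutor.HADOOP_JOB_TRACKER, "jt-host.example.com:8032");
     * JobClient jobClient = hadoopAccessorService.createJobClient("someUser", jobConf);
     * }</pre>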
     *
     * @param user user name to create the JobClient as.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
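     * <p/>
     * Illustrative usage only (user name and namenode authority are placeholders); the Configuration
     * must come from {@link #createJobConf(String)}:
     * <pre>{@code
     * JobConf conf = hadoopAccessorService.createJobConf("nn-host.example.com:8020");
     * FileSystem fs = hadoopAccessorService.createFileSystem("someUser",
     *         new URI("hdfs://nn-host.example.com:8020"), conf);
     * }</pre>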
     *
     * @param user user name to create the FileSystem as.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Validate the JobTracker URI against the JobTracker whitelist.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if a whitelist is configured and the URI is not in it.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the NameNode URI against the NameNode whitelist.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if a whitelist is configured and the URI is not in it.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

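    /**
     * Return the renewer to request a MapReduce delegation token with.
     * <p/>
     * On a secure cluster this resolves the JobTracker/ResourceManager principal from the JobConf;
     * otherwise the placeholder {@link #MR_TOKEN_ALIAS} is returned, since the renewer is not used.
     *
     * @param jobConf configuration holding the JobTracker/ResourceManager address and principal.
     * @return the renewer to use when requesting a MapReduce delegation token.
     * @throws IOException if the renewer principal cannot be resolved.
     */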
    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Resolve the renewer for the JT principal as well, even though the JobTracker in Hadoop 1.x
        // does not support renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
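                // e.g. (illustrative values) a principal of "yarn/_HOST@EXAMPLE.COM" with target
                // "rm-host.example.com:8032" resolves to "yarn/rm-host.example.com@EXAMPLE.COM",
                // since SecurityUtil.getServerPrincipal() substitutes the hostname for "_HOST"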
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }

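    /**
     * Add a file to the distributed cache classpath of the given configuration, as the provided user.
     *
     * @param user user name to perform the operation as.
     * @param file path of the file to add to the classpath.
     * @param conf configuration to update.
     * @throws IOException if the file could not be added to the classpath.
     */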
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is in the configured list of supported
     * filesystems. This makes the system usable with filesystems other than HDFS as well.
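     * <p/>
     * Illustrative usage only (the URI is a placeholder):
     * <pre>{@code
     * hadoopAccessorService.checkSupportedFilesystem(new URI("hdfs://nn-host.example.com:8020/user/someUser"));
     * }</pre>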
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported)
            return;
        String uriScheme = uri.getScheme();
        if (uriScheme != null) {    // skip the check if no scheme is given
            if(!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }

    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

}