001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import java.io.IOException;
021import java.net.URI;
022import java.util.HashMap;
023import java.util.Map;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.client.OozieClient;
030import org.apache.oozie.client.XOozieClient;
031import org.apache.oozie.command.CommandException;
032import org.apache.oozie.service.HadoopAccessorException;
033import org.apache.oozie.service.HadoopAccessorService;
034import org.apache.oozie.service.Services;
035
036/**
037 * Job utilities.
038 */
039public class JobUtils {
040    /**
041     * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after
042     * normalization appPath always points to job's Xml definition file.
043     * <p/>
044     *
045     * @param user user
046     * @param group group
047     * @param conf job configuration.
048     * @throws IOException thrown if normalization can not be done properly.
049     */
050    public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException {
051        if (user == null) {
052            throw new IllegalArgumentException("user cannot be null");
053        }
054
055        if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job;
056            return;
057        }
058
059        String wfPathStr = conf.get(OozieClient.APP_PATH);
060        String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
061        String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
062        String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr);
063
064        FileSystem fs = null;
065        try {
066            URI uri = new Path(appPathStr).toUri();
067            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
068            Configuration fsConf = has.createJobConf(uri.getAuthority());
069            fs = has.createFileSystem(user, uri, fsConf);
070        }
071        catch (HadoopAccessorException ex) {
072            throw new IOException(ex.getMessage());
073        }
074
075        Path appPath = new Path(appPathStr);
076        String normalizedAppPathStr = appPathStr;
077        if (!fs.exists(appPath)) {
078            throw new IOException("Error: " + appPathStr + " does not exist");
079        }
080
081        if (wfPathStr != null) {
082            conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
083        }
084        else if (coordPathStr != null) {
085            conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr);
086        }
087        else if (bundlePathStr != null) {
088            conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr);
089        }
090    }
091
092    /**
093     * This Function will parse the value of the changed values in key value manner. the change value would be
094     * key1=value1;key2=value2
095     *
096     * @param changeValue change value.
097     * @return This returns the hash with hash<[key1,value1],[key2,value2]>
098     * @throws CommandException thrown if changeValue cannot be parsed properly.
099     */
100    public static Map<String, String> parseChangeValue(String changeValue) throws CommandException {
101        if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) {
102            throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null");
103        }
104
105        Map<String, String> map = new HashMap<String, String>();
106
107        String[] tokens = changeValue.split(";");
108        for (String token : tokens) {
109            if (!token.contains("=")) {
110                throw new CommandException(ErrorCode.E1015, changeValue,
111                        "change value must be name=value pair or name=(empty string)");
112            }
113
114            String[] pair = token.split("=");
115            String key = pair[0];
116
117            if (map.containsKey(key)) {
118                throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on "
119                        + key);
120            }
121
122            if (pair.length == 2) {
123                map.put(key, pair[1]);
124            }
125            else if (pair.length == 1) {
126                map.put(key, "");
127            }
128            else {
129                throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key
130                        + " must be name=value pair or name=(empty string)");
131            }
132        }
133
134        return map;
135    }
136}