001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.fs.FileStatus;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.fs.PathFilter;
025import org.apache.hadoop.mapred.JobConf;
026import org.apache.oozie.client.OozieClient;
027import org.apache.oozie.workflow.WorkflowApp;
028import org.apache.oozie.workflow.WorkflowException;
029import org.apache.oozie.util.IOUtils;
030import org.apache.oozie.util.XConfiguration;
031import org.apache.oozie.util.XLog;
032import org.apache.oozie.ErrorCode;
033
034import java.io.IOException;
035import java.io.InputStreamReader;
036import java.io.Reader;
037import java.io.StringWriter;
038import java.net.URI;
039import java.net.URISyntaxException;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.LinkedHashSet;
044import java.util.List;
045import java.util.Map;
046import java.util.Set;
047
048/**
049 * Service that provides application workflow definition reading from the path and creation of the proto configuration.
050 */
051public abstract class WorkflowAppService implements Service {
052
053    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
054
055    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
056
057    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
058
059    public static final String HADOOP_USER = "user.name";
060
061    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
062
063    public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";
064
065    public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";
066
067    private Path systemLibPath;
068    private long maxWFLength;
069    private boolean oozieSubWfCPInheritance;
070
071    /**
072     * Initialize the workflow application service.
073     *
074     * @param services services instance.
075     */
076    public void init(Services services) {
077        Configuration conf = services.getConf();
078
079        String path = conf.get(SYSTEM_LIB_PATH, " ");
080        if (path.trim().length() > 0) {
081            systemLibPath = new Path(path.trim());
082        }
083
084        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
085
086        oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
087    }
088
089    /**
090     * Destroy the workflow application service.
091     */
092    public void destroy() {
093    }
094
095    /**
096     * Return the public interface for workflow application service.
097     *
098     * @return {@link WorkflowAppService}.
099     */
100    public Class<? extends Service> getInterface() {
101        return WorkflowAppService.class;
102    }
103
104    /**
105     * Read workflow definition.
106     *
107     *
108     * @param appPath application path.
109     * @param user user name.
110     * @return workflow definition.
111     * @throws WorkflowException thrown if the definition could not be read.
112     */
113    protected String readDefinition(String appPath, String user, Configuration conf)
114            throws WorkflowException {
115        try {
116            URI uri = new URI(appPath);
117            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
118            JobConf jobConf = has.createJobConf(uri.getAuthority());
119            FileSystem fs = has.createFileSystem(user, uri, jobConf);
120
121            // app path could be a directory
122            Path path = new Path(uri.getPath());
123            if (!fs.isFile(path)) {
124                path = new Path(path, "workflow.xml");
125            }
126
127            FileStatus fsStatus = fs.getFileStatus(path);
128            if (fsStatus.getLen() > this.maxWFLength) {
129                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
130            }
131
132            Reader reader = new InputStreamReader(fs.open(path));
133            StringWriter writer = new StringWriter();
134            IOUtils.copyCharStream(reader, writer);
135            return writer.toString();
136
137        }
138        catch (WorkflowException wfe) {
139            throw wfe;
140        }
141        catch (IOException ex) {
142            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
143        }
144        catch (URISyntaxException ex) {
145            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
146        }
147        catch (HadoopAccessorException ex) {
148            throw new WorkflowException(ex);
149        }
150        catch (Exception ex) {
151            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
152        }
153    }
154
155    /**
156     * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
157     * added to distributed cache. These paths include .jar,.so and the resource file paths.
158     *
159     * @param jobConf job configuration.
160     * @param isWorkflowJob indicates if the job is a workflow job or not.
161     * @return proto configuration.
162     * @throws WorkflowException thrown if the proto action configuration could not be created.
163     */
164    public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
165            throws WorkflowException {
166        try {
167            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
168            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
169
170            Configuration conf = has.createJobConf(uri.getAuthority());
171
172            String user = jobConf.get(OozieClient.USER_NAME);
173            conf.set(OozieClient.USER_NAME, user);
174
175            FileSystem fs = has.createFileSystem(user, uri, conf);
176
177            Path appPath = new Path(uri);
178            XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
179            XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
180
181            Collection<String> filePaths;
182            if (isWorkflowJob) {
183                // app path could be a directory
184                Path path = new Path(uri.getPath());
185                if (!fs.isFile(path)) {
186                    filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
187                } else {
188                    filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
189                }
190            }
191            else {
192                filePaths = new LinkedHashSet<String>();
193            }
194
195            String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
196            if (libPaths != null && libPaths.length > 0) {
197                for (int i = 0; i < libPaths.length; i++) {
198                    if (libPaths[i].trim().length() > 0) {
199                        Path libPath = new Path(libPaths[i].trim());
200                        Collection<String> libFilePaths = getLibFiles(fs, libPath);
201                        filePaths.addAll(libFilePaths);
202                    }
203                }
204            }
205
206            // Check if a subworkflow should inherit the libs from the parent WF
207            // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
208            // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
209            if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
210                // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
211                String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
212                if (parentFilePaths != null && parentFilePaths.length > 0) {
213                    String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
214                    for (int i = 0; i < filePathsNames.length; i++) {
215                        Path p = new Path(filePathsNames[i]);
216                        filePathsNames[i] = p.getName();
217                    }
218                    Arrays.sort(filePathsNames);
219                    List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
220                    for (String parentFilePath : parentFilePaths) {
221                        Path p = new Path(parentFilePath);
222                        if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
223                            nonDuplicateParentFilePaths.add(parentFilePath);
224                        }
225                    }
226                    filePaths.addAll(nonDuplicateParentFilePaths);
227                }
228            }
229
230            conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
231
232            //Add all properties start with 'oozie.'
233            for (Map.Entry<String, String> entry : jobConf) {
234                if (entry.getKey().startsWith("oozie.")) {
235                    String name = entry.getKey();
236                    String value = entry.getValue();
237                    // if property already exists, should not overwrite
238                    if(conf.get(name) == null) {
239                        conf.set(name, value);
240                    }
241                }
242            }
243            XConfiguration retConf = new XConfiguration();
244            XConfiguration.copy(conf, retConf);
245            return retConf;
246        }
247        catch (IOException ex) {
248            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
249        }
250        catch (URISyntaxException ex) {
251            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
252        }
253        catch (HadoopAccessorException ex) {
254            throw new WorkflowException(ex);
255        }
256        catch (Exception ex) {
257            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
258                                        ex.getMessage(), ex);
259        }
260    }
261
262    /**
263     * Parse workflow definition.
264     *
265     * @param jobConf job configuration.
266     * @return workflow application.
267     * @throws WorkflowException thrown if the workflow application could not be parsed.
268     */
269    public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;
270
271    /**
272     * Parse workflow definition.
273     * @param wfXml workflow.
274     * @param jobConf job configuration
275     * @return workflow application.
276     * @throws WorkflowException thrown if the workflow application could not be parsed.
277     */
278    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;
279
280    /**
281     * Get all library paths.
282     *
283     * @param fs file system object.
284     * @param libPath hdfs library path.
285     * @return list of paths.
286     * @throws IOException thrown if the lib paths could not be obtained.
287     */
288    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
289        Set<String> libPaths = new LinkedHashSet<String>();
290        if (fs.exists(libPath)) {
291            FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
292
293            for (FileStatus file : files) {
294                libPaths.add(file.getPath().toUri().toString());
295            }
296        }
297        else {
298            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
299        }
300        return libPaths;
301    }
302
303    /*
304     * Filter class doing no filtering.
305     * We dont need define this class, but seems fs.listStatus() is not working properly without this.
306     * So providing this dummy no filtering Filter class.
307     */
308    private class NoPathFilter implements PathFilter {
309        @Override
310        public boolean accept(Path path) {
311            return true;
312        }
313    }
314
315    /**
316     * Returns Oozie system libpath.
317     *
318     * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
319     */
320    public Path getSystemLibPath() {
321        return systemLibPath;
322    }
323}