001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.IOException;
021import java.io.Writer;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.oozie.client.CoordinatorJob;
025import org.apache.oozie.client.WorkflowJob;
026import org.apache.oozie.executor.jpa.JPAExecutorException;
027import org.apache.oozie.service.JMSTopicService;
028import org.apache.oozie.service.Services;
029
030public abstract class BaseEngine {
031    public static final String USE_XCOMMAND = "oozie.useXCommand";
032
033    protected String user;
034
035    /**
036     * Return the user name.
037     *
038     * @return the user name.
039     */
040    public String getUser() {
041        return user;
042    }
043
044    /**
045     * Submit a job.
046     * <p/>
047     * It validates configuration properties.
048     *
049     * @param conf job configuration.
050     * @param startJob indicates if the job should be started or not.
051     * @return the job Id.
052     * @throws BaseEngineException thrown if the job could not be created.
053     */
054    public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
055
056    /**
057     * Start a job.
058     *
059     * @param jobId job Id.
060     * @throws BaseEngineException thrown if the job could not be started.
061     */
062    public abstract void start(String jobId) throws BaseEngineException;
063
064    /**
065     * Resume a job.
066     *
067     * @param jobId job Id.
068     * @throws BaseEngineException thrown if the job could not be resumed.
069     */
070    public abstract void resume(String jobId) throws BaseEngineException;
071
072    /**
073     * Suspend a job.
074     *
075     * @param jobId job Id.
076     * @throws BaseEngineException thrown if the job could not be suspended.
077     */
078    public abstract void suspend(String jobId) throws BaseEngineException;
079
080    /**
081     * Kill a job.
082     *
083     * @param jobId job Id.
084     * @throws BaseEngineException thrown if the job could not be killed.
085     */
086    public abstract void kill(String jobId) throws BaseEngineException;
087
088    /**
089     * Change a coordinator job.
090     *
091     * @param jobId job Id.
092     * @param changeValue change value.
093     * @throws BaseEngineException thrown if the job could not be changed.
094     */
095    public abstract void change(String jobId, String changeValue) throws BaseEngineException;
096
097    /**
098     * Rerun a job.
099     *
100     * @param jobId job Id to rerun.
101     * @param conf configuration information for the rerun.
102     * @throws BaseEngineException thrown if the job could not be rerun.
103     */
104    public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
105
106    /**
107     * Return the info about a wf job.
108     *
109     * @param jobId job Id.
110     * @return the workflow job info.
111     * @throws DagEngineException thrown if the job info could not be obtained.
112     */
113    public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
114
115    /**
116     * Return the info about a wf job with actions subset.
117     *
118     * @param jobId job Id
119     * @param start starting from this index in the list of actions belonging to the job
120     * @param length number of actions to be returned
121     * @return the workflow job info.
122     * @throws DagEngineException thrown if the job info could not be obtained.
123     */
124    public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
125
126    /**
127     * Return the info about a coord job.
128     *
129     * @param jobId job Id.
130     * @return the coord job info.
131     * @throws BaseEngineException thrown if the job info could not be obtained.
132     */
133    public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
134
135    /**
136     * Return the info about a coord job with actions subset.
137     *
138     * @param jobId job Id.
139     * @param filter the status filter
140     * @param start starting from this index in the list of actions belonging to the job
141     * @param length number of actions to be returned
142     * @param order true if actions are sorted in a descending order of nominal time, false if asc order
143     * @return the coord job info.
144     * @throws BaseEngineException thrown if the job info could not be obtained.
145     */
146    public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
147            throws BaseEngineException;
148
149    /**
150     * Return the a job definition.
151     *
152     * @param jobId job Id.
153     * @return the job definition.
154     * @throws BaseEngineException thrown if the job definition could no be obtained.
155     */
156    public abstract String getDefinition(String jobId) throws BaseEngineException;
157
158    /**
159     * Stream the log of a job.
160     *
161     * @param jobId job Id.
162     * @param writer writer to stream the log to.
163     * @throws IOException thrown if the log cannot be streamed.
164     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
165     *         jobId.
166     */
167    public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException;
168
169    /**
170     * Return the workflow Job ID for an external ID.
171     * <p/>
172     * This is reverse lookup for recovery purposes.
173     *
174     * @param externalId external ID provided at job submission time.
175     * @return the associated workflow job ID if any, <code>null</code> if none.
176     * @throws BaseEngineException thrown if the lookup could not be done.
177     */
178    public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
179
180    /**
181     * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) but doesn't actually execute
182     * the job.
183     * <p/>
184     * It validates configuration properties.
185     *
186     * @param conf job configuration.
187     * @return the result of the dryrun
188     * @throws BaseEngineException thrown if there was a problem doing the dryrun
189     */
190    public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
191
192
193    /**
194     * Return the jms topic name for the job.
195     *
196     * @param jobId job Id.
197     * @return String the topic name
198     * @throws DagEngineException thrown if the jms info could not be obtained.
199     */
200    public String getJMSTopicName(String jobId) throws DagEngineException {
201        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
202        if (jmsTopicService != null) {
203            try {
204                return jmsTopicService.getTopic(jobId);
205            }
206            catch (JPAExecutorException e) {
207               throw new DagEngineException(ErrorCode.E1602, e);
208            }
209        }
210        else {
211            throw new DagEngineException(ErrorCode.E1602,
212                    "JMSTopicService is not initialized. JMS notification"
213                            + "may not be enabled");
214        }
215    }
216
217}