001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.BundleActionBean;
030import org.apache.oozie.BundleJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.XException;
033import org.apache.oozie.client.Job;
034import org.apache.oozie.client.OozieClient;
035import org.apache.oozie.client.rest.JsonBean;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.PreconditionException;
038import org.apache.oozie.command.StartTransitionXCommand;
039import org.apache.oozie.command.coord.CoordSubmitXCommand;
040import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.service.JPAService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.util.JobUtils;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.ParamChecker;
049import org.apache.oozie.util.XConfiguration;
050import org.apache.oozie.util.XmlUtils;
051import org.jdom.Attribute;
052import org.jdom.Element;
053import org.jdom.JDOMException;
054
055/**
056 * The command to start Bundle job
057 */
058public class BundleStartXCommand extends StartTransitionXCommand {
059    private final String jobId;
060    private BundleJobBean bundleJob;
061    private JPAService jpaService = null;
062
063    /**
064     * The constructor for class {@link BundleStartXCommand}
065     *
066     * @param jobId the bundle job id
067     */
068    public BundleStartXCommand(String jobId) {
069        super("bundle_start", "bundle_start", 1);
070        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
071    }
072
073    /**
074     * The constructor for class {@link BundleStartXCommand}
075     *
076     * @param jobId the bundle job id
077     * @param dryrun true if dryrun is enable
078     */
079    public BundleStartXCommand(String jobId, boolean dryrun) {
080        super("bundle_start", "bundle_start", 1, dryrun);
081        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
082    }
083
084    /* (non-Javadoc)
085     * @see org.apache.oozie.command.XCommand#getEntityKey()
086     */
087    @Override
088    public String getEntityKey() {
089        return jobId;
090    }
091
092    @Override
093    public String getKey() {
094        return getName() + "_" + jobId;
095    }
096
097    /* (non-Javadoc)
098     * @see org.apache.oozie.command.XCommand#isLockRequired()
099     */
100    @Override
101    protected boolean isLockRequired() {
102        return true;
103    }
104
105    /* (non-Javadoc)
106     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
107     */
108    @Override
109    protected void verifyPrecondition() throws CommandException, PreconditionException {
110        eagerVerifyPrecondition();
111    }
112
113    /* (non-Javadoc)
114     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
115     */
116    @Override
117    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
118        if (bundleJob.getStatus() != Job.Status.PREP) {
119            String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
120            LOG.info(msg);
121            throw new PreconditionException(ErrorCode.E1100, msg);
122        }
123    }
124    /* (non-Javadoc)
125     * @see org.apache.oozie.command.XCommand#loadState()
126     */
127    @Override
128    public void loadState() throws CommandException {
129        eagerLoadState();
130    }
131
132    /* (non-Javadoc)
133     * @see org.apache.oozie.command.XCommand#eagerLoadState()
134     */
135    @Override
136    public void eagerLoadState() throws CommandException {
137        try {
138            jpaService = Services.get().get(JPAService.class);
139
140            if (jpaService != null) {
141                this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
142                LogUtils.setLogInfo(bundleJob, logInfo);
143                super.setJob(bundleJob);
144
145            }
146            else {
147                throw new CommandException(ErrorCode.E0610);
148            }
149        }
150        catch (XException ex) {
151            throw new CommandException(ex);
152        }
153    }
154
155    /* (non-Javadoc)
156     * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
157     */
158    @Override
159    public void StartChildren() throws CommandException {
160        LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
161        insertBundleActions();
162        startCoordJobs();
163        LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
164    }
165
166    /* (non-Javadoc)
167     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
168     */
169    @Override
170    public void notifyParent() {
171    }
172
173    /* (non-Javadoc)
174     * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
175     */
176    @Override
177    public void performWrites() throws CommandException {
178        try {
179            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
180        }
181        catch (JPAExecutorException e) {
182            throw new CommandException(e);
183        }
184    }
185
186    /**
187     * Insert bundle actions
188     *
189     * @throws CommandException thrown if failed to create bundle actions
190     */
191    @SuppressWarnings("unchecked")
192    private void insertBundleActions() throws CommandException {
193        if (bundleJob != null) {
194            Map<String, Boolean> map = new HashMap<String, Boolean>();
195            try {
196                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
197                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
198                for (Element elem : coordElems) {
199                    Attribute name = elem.getAttribute("name");
200                    Attribute critical = elem.getAttribute("critical");
201                    if (name != null) {
202                        if (map.containsKey(name.getValue())) {
203                            throw new CommandException(ErrorCode.E1304, name);
204                        }
205                        boolean isCritical = false;
206                        if (critical != null && Boolean.parseBoolean(critical.getValue())) {
207                            isCritical = true;
208                        }
209                        map.put(name.getValue(), isCritical);
210                    }
211                    else {
212                        throw new CommandException(ErrorCode.E1305);
213                    }
214                }
215            }
216            catch (JDOMException jex) {
217                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
218            }
219
220            // if there is no coordinator for this bundle, failed it.
221            if (map.isEmpty()) {
222                bundleJob.setStatus(Job.Status.FAILED);
223                bundleJob.resetPending();
224                try {
225                    jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
226                }
227                catch (JPAExecutorException jex) {
228                    throw new CommandException(jex);
229                }
230
231                LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
232                throw new CommandException(ErrorCode.E1318, jobId);
233            }
234
235            for (Entry<String, Boolean> coordName : map.entrySet()) {
236                BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
237                insertList.add(action);
238            }
239        }
240        else {
241            throw new CommandException(ErrorCode.E0604, jobId);
242        }
243    }
244
245    private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
246        BundleActionBean action = new BundleActionBean();
247        action.setBundleActionId(jobId + "_" + coordName);
248        action.setBundleId(jobId);
249        action.setCoordName(coordName);
250        action.setStatus(Job.Status.PREP);
251        action.setLastModifiedTime(new Date());
252        if (isCritical) {
253            action.setCritical();
254        }
255        else {
256            action.resetCritical();
257        }
258        return action;
259    }
260
261    /**
262     * Start Coord Jobs
263     *
264     * @throws CommandException thrown if failed to start coord jobs
265     */
266    @SuppressWarnings("unchecked")
267    private void startCoordJobs() throws CommandException {
268        if (bundleJob != null) {
269            try {
270                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
271                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
272                for (Element coordElem : coordElems) {
273                    Attribute name = coordElem.getAttribute("name");
274                    Configuration coordConf = mergeConfig(coordElem);
275                    coordConf.set(OozieClient.BUNDLE_ID, jobId);
276
277                    queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
278
279                }
280                updateBundleAction();
281            }
282            catch (JDOMException jex) {
283                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
284            }
285            catch (JPAExecutorException je) {
286                throw new CommandException(je);
287            }
288        }
289        else {
290            throw new CommandException(ErrorCode.E0604, jobId);
291        }
292    }
293
294    private void updateBundleAction() throws JPAExecutorException {
295        for(JsonBean bAction : insertList) {
296            BundleActionBean action = (BundleActionBean) bAction;
297            action.incrementAndGetPending();
298            action.setLastModifiedTime(new Date());
299        }
300    }
301
302    /**
303     * Merge Bundle job config and the configuration from the coord job to pass
304     * to Coord Engine
305     *
306     * @param coordElem the coordinator configuration
307     * @return Configuration merged configuration
308     * @throws CommandException thrown if failed to merge configuration
309     */
310    private Configuration mergeConfig(Element coordElem) throws CommandException {
311        String jobConf = bundleJob.getConf();
312        // Step 1: runConf = jobConf
313        Configuration runConf = null;
314        try {
315            runConf = new XConfiguration(new StringReader(jobConf));
316        }
317        catch (IOException e1) {
318            LOG.warn("Configuration parse error in:" + jobConf);
319            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
320        }
321        // Step 2: Merge local properties into runConf
322        // extract 'property' tags under 'configuration' block in the coordElem
323        // convert Element to XConfiguration
324        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
325
326        if (localConfigElement != null) {
327            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
328            Configuration localConf;
329            try {
330                localConf = new XConfiguration(new StringReader(strConfig));
331            }
332            catch (IOException e1) {
333                LOG.warn("Configuration parse error in:" + strConfig);
334                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
335            }
336
337            // copy configuration properties in the coordElem to the runConf
338            XConfiguration.copy(localConf, runConf);
339        }
340
341        // Step 3: Extract value of 'app-path' in coordElem, save it as a
342        // new property called 'oozie.coord.application.path', and normalize.
343        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
344        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
345        // Normalize coordinator appPath here;
346        try {
347            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
348        }
349        catch (IOException e) {
350            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
351        }
352        return runConf;
353    }
354
355    /* (non-Javadoc)
356     * @see org.apache.oozie.command.TransitionXCommand#getJob()
357     */
358    @Override
359    public Job getJob() {
360        return bundleJob;
361    }
362
363    /* (non-Javadoc)
364     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
365     */
366    @Override
367    public void updateJob() throws CommandException {
368        updateList.add(bundleJob);
369    }
370}