001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.List;
021
022import org.apache.oozie.CoordinatorActionBean;
023import org.apache.oozie.CoordinatorJobBean;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.client.CoordinatorAction;
026import org.apache.oozie.client.Job;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.PreconditionException;
029import org.apache.oozie.executor.jpa.CoordJobGetReadyActionsJPAExecutor;
030import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
031import org.apache.oozie.executor.jpa.JPAExecutorException;
032import org.apache.oozie.service.JPAService;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.util.LogUtils;
035import org.apache.oozie.util.XLog;
036
037public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
038    private final String jobId;
039    private final XLog log = getLog();
040    private CoordinatorJobBean coordJob = null;
041    private JPAService jpaService = null;
042
043    public CoordActionReadyXCommand(String id) {
044        super("coord_action_ready", "coord_action_ready", 1);
045        this.jobId = id;
046    }
047
048    @Override
049    /**
050     * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
051     * This method checks all the actions associated with a jobId to figure out which actions
052     * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY])
053     *
054     */
055    protected Void execute() throws CommandException {
056        // number of actions to start (-1 means start ALL)
057        int numActionsToStart = -1;
058
059        // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
060        String jobExecution = coordJob.getExecution();
061        // get concurrency setting for this job
062        int jobConcurrency = coordJob.getConcurrency();
063        // if less than 0, then UNLIMITED concurrency
064        if (jobConcurrency >= 0) {
065            // count number of actions that are already RUNNING or SUBMITTED
066            // subtract from CONCURRENCY to calculate number of actions to start
067            // in WF engine
068
069            int numRunningJobs;
070            try {
071                numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
072            }
073            catch (JPAExecutorException je) {
074                throw new CommandException(je);
075            }
076
077            numActionsToStart = jobConcurrency - numRunningJobs;
078            if (numActionsToStart < 0) {
079                numActionsToStart = 0;
080            }
081            log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
082                    + numRunningJobs + ", numLeftover=" + numActionsToStart);
083            // no actions to start
084            if (numActionsToStart == 0) {
085                log.warn("No actions to start! for jobId=" + jobId);
086                return null;
087            }
088        }
089        // get list of actions that are READY and fit in the concurrency and execution
090
091        List<CoordinatorActionBean> actions;
092        try {
093            actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, numActionsToStart, jobExecution));
094        }
095        catch (JPAExecutorException je) {
096            throw new CommandException(je);
097        }
098        log.debug("Number of READY actions = " + actions.size());
099        String user = coordJob.getUser();
100        // make sure auth token is not null
101        // log.denug("user=" + user + ", token=" + authToken);
102        int counter = 0;
103        for (CoordinatorActionBean action : actions) {
104            // continue if numActionsToStart is negative (no limit on number of
105            // actions), or if the counter is less than numActionsToStart
106            if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
107                log.debug("Set status to SUBMITTED for id: " + action.getId());
108                // change state of action to SUBMITTED
109                action.setStatus(CoordinatorAction.Status.SUBMITTED);
110                // queue action to start action
111                queue(new CoordActionStartXCommand(action.getId(), user, coordJob.getAppName(),
112                        action.getJobId()), 100);
113                try {
114                    jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(action));
115                }
116                catch (JPAExecutorException je) {
117                    throw new CommandException(je);
118                }
119            }
120            else {
121                break;
122            }
123            counter++;
124
125        }
126        return null;
127    }
128
129    @Override
130    public String getEntityKey() {
131        return jobId;
132    }
133
134    @Override
135    public String getKey() {
136        return getName() + "_" + jobId;
137    }
138
139    @Override
140    protected boolean isLockRequired() {
141        return true;
142    }
143
144    @Override
145    protected void loadState() throws CommandException {
146        jpaService = Services.get().get(JPAService.class);
147        if (jpaService == null) {
148            throw new CommandException(ErrorCode.E0610);
149        }
150        try {
151            coordJob = jpaService.execute(new org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor(jobId));
152        }
153        catch (JPAExecutorException e) {
154            throw new CommandException(e);
155        }
156        LogUtils.setLogInfo(coordJob, logInfo);
157    }
158
159    @Override
160    protected void verifyPrecondition() throws CommandException, PreconditionException {
161        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.SUCCEEDED && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
162            throw new PreconditionException(ErrorCode.E1100, "[" + jobId
163                    + "]::CoordActionReady:: Ignoring job. Coordinator job is not in RUNNING state, but state="
164                    + coordJob.getStatus());
165        }
166    }
167}