001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.oozie.BundleActionBean;
026import org.apache.oozie.BundleJobBean;
027import org.apache.oozie.CoordinatorJobBean;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.XException;
030import org.apache.oozie.client.Job;
031import org.apache.oozie.client.rest.RestConstants;
032import org.apache.oozie.command.CommandException;
033import org.apache.oozie.command.RerunTransitionXCommand;
034import org.apache.oozie.command.coord.CoordRerunXCommand;
035import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
036import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
037import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039import org.apache.oozie.executor.jpa.JPAExecutorException;
040import org.apache.oozie.service.JPAService;
041import org.apache.oozie.service.Services;
042import org.apache.oozie.util.DateUtils;
043import org.apache.oozie.util.LogUtils;
044import org.apache.oozie.util.ParamChecker;
045import org.apache.oozie.util.XLog;
046
047/**
048 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
049 * <p/>
050 * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls
051 * <p/>
052 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
053 */
054public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
055
056    private final String coordScope;
057    private final String dateScope;
058    private final boolean refresh;
059    private final boolean noCleanup;
060    private BundleJobBean bundleJob;
061    private List<BundleActionBean> bundleActions;
062    protected boolean prevPending;
063
064    private JPAService jpaService = null;
065
066    /**
067     * The constructor for class {@link BundleRerunXCommand}
068     *
069     * @param jobId the bundle job id
070     * @param coordScope the rerun scope for coordinator job names separated by ","
071     * @param dateScope the rerun scope for coordinator nominal times separated by ","
072     * @param refresh true if user wants to refresh input/outpur dataset urls
073     * @param noCleanup false if user wants to cleanup output events for given rerun actions
074     */
075    public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
076        super("bundle_rerun", "bundle_rerun", 1);
077        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
078        this.coordScope = coordScope;
079        this.dateScope = dateScope;
080        this.refresh = refresh;
081        this.noCleanup = noCleanup;
082    }
083
084    /* (non-Javadoc)
085     * @see org.apache.oozie.command.XCommand#loadState()
086     */
087    @Override
088    protected void loadState() throws CommandException {
089        try {
090            jpaService = Services.get().get(JPAService.class);
091
092            if (jpaService != null) {
093                this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
094                this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
095                LogUtils.setLogInfo(bundleJob, logInfo);
096                super.setJob(bundleJob);
097                prevPending = bundleJob.isPending();
098            }
099            else {
100                throw new CommandException(ErrorCode.E0610);
101            }
102        }
103        catch (XException ex) {
104            throw new CommandException(ex);
105        }
106
107    }
108
109    /* (non-Javadoc)
110     * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
111     */
112    @Override
113    public void rerunChildren() throws CommandException {
114        boolean isUpdateActionDone = false;
115        Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
116        if (bundleActions != null) {
117            for (BundleActionBean action : bundleActions) {
118                if (action.getCoordName() != null) {
119                    coordNameToBAMapping.put(action.getCoordName(), action);
120                }
121            }
122        }
123
124        if (coordScope != null && !coordScope.isEmpty()) {
125            String[] list = coordScope.split(",");
126            for (String coordName : list) {
127                coordName = coordName.trim();
128                if (coordNameToBAMapping.keySet().contains(coordName)) {
129                    String coordId = coordNameToBAMapping.get(coordName).getCoordId();
130                    if (coordId == null) {
131                        LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
132                        continue;
133                    }
134                    CoordinatorJobBean coordJob = getCoordJob(coordId);
135
136                    String rerunDateScope;
137                    if (dateScope != null && !dateScope.isEmpty()) {
138                        rerunDateScope = dateScope;
139                    }
140                    else {
141                        String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime());
142                        String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime());
143                        rerunDateScope = coordStart + "::" + coordEnd;
144                    }
145                    LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
146                            + bundleJob.getId());
147                    queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_RERUN_DATE, rerunDateScope, refresh,
148                            noCleanup));
149                    updateBundleAction(coordNameToBAMapping.get(coordName));
150                    isUpdateActionDone = true;
151                }
152                else {
153                    LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
154                }
155            }
156        }
157        else if (dateScope != null && !dateScope.isEmpty()) {
158            if (bundleActions != null) {
159                for (BundleActionBean action : bundleActions) {
160                    if (action.getCoordId() == null) {
161                        LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
162                                + action.getCoordName());
163                        continue;
164                    }
165                    LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
166                            + bundleJob.getId());
167                    queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_RERUN_DATE, dateScope,
168                            refresh, noCleanup));
169                    updateBundleAction(action);
170                    isUpdateActionDone = true;
171                }
172            }
173        }
174        if (!isUpdateActionDone) {
175            transitToPrevious();
176        }
177        LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
178    }
179
180    private final void transitToPrevious() throws CommandException {
181        bundleJob.setStatus(getPrevStatus());
182        if (!prevPending) {
183            bundleJob.resetPending();
184        }
185        else {
186            bundleJob.setPending();
187        }
188        updateJob();
189    }
190
191    /**
192     * Update bundle action
193     *
194     * @param action the bundle action
195     * @throws CommandException thrown if failed to update bundle action
196     */
197    private void updateBundleAction(BundleActionBean action) {
198        action.incrementAndGetPending();
199        action.setLastModifiedTime(new Date());
200        updateList.add(action);
201    }
202
203    /* (non-Javadoc)
204     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
205     */
206    @Override
207    public void updateJob() {
208        // rerun a paused bundle job will keep job status at paused and pending at previous pending
209        if (getPrevStatus() != null) {
210            Job.Status bundleJobStatus = getPrevStatus();
211            if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
212                bundleJob.setStatus(bundleJobStatus);
213                if (prevPending) {
214                    bundleJob.setPending();
215                }
216                else {
217                    bundleJob.resetPending();
218                }
219            }
220        }
221        updateList.add(bundleJob);
222    }
223
224    /* (non-Javadoc)
225     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
226     */
227    @Override
228    public void performWrites() throws CommandException {
229        try {
230            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
231        }
232        catch (JPAExecutorException e) {
233            throw new CommandException(e);
234        }
235    }
236
237    /* (non-Javadoc)
238     * @see org.apache.oozie.command.XCommand#getEntityKey()
239     */
240    @Override
241    public String getEntityKey() {
242        return jobId;
243    }
244
245    /* (non-Javadoc)
246     * @see org.apache.oozie.command.XCommand#isLockRequired()
247     */
248    @Override
249    protected boolean isLockRequired() {
250        return true;
251    }
252
253    /*
254     * (non-Javadoc)
255     * @see org.apache.oozie.command.TransitionXCommand#getJob()
256     */
257    @Override
258    public Job getJob() {
259        return bundleJob;
260    }
261
262    /* (non-Javadoc)
263     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
264     */
265    @Override
266    public void notifyParent() throws CommandException {
267
268    }
269
270    /* (non-Javadoc)
271     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
272     */
273    @Override
274    public XLog getLog() {
275        return LOG;
276    }
277
278    private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
279        try {
280            CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
281            return job;
282        }
283        catch (JPAExecutorException je) {
284            throw new CommandException(je);
285        }
286
287    }
288
289}