001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026
027import org.apache.oozie.BundleActionBean;
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.XException;
031import org.apache.oozie.client.Job;
032import org.apache.oozie.client.OozieClient;
033import org.apache.oozie.client.rest.JsonBean;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.command.PreconditionException;
036import org.apache.oozie.command.XCommand;
037import org.apache.oozie.command.coord.CoordChangeXCommand;
038import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
039import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
040import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
041import org.apache.oozie.service.JPAService;
042import org.apache.oozie.service.Services;
043import org.apache.oozie.util.DateUtils;
044import org.apache.oozie.util.JobUtils;
045import org.apache.oozie.util.LogUtils;
046import org.apache.oozie.util.ParamChecker;
047import org.apache.oozie.util.StatusUtils;
048
049public class BundleJobChangeXCommand extends XCommand<Void> {
050    private String jobId;
051    private String changeValue;
052    private JPAService jpaService;
053    private List<BundleActionBean> bundleActions;
054    private BundleJobBean bundleJob;
055    private Date newPauseTime = null;
056    private Date newEndTime = null;
057    boolean isChangePauseTime = false;
058    boolean isChangeEndTime = false;
059    private List<JsonBean> updateList = new ArrayList<JsonBean>();
060
061    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
062    static {
063        ALLOWED_CHANGE_OPTIONS.add("pausetime");
064        ALLOWED_CHANGE_OPTIONS.add("endtime");
065    }
066
067    /**
068     * @param id bundle job id
069     * @param changeValue change value
070     *
071     * @throws CommandException thrown if failed to change bundle
072     */
073    public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
074        super("bundle_change", "bundle_change", 1);
075        this.jobId = ParamChecker.notEmpty(id, "id");
076        this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
077    }
078
079    /**
080     * Check if new pause time is future time.
081     *
082     * @param newPauseTime new pause time.
083     * @throws CommandException thrown if new pause time is not valid.
084     */
085    private void checkPauseTime(Date newPauseTime) throws CommandException {
086        // New pauseTime has to be a non-past time.
087        Date d = new Date();
088        if (newPauseTime.before(d)) {
089            throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
090        }
091    }
092
093    /**
094     * Check if new pause time is future time.
095     *
096     * @param newEndTime new end time, can be null meaning no change on end time.
097     * @throws CommandException thrown if new end time is not valid.
098     */
099    private void checkEndTime(Date newEndTime) throws CommandException {
100        // New endTime has to be a non-past start time.
101        Date startTime = bundleJob.getKickoffTime();
102        if (startTime != null && newEndTime.before(startTime)) {
103            throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
104        }
105    }
106
107    /**
108     * validate if change value is valid.
109     *
110     * @param changeValue change value.
111     * @throws CommandException thrown if changeValue cannot be parsed properly.
112     */
113    private void validateChangeValue(String changeValue) throws CommandException {
114        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
115
116        if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) {
117            throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
118        }
119
120        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
121            isChangePauseTime = true;
122        }
123        else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){
124            isChangeEndTime = true;
125        }
126        else {
127            throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime");
128        }
129
130        if(isChangePauseTime){
131            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
132            if (!value.equals(""))   {
133                try {
134                    newPauseTime = DateUtils.parseDateOozieTZ(value);
135                }
136                catch (Exception ex) {
137                    throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
138                }
139
140                checkPauseTime(newPauseTime);
141            }
142        }
143        else if (isChangeEndTime){
144            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
145            if (!value.equals(""))   {
146                try {
147                    newEndTime = DateUtils.parseDateOozieTZ(value);
148                }
149                catch (Exception ex) {
150                    throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
151                }
152
153                checkEndTime(newEndTime);
154            }
155        }
156    }
157
158    /* (non-Javadoc)
159     * @see org.apache.oozie.command.XCommand#execute()
160     */
161    @Override
162    protected Void execute() throws CommandException {
163        try {
164            if (isChangePauseTime || isChangeEndTime) {
165                if (isChangePauseTime) {
166                    bundleJob.setPauseTime(newPauseTime);
167                }
168                else if (isChangeEndTime) {
169                    bundleJob.setEndTime(newEndTime);
170                    if (bundleJob.getStatus() == Job.Status.SUCCEEDED) {
171                        bundleJob.setStatus(Job.Status.RUNNING);
172                    }
173                    if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) {
174                        bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR));
175                    }
176                }
177                for (BundleActionBean action : this.bundleActions) {
178                    // queue coord change commands;
179                    if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
180                        queue(new CoordChangeXCommand(action.getCoordId(), changeValue));
181                        LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change "
182                                + changeValue);
183                        action.setPending(action.getPending() + 1);
184                        updateList.add(action);
185                    }
186                }
187                updateList.add(bundleJob);
188                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
189            }
190            return null;
191        }
192        catch (XException ex) {
193            throw new CommandException(ex);
194        }
195    }
196
197    /* (non-Javadoc)
198     * @see org.apache.oozie.command.XCommand#getEntityKey()
199     */
200    @Override
201    public String getEntityKey() {
202        return this.jobId;
203    }
204
205    /* (non-Javadoc)
206     * @see org.apache.oozie.command.XCommand#isLockRequired()
207     */
208    @Override
209    protected boolean isLockRequired() {
210        return true;
211    }
212
213    /* (non-Javadoc)
214     * @see org.apache.oozie.command.XCommand#loadState()
215     */
216    @Override
217    protected void loadState() throws CommandException {
218        try{
219            eagerLoadState();
220            this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
221        }
222        catch(Exception Ex){
223            throw new CommandException(ErrorCode.E1311,this.jobId);
224        }
225    }
226
227    /* (non-Javadoc)
228     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
229     */
230    @Override
231    protected void verifyPrecondition() throws CommandException, PreconditionException {
232    }
233
234    /* (non-Javadoc)
235     * @see org.apache.oozie.command.XCommand#eagerLoadState()
236     */
237    @Override
238    protected void eagerLoadState() throws CommandException {
239        try {
240            jpaService = Services.get().get(JPAService.class);
241
242            if (jpaService != null) {
243                this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
244                LogUtils.setLogInfo(bundleJob, logInfo);
245            }
246            else {
247                throw new CommandException(ErrorCode.E0610);
248            }
249        }
250        catch (XException ex) {
251            throw new CommandException(ex);
252        }
253    }
254
255    /* (non-Javadoc)
256     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
257     */
258    @Override
259    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
260        validateChangeValue(changeValue);
261
262        if (bundleJob == null) {
263            LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
264            throw new PreconditionException(ErrorCode.E1314, jobId);
265        }
266        if (isChangePauseTime) {
267            if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
268                    || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR
269                    || bundleJob == null) {
270                LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is "
271                        + bundleJob.getStatusStr());
272                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
273            }
274        }
275        else if(isChangeEndTime){
276            if (bundleJob.getStatus() == Job.Status.KILLED || bundleJob == null) {
277                LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is "
278                        + bundleJob.getStatusStr());
279                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
280            }
281        }
282    }
283}