001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.ArrayList;
021import java.util.Calendar;
022import java.util.Date;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.Set;
028
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.XException;
033import org.apache.oozie.client.CoordinatorJob;
034import org.apache.oozie.client.Job;
035import org.apache.oozie.client.OozieClient;
036import org.apache.oozie.client.rest.JsonBean;
037import org.apache.oozie.command.CommandException;
038import org.apache.oozie.command.PreconditionException;
039import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
040import org.apache.oozie.coord.TimeUnit;
041import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
042import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
043import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
044import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045import org.apache.oozie.executor.jpa.JPAExecutorException;
046import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
047import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.sla.SLARegistrationBean;
051import org.apache.oozie.sla.SLASummaryBean;
052import org.apache.oozie.sla.service.SLAService;
053import org.apache.oozie.util.DateUtils;
054import org.apache.oozie.util.JobUtils;
055import org.apache.oozie.util.LogUtils;
056import org.apache.oozie.util.ParamChecker;
057import org.apache.oozie.util.StatusUtils;
058
059public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
060    private final String jobId;
061    private Date newEndTime = null;
062    private Integer newConcurrency = null;
063    private Date newPauseTime = null;
064    private Date oldPauseTime = null;
065    private boolean resetPauseTime = false;
066    private CoordinatorJobBean coordJob;
067    private JPAService jpaService = null;
068    private Job.Status prevStatus;
069    private List<JsonBean> updateList = new ArrayList<JsonBean>();
070    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
071
072    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
073    static {
074        ALLOWED_CHANGE_OPTIONS.add("endtime");
075        ALLOWED_CHANGE_OPTIONS.add("concurrency");
076        ALLOWED_CHANGE_OPTIONS.add("pausetime");
077    }
078
079    /**
080     * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
081     * that to database.
082     *
083     * @param id Coordinator job id.
084     * @param changeValue This the changed value in the form key=value.
085     * @throws CommandException thrown if changeValue cannot be parsed properly.
086     */
087    public CoordChangeXCommand(String id, String changeValue) throws CommandException {
088        super("coord_change", "coord_change", 0);
089        this.jobId = ParamChecker.notEmpty(id, "id");
090        ParamChecker.notEmpty(changeValue, "value");
091
092        validateChangeValue(changeValue);
093    }
094
095    /**
096     * @param changeValue change value.
097     * @throws CommandException thrown if changeValue cannot be parsed properly.
098     */
099    private void validateChangeValue(String changeValue) throws CommandException {
100        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
101
102        if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
103            throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
104        }
105
106        java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
107        while (iter.hasNext()) {
108            Entry<String, String> entry = iter.next();
109            String key = entry.getKey();
110            String value = entry.getValue();
111
112            if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
113                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
114            }
115
116            if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
117                throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
118            }
119        }
120
121        if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
122            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
123            try {
124                newEndTime = DateUtils.parseDateOozieTZ(value);
125            }
126            catch (Exception ex) {
127                throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
128            }
129        }
130
131        if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
132            String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
133            try {
134                newConcurrency = Integer.parseInt(value);
135            }
136            catch (NumberFormatException ex) {
137                throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
138            }
139        }
140
141        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
142            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
143            if (value.equals("")) { // this is to reset pause time to null;
144                resetPauseTime = true;
145            }
146            else {
147                try {
148                    newPauseTime = DateUtils.parseDateOozieTZ(value);
149                }
150                catch (Exception ex) {
151                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
152                }
153            }
154        }
155    }
156
157    /**
158     * Returns the actual last action time(one instance before coordJob.lastActionTime)
159     * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise
160     */
161    private Date getLastActionTime() {
162        if(coordJob.getLastActionTime() == null)
163            return null;
164
165        Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
166        d.setTime(coordJob.getLastActionTime());
167        TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
168        d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency()));
169        return d.getTime();
170    }
171
172    /**
173     * Check if new end time is valid.
174     *
175     * @param coordJob coordinator job id.
176     * @param newEndTime new end time.
177     * @throws CommandException thrown if new end time is not valid.
178     */
179    private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
180        // New endTime cannot be before coordinator job's start time.
181        Date startTime = coordJob.getStartTime();
182        if (newEndTime.before(startTime)) {
183            throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
184                    + startTime + "]");
185        }
186
187        // New endTime cannot be before coordinator job's last action time.
188        Date lastActionTime = getLastActionTime();
189        if (lastActionTime != null) {
190            if (!newEndTime.after(lastActionTime)) {
191                throw new CommandException(ErrorCode.E1015, newEndTime,
192                        "must be after coordinator job's last action time [" + lastActionTime + "]");
193            }
194        }
195    }
196
197    /**
198     * Check if new pause time is valid.
199     *
200     * @param coordJob coordinator job id.
201     * @param newPauseTime new pause time.
202     * @param newEndTime new end time, can be null meaning no change on end time.
203     * @throws CommandException thrown if new pause time is not valid.
204     */
205    private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
206            throws CommandException {
207        // New pauseTime has to be a non-past time.
208        Date d = new Date();
209        if (newPauseTime.before(d)) {
210            throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
211        }
212    }
213
214    /**
215     * Process lookahead created actions that become invalid because of the new pause time,
216     * These actions will be deleted from DB, also the coordinator job will be updated accordingly
217     *
218     * @param coordJob coordinator job
219     * @param newPauseTime new pause time
220     */
221    private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
222        Date lastActionTime = coordJob.getLastActionTime();
223        if (lastActionTime != null) {
224            // d is the real last action time.
225            Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
226            d.setTime(getLastActionTime());
227            TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
228
229            int lastActionNumber = coordJob.getLastActionNumber();
230
231            boolean hasChanged = false;
232            while (true) {
233                if (!newPauseTime.after(d.getTime())) {
234                    deleteAction(lastActionNumber);
235                    d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency()));
236                    lastActionNumber = lastActionNumber - 1;
237
238                    hasChanged = true;
239                }
240                else {
241                    break;
242                }
243            }
244
245            if (hasChanged == true) {
246                coordJob.setLastActionNumber(lastActionNumber);
247                d.add(timeUnit.getCalendarUnit(), Integer.valueOf(coordJob.getFrequency()));
248                Date d1 = d.getTime();
249                coordJob.setLastActionTime(d1);
250                coordJob.setNextMaterializedTime(d1);
251                coordJob.resetDoneMaterialization();
252            }
253        }
254    }
255
256    /**
257     * Delete coordinator action
258     *
259     * @param actionNum coordinator action number
260     */
261    private void deleteAction(int actionNum) throws CommandException {
262        try {
263            String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
264            CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
265            // delete SLA registration entry (if any) for action
266            if (SLAService.isEnabled()) {
267                Services.get().get(SLAService.class).removeRegistration(actionId);
268            }
269            SLARegistrationBean slaReg = jpaService.execute(new SLARegistrationGetJPAExecutor(actionId));
270            if (slaReg != null) {
271                LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
272                deleteList.add(slaReg);
273            }
274            SLASummaryBean slaSummaryBean = jpaService.execute(new SLASummaryGetJPAExecutor(actionId));
275            if (slaSummaryBean != null) {
276                LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
277                deleteList.add(slaSummaryBean);
278            }
279            deleteList.add(bean);
280        }
281        catch (JPAExecutorException e) {
282            throw new CommandException(e);
283        }
284    }
285
286    /**
287     * Check if new end time, new concurrency, new pause time are valid.
288     *
289     * @param coordJob coordinator job id.
290     * @param newEndTime new end time.
291     * @param newConcurrency new concurrency.
292     * @param newPauseTime new pause time.
293     * @throws CommandException thrown if new values are not valid.
294     */
295    private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
296            throws CommandException {
297        if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
298            throw new CommandException(ErrorCode.E1016);
299        }
300
301        if (newEndTime != null) {
302            checkEndTime(coordJob, newEndTime);
303        }
304
305        if (newPauseTime != null) {
306            checkPauseTime(coordJob, newPauseTime);
307        }
308    }
309
310    /* (non-Javadoc)
311     * @see org.apache.oozie.command.XCommand#execute()
312     */
313    @Override
314    protected Void execute() throws CommandException {
315        LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
316
317        try {
318            if (newEndTime != null) {
319                boolean dontChange = coordJob.getEndTime().before(newEndTime)
320                        && coordJob.getNextMaterializedTime() != null
321                        && coordJob.getNextMaterializedTime().after(newEndTime);
322                if (!dontChange) {
323                    coordJob.setEndTime(newEndTime);
324                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
325                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
326                    }
327                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
328                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
329                        // Check for backward compatibility for Oozie versions (3.2 and before)
330                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
331                        // PAUSEDWITHERROR is not supported
332                        coordJob.setStatus(StatusUtils
333                                .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
334                    }
335                    coordJob.setPending();
336                    coordJob.resetDoneMaterialization();
337                }
338            }
339
340            if (newConcurrency != null) {
341                this.coordJob.setConcurrency(newConcurrency);
342            }
343
344            if (newPauseTime != null || resetPauseTime == true) {
345                this.coordJob.setPauseTime(newPauseTime);
346                if (oldPauseTime != null && newPauseTime != null) {
347                    if (oldPauseTime.before(newPauseTime)) {
348                        if (this.coordJob.getStatus() == Job.Status.PAUSED) {
349                            this.coordJob.setStatus(Job.Status.RUNNING);
350                        }
351                        else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
352                            this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
353                        }
354                    }
355                }
356                else if (oldPauseTime != null && newPauseTime == null) {
357                    if (this.coordJob.getStatus() == Job.Status.PAUSED) {
358                        this.coordJob.setStatus(Job.Status.RUNNING);
359                    }
360                    else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
361                        this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
362                    }
363                }
364                if (!resetPauseTime) {
365                    processLookaheadActions(coordJob, newPauseTime);
366                }
367            }
368
369            if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
370                LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
371                        + ", set pending to true");
372                // set doneMaterialization to true when materialization is done
373                coordJob.setDoneMaterialization();
374            }
375
376            coordJob.setLastModifiedTime(new Date());
377            updateList.add(coordJob);
378            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
379
380            return null;
381        }
382        catch (XException ex) {
383            throw new CommandException(ex);
384        }
385        finally {
386            LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
387            // update bundle action
388            if (coordJob.getBundleId() != null) {
389                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
390                bundleStatusUpdate.call();
391            }
392        }
393    }
394
395    /* (non-Javadoc)
396     * @see org.apache.oozie.command.XCommand#getEntityKey()
397     */
398    @Override
399    public String getEntityKey() {
400        return this.jobId;
401    }
402
403    /* (non-Javadoc)
404     * @see org.apache.oozie.command.XCommand#loadState()
405     */
406    @Override
407    protected void loadState() throws CommandException{
408        jpaService = Services.get().get(JPAService.class);
409
410        if (jpaService == null) {
411            throw new CommandException(ErrorCode.E0610);
412        }
413
414        try {
415            this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
416            oldPauseTime = coordJob.getPauseTime();
417            prevStatus = coordJob.getStatus();
418        }
419        catch (JPAExecutorException e) {
420            throw new CommandException(e);
421        }
422
423        LogUtils.setLogInfo(this.coordJob, logInfo);
424    }
425
426    /* (non-Javadoc)
427     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
428     */
429    @Override
430    protected void verifyPrecondition() throws CommandException,PreconditionException {
431        check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
432    }
433
434    /* (non-Javadoc)
435     * @see org.apache.oozie.command.XCommand#isLockRequired()
436     */
437    @Override
438    protected boolean isLockRequired() {
439        return true;
440    }
441}