/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.sql.Timestamp;
023import java.util.Calendar;
024import java.util.Date;
025import java.util.TimeZone;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.AppType;
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.SLAEventBean;
033import org.apache.oozie.client.CoordinatorJob;
034import org.apache.oozie.client.Job;
035import org.apache.oozie.client.SLAEvent.SlaAppType;
036import org.apache.oozie.client.rest.JsonBean;
037import org.apache.oozie.command.CommandException;
038import org.apache.oozie.command.MaterializeTransitionXCommand;
039import org.apache.oozie.command.PreconditionException;
040import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041import org.apache.oozie.coord.TimeUnit;
042import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
043import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
044import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
046import org.apache.oozie.executor.jpa.JPAExecutorException;
047import org.apache.oozie.service.EventHandlerService;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.Service;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.util.DateUtils;
052import org.apache.oozie.util.Instrumentation;
053import org.apache.oozie.util.LogUtils;
054import org.apache.oozie.util.ParamChecker;
055import org.apache.oozie.sla.SLAOperations;
056import org.apache.oozie.util.StatusUtils;
057import org.apache.oozie.util.XConfiguration;
058import org.apache.oozie.util.XmlUtils;
059import org.apache.oozie.util.db.SLADbOperations;
060import org.jdom.Element;
061
062/**
063 * Materialize actions for specified start and end time for coordinator job.
064 */
065@SuppressWarnings("deprecation")
066public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
067    private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
068    private JPAService jpaService = null;
069    private CoordinatorJobBean coordJob = null;
070    private String jobId = null;
071    private Date startMatdTime = null;
072    private Date endMatdTime = null;
073    private final int materializationWindow;
074    private int lastActionNumber = 1; // over-ride by DB value
075    private CoordinatorJob.Status prevStatus = null;
076    /**
077     * Default MAX timeout in minutes, after which coordinator input check will timeout
078     */
079    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
080
081    /**
082     * The constructor for class {@link CoordMaterializeTransitionXCommand}
083     *
084     * @param jobId coordinator job id
085     * @param materializationWindow materialization window to calculate end time
086     */
087    public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
088        super("coord_mater", "coord_mater", 1);
089        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
090        this.materializationWindow = materializationWindow;
091    }
092
093    /* (non-Javadoc)
094     * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
095     */
096    @Override
097    public void transitToNext() throws CommandException {
098    }
099
100    /* (non-Javadoc)
101     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
102     */
103    @Override
104    public void updateJob() throws CommandException {
105        updateList.add(coordJob);
106    }
107
108    /* (non-Javadoc)
109     * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
110     */
111    @Override
112    public void performWrites() throws CommandException {
113        try {
114            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
115            // register the partition related dependencies of actions
116            for (JsonBean actionBean : insertList) {
117                if (actionBean instanceof CoordinatorActionBean) {
118                    CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
119                    if (EventHandlerService.isEnabled()) {
120                        CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
121                    }
122                    if (coordAction.getPushMissingDependencies() != null) {
123                        // TODO: Delay in catchup mode?
124                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
125                    }
126                }
127            }
128        }
129        catch (JPAExecutorException jex) {
130            throw new CommandException(jex);
131        }
132    }
133
134    /* (non-Javadoc)
135     * @see org.apache.oozie.command.XCommand#getEntityKey()
136     */
137    @Override
138    public String getEntityKey() {
139        return this.jobId;
140    }
141
142    @Override
143    protected boolean isLockRequired() {
144        return true;
145    }
146
147    /* (non-Javadoc)
148     * @see org.apache.oozie.command.XCommand#loadState()
149     */
150    @Override
151    protected void loadState() throws CommandException {
152        jpaService = Services.get().get(JPAService.class);
153        if (jpaService == null) {
154            LOG.error(ErrorCode.E0610);
155        }
156
157        try {
158            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
159            prevStatus = coordJob.getStatus();
160        }
161        catch (JPAExecutorException jex) {
162            throw new CommandException(jex);
163        }
164
165        // calculate start materialize and end materialize time
166        calcMatdTime();
167
168        LogUtils.setLogInfo(coordJob, logInfo);
169    }
170
171    /**
172     * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
173     *
174     * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
175     */
176    protected void calcMatdTime() throws CommandException {
177        Timestamp startTime = coordJob.getNextMaterializedTimestamp();
178        if (startTime == null) {
179            startTime = coordJob.getStartTimestamp();
180        }
181        // calculate end time by adding materializationWindow to start time.
182        // need to convert materializationWindow from secs to milliseconds
183        long startTimeMilli = startTime.getTime();
184        long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
185
186        startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
187        endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
188        // if MaterializationWindow end time is greater than endTime
189        // for job, then set it to endTime of job
190        Date jobEndTime = coordJob.getEndTime();
191        if (endMatdTime.compareTo(jobEndTime) > 0) {
192            endMatdTime = jobEndTime;
193        }
194
195        LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime
196                + ", window=" + materializationWindow);
197    }
198
199    /* (non-Javadoc)
200     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
201     */
202    @Override
203    protected void verifyPrecondition() throws CommandException, PreconditionException {
204        if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
205                || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
206            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
207                    + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
208        }
209
210        if (coordJob.isDoneMaterialization()) {
211            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId
212                    + " job is already materialized");
213        }
214
215        if (coordJob.getNextMaterializedTimestamp() != null
216                && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
217            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
218                    + " job is already materialized");
219        }
220
221        Timestamp startTime = coordJob.getNextMaterializedTimestamp();
222        if (startTime == null) {
223            startTime = coordJob.getStartTimestamp();
224
225            if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
226                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
227                        + jobId + " job's start time is not reached yet - nothing to materialize");
228            }
229        }
230
231        if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
232            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
233                    + ", all actions have been materialized from start time = " + coordJob.getStartTime()
234                    + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
235        }
236
237        if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
238            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
239                    + ", action is *already* materialized for Materialization start time = " + startMatdTime
240                    + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
241        }
242
243        if (endMatdTime.after(coordJob.getEndTime())) {
244            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
245                    + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
246                    + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
247        }
248
249        if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
250            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
251                    + ", materialization start time = " + startMatdTime
252                    + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
253                    + ", job status = " + coordJob.getStatusStr());
254        }
255
256    }
257
258    /* (non-Javadoc)
259     * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
260     */
261    @Override
262    protected void materialize() throws CommandException {
263        Instrumentation.Cron cron = new Instrumentation.Cron();
264        cron.start();
265        try {
266            materializeActions(false);
267            updateJobMaterializeInfo(coordJob);
268        }
269        catch (CommandException ex) {
270            LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex);
271            coordJob.setStatus(Job.Status.FAILED);
272            coordJob.resetPending();
273            // remove any materialized actions and slaEvents
274            insertList.clear();
275        }
276        catch (Exception e) {
277            LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
278            coordJob.setStatus(Job.Status.FAILED);
279            try {
280                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
281            }
282            catch (JPAExecutorException jex) {
283                throw new CommandException(ErrorCode.E1011, jex);
284            }
285            throw new CommandException(ErrorCode.E1012, e.getMessage(), e);
286        }
287        cron.stop();
288
289    }
290
291    /**
292     * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
293     *
294     * @param dryrun if this is a dry run
295     * @throws Exception thrown if failed to materialize actions
296     */
297    protected String materializeActions(boolean dryrun) throws Exception {
298
299        Configuration jobConf = null;
300        try {
301            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
302        }
303        catch (IOException ioe) {
304            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
305            throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
306        }
307
308        String jobXml = coordJob.getJobXml();
309        Element eJob = XmlUtils.parseXml(jobXml);
310        TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
311        int frequency = Integer.valueOf(coordJob.getFrequency());
312        TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
313        TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
314        Calendar start = Calendar.getInstance(appTz);
315        start.setTime(startMatdTime);
316        DateUtils.moveToEnd(start, endOfFlag);
317        Calendar end = Calendar.getInstance(appTz);
318        end.setTime(endMatdTime);
319        lastActionNumber = coordJob.getLastActionNumber();
320        LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
321                + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
322                + freqTU + ",\n lastActionNumber " + lastActionNumber);
323        // Keep the actual start time
324        Calendar origStart = Calendar.getInstance(appTz);
325        origStart.setTime(coordJob.getStartTimestamp());
326        // Move to the End of duration, if needed.
327        DateUtils.moveToEnd(origStart, endOfFlag);
328        // Cloning the start time to be used in loop iteration
329        Calendar effStart = (Calendar) origStart.clone();
330        // Move the time when the previous action finished
331        effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
332
333        StringBuilder actionStrings = new StringBuilder();
334        Date jobPauseTime = coordJob.getPauseTime();
335        Calendar pause = null;
336        if (jobPauseTime != null) {
337            pause = Calendar.getInstance(appTz);
338            pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
339        }
340
341        String action = null;
342        JPAService jpaService = Services.get().get(JPAService.class);
343        int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
344        int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
345        LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
346                + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
347
348        while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
349            if (pause != null && effStart.compareTo(pause) >= 0) {
350                break;
351            }
352            CoordinatorActionBean actionBean = new CoordinatorActionBean();
353            lastActionNumber++;
354
355            int timeout = coordJob.getTimeout();
356            LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber
357                    + " timeout=" + timeout + " minutes");
358            Date actualTime = new Date();
359            action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
360                    effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean);
361            actionBean.setTimeOut(timeout);
362
363            if (!dryrun) {
364                storeToDB(actionBean, action); // Storing to table
365
366            }
367            else {
368                actionStrings.append("action for new instance");
369                actionStrings.append(action);
370            }
371            // Restore the original start time
372            effStart = (Calendar) origStart.clone();
373            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
374        }
375
376        endMatdTime = new Date(effStart.getTimeInMillis());
377        if (!dryrun) {
378            return action;
379        }
380        else {
381            return actionStrings.toString();
382        }
383    }
384
385    private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
386        LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
387                + actionXml.length());
388        actionBean.setActionXml(actionXml);
389
390        insertList.add(actionBean);
391        writeActionSlaRegistration(actionXml, actionBean);
392
393        // TODO: time 100s should be configurable
394        queue(new CoordActionNotificationXCommand(actionBean), 100);
395        queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
396    }
397
398    private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
399        Element eAction = XmlUtils.parseXml(actionXml);
400        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
401        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
402                .getUser(), coordJob.getGroup(), LOG);
403        if(slaEvent != null) {
404            insertList.add(slaEvent);
405        }
406        // inserting into new table also
407        SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
408                AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
409    }
410
411    private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
412        job.setLastActionTime(endMatdTime);
413        job.setLastActionNumber(lastActionNumber);
414        // if the job endtime == action endtime, we don't need to materialize this job anymore
415        Date jobEndTime = job.getEndTime();
416
417
418        if (job.getStatus() == CoordinatorJob.Status.PREP){
419            LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
420            job.setStatus(Job.Status.RUNNING);
421        }
422        job.setPending();
423
424        if (jobEndTime.compareTo(endMatdTime) <= 0) {
425            LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus()
426                    + ", set pending to true");
427            // set doneMaterialization to true when materialization is done
428            job.setDoneMaterialization();
429        }
430        job.setStatus(StatusUtils.getStatus(job));
431        job.setNextMaterializedTime(endMatdTime);
432    }
433
434    /* (non-Javadoc)
435     * @see org.apache.oozie.command.XCommand#getKey()
436     */
437    @Override
438    public String getKey() {
439        return getName() + "_" + jobId;
440    }
441
442    /* (non-Javadoc)
443     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
444     */
445    @Override
446    public void notifyParent() throws CommandException {
447        // update bundle action only when status changes in coord job
448        if (this.coordJob.getBundleId() != null) {
449            if (!prevStatus.equals(coordJob.getStatus())) {
450                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
451                bundleStatusUpdate.call();
452            }
453        }
454    }
455}