001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.sql.Timestamp;
024import java.util.Date;
025import java.util.Properties;
026
027import javax.persistence.Basic;
028import javax.persistence.Column;
029import javax.persistence.Entity;
030import javax.persistence.Lob;
031import javax.persistence.NamedQueries;
032import javax.persistence.NamedQuery;
033import javax.persistence.Transient;
034
035import org.apache.hadoop.io.Writable;
036import org.apache.oozie.client.WorkflowAction;
037import org.apache.oozie.client.rest.JsonWorkflowAction;
038import org.apache.oozie.util.DateUtils;
039import org.apache.oozie.util.ParamChecker;
040import org.apache.oozie.util.PropertiesUtils;
041import org.apache.oozie.util.WritableUtils;
042import org.apache.openjpa.persistence.jdbc.Index;
043
044/**
045 * Bean that contains all the information to start an action for a workflow node.
046 */
047@Entity
048@NamedQueries({
049
050    @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051
052    @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053
054    @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055
056    @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057
058    @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059
060    @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061
062    @NamedQuery(name = "GET_ACTION_FOR_SLA", query = "select a.id, a.status, a.startTimestamp, a.endTimestamp from WorkflowActionBean a where a.id = :id"),
063
064    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065
066    @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
067
068    @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
069
070    @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
071
072    @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
073
074public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
075
076    @Basic
077    @Index
078    @Column(name = "wf_id")
079    private String wfId = null;
080
081    @Basic
082    @Index
083    @Column(name = "status")
084    private String status = WorkflowAction.Status.PREP.toString();
085
086    @Basic
087    @Column(name = "last_check_time")
088    private java.sql.Timestamp lastCheckTimestamp;
089
090    @Basic
091    @Column(name = "end_time")
092    private java.sql.Timestamp endTimestamp = null;
093
094    @Basic
095    @Column(name = "start_time")
096    private java.sql.Timestamp startTimestamp = null;
097
098    @Basic
099    @Column(name = "execution_path", length = 1024)
100    private String executionPath = null;
101
102    @Basic
103    @Column(name = "pending")
104    private int pending = 0;
105
106    // @Temporal(TemporalType.TIME)
107    // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
108    @Basic
109    @Index
110    @Column(name = "pending_age")
111    private java.sql.Timestamp pendingAgeTimestamp = null;
112
113    @Basic
114    @Column(name = "signal_value")
115    private String signalValue = null;
116
117    @Basic
118    @Column(name = "log_token")
119    private String logToken = null;
120
121    @Transient
122    private Date pendingAge;
123
124    @Column(name = "sla_xml")
125    @Lob
126    private String slaXml = null;
127
128    /**
129     * Default constructor.
130     */
131    public WorkflowActionBean() {
132    }
133
134    /**
135     * Serialize the action bean to a data output.
136     *
137     * @param dataOutput data output.
138     * @throws IOException thrown if the action bean could not be serialized.
139     */
140
141    public void write(DataOutput dataOutput) throws IOException {
142        WritableUtils.writeStr(dataOutput, getId());
143        WritableUtils.writeStr(dataOutput, getName());
144        WritableUtils.writeStr(dataOutput, getCred());
145        WritableUtils.writeStr(dataOutput, getType());
146        WritableUtils.writeStr(dataOutput, getConf());
147        WritableUtils.writeStr(dataOutput, getStatusStr());
148        dataOutput.writeInt(getRetries());
149        dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
150        dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
151        dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
152        WritableUtils.writeStr(dataOutput, getTransition());
153        WritableUtils.writeStr(dataOutput, getData());
154        WritableUtils.writeStr(dataOutput, getStats());
155        WritableUtils.writeStr(dataOutput, getExternalChildIDs());
156        WritableUtils.writeStr(dataOutput, getExternalId());
157        WritableUtils.writeStr(dataOutput, getExternalStatus());
158        WritableUtils.writeStr(dataOutput, getTrackerUri());
159        WritableUtils.writeStr(dataOutput, getConsoleUrl());
160        WritableUtils.writeStr(dataOutput, getErrorCode());
161        WritableUtils.writeStr(dataOutput, getErrorMessage());
162        WritableUtils.writeStr(dataOutput, wfId);
163        WritableUtils.writeStr(dataOutput, executionPath);
164        dataOutput.writeInt(pending);
165        dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
166        WritableUtils.writeStr(dataOutput, signalValue);
167        WritableUtils.writeStr(dataOutput, logToken);
168        dataOutput.writeInt(getUserRetryCount());
169        dataOutput.writeInt(getUserRetryInterval());
170        dataOutput.writeInt(getUserRetryMax());
171    }
172
173    /**
174     * Deserialize an action bean from a data input.
175     *
176     * @param dataInput data input.
177     * @throws IOException thrown if the action bean could not be deserialized.
178     */
179    public void readFields(DataInput dataInput) throws IOException {
180        setId(WritableUtils.readStr(dataInput));
181        setName(WritableUtils.readStr(dataInput));
182        setCred(WritableUtils.readStr(dataInput));
183        setType(WritableUtils.readStr(dataInput));
184        setConf(WritableUtils.readStr(dataInput));
185        setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
186        setRetries(dataInput.readInt());
187        long d = dataInput.readLong();
188        if (d != -1) {
189            setStartTime(new Date(d));
190        }
191        d = dataInput.readLong();
192        if (d != -1) {
193            setEndTime(new Date(d));
194        }
195        d = dataInput.readLong();
196        if (d != -1) {
197            setLastCheckTime(new Date(d));
198        }
199        setTransition(WritableUtils.readStr(dataInput));
200        setData(WritableUtils.readStr(dataInput));
201        setStats(WritableUtils.readStr(dataInput));
202        setExternalChildIDs(WritableUtils.readStr(dataInput));
203        setExternalId(WritableUtils.readStr(dataInput));
204        setExternalStatus(WritableUtils.readStr(dataInput));
205        setTrackerUri(WritableUtils.readStr(dataInput));
206        setConsoleUrl(WritableUtils.readStr(dataInput));
207        setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
208        wfId = WritableUtils.readStr(dataInput);
209        executionPath = WritableUtils.readStr(dataInput);
210        pending = dataInput.readInt();
211        d = dataInput.readLong();
212        if (d != -1) {
213            pendingAge = new Date(d);
214            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
215        }
216        signalValue = WritableUtils.readStr(dataInput);
217        logToken = WritableUtils.readStr(dataInput);
218        setUserRetryCount(dataInput.readInt());
219        setUserRetryInterval(dataInput.readInt());
220        setUserRetryMax(dataInput.readInt());
221    }
222
223    /**
224     * Return whether workflow action in terminal state or not
225     *
226     * @return
227     */
228    public boolean inTerminalState() {
229        boolean isTerminalState = false;
230        switch (WorkflowAction.Status.valueOf(status)) {
231            case ERROR:
232            case FAILED:
233            case KILLED:
234            case OK:
235                isTerminalState = true;
236                break;
237            default:
238                break;
239        }
240        return isTerminalState;
241    }
242
243    /**
244     * Return if the action execution is complete.
245     *
246     * @return if the action start is complete.
247     */
248    public boolean isExecutionComplete() {
249        return getStatus() == WorkflowAction.Status.DONE;
250    }
251
252    /**
253     * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
254     * END_MANUAL.
255     *
256     * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
257     *         END_MANUAL
258     */
259    public boolean isRetryOrManual() {
260        return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
261                || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
262    }
263
264    /**
265     * Return true if the action is USER_RETRY
266     *
267     * @return boolean true if status is USER_RETRY
268     */
269    public boolean isUserRetry() {
270        return (getStatus() == WorkflowAction.Status.USER_RETRY);
271    }
272
273    /**
274     * Return if the action is complete.
275     *
276     * @return if the action is complete.
277     */
278    public boolean isComplete() {
279        return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
280                getStatus() == WorkflowAction.Status.ERROR;
281    }
282
283    /**
284     * Return if the action is complete with failure.
285     *
286     * @return if the action is complete with failure.
287     */
288    public boolean isTerminalWithFailure() {
289        boolean result = false;
290        switch (getStatus()) {
291            case FAILED:
292            case KILLED:
293            case ERROR:
294                result = true;
295        }
296        return result;
297    }
298
299    /**
300     * Set the action pending flag to true.
301     */
302    public void setPendingOnly() {
303        pending = 1;
304    }
305
306    /**
307     * Set the action as pending and the current time as pending.
308     */
309    public void setPending() {
310        pending = 1;
311        pendingAge = new Date();
312        pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
313    }
314
315    /**
316     * Set a time when the action will be pending, normally a time in the future.
317     *
318     * @param pendingAge the time when the action will be pending.
319     */
320    public void setPendingAge(Date pendingAge) {
321        this.pendingAge = pendingAge;
322        this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
323    }
324
325    /**
326     * Return the pending age of the action.
327     *
328     * @return the pending age of the action, <code>null</code> if the action is not pending.
329     */
330    public Date getPendingAge() {
331        return DateUtils.toDate(pendingAgeTimestamp);
332    }
333
334    /**
335     * Return if the action is pending.
336     *
337     * @return if the action is pending.
338     */
339    public boolean isPending() {
340        return pending == 1 ? true : false;
341    }
342
343    /**
344     * Removes the pending flag and pendingAge from the action.
345     */
346    public void resetPending() {
347        pending = 0;
348        pendingAge = null;
349        pendingAgeTimestamp = null;
350    }
351
352    /**
353     * Removes the pending flag from the action.
354     */
355    public void resetPendingOnly() {
356        pending = 0;
357    }
358
359    /**
360     * Increments the number of retries for the action.
361     */
362    public void incRetries() {
363        setRetries(getRetries() + 1);
364    }
365
366    /**
367     * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
368     *
369     * @param externalId external ID for the action.
370     * @param trackerUri tracker URI for the action.
371     * @param consoleUrl console URL for the action.
372     */
373    public void setStartData(String externalId, String trackerUri, String consoleUrl) {
374        setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
375        setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
376        setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
377        Date now = new Date();
378        if (this.startTimestamp == null) {
379            setStartTime(now);
380        }
381        setLastCheckTime(now);
382        setStatus(Status.RUNNING);
383    }
384
385    /**
386     * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
387     *
388     * @param externalStatus action external end status.
389     * @param actionData action output data, <code>null</code> if there is no action output data.
390     */
391    public void setExecutionData(String externalStatus, Properties actionData) {
392        setStatus(Status.DONE);
393        setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
394        if (actionData != null) {
395            setData(PropertiesUtils.propertiesToString(actionData));
396        }
397    }
398
399    /**
400     * Return the action statistics info.
401     *
402     * @return Json representation of the stats.
403     */
404    public String getExecutionStats() {
405        return getStats();
406    }
407
408    /**
409     * Set the action statistics info for the workflow action.
410     *
411     * @param Json representation of the stats.
412     */
413    public void setExecutionStats(String jsonStats) {
414        setStats(jsonStats);
415    }
416
417    /**
418     * Return the external child IDs.
419     *
420     * @return externalChildIDs as a string.
421     */
422    public String getExternalChildIDs() {
423        return super.getExternalChildIDs();
424    }
425
426    /**
427     * Set the external child IDs for the workflow action.
428     *
429     * @param externalChildIDs as a string.
430     */
431    public void setExternalChildIDs(String externalChildIDs) {
432        super.setExternalChildIDs(externalChildIDs);
433    }
434
435    /**
436     * Set the completion information for an action end.
437     *
438     * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
439     * Action.Status#KILLED}
440     * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
441     */
442    public void setEndData(Status status, String signalValue) {
443        if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
444            throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
445                    + status.toString() + "]");
446        }
447        if (status == Status.OK) {
448            setErrorInfo(null, null);
449        }
450        setStatus(status);
451        setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
452    }
453
454
455    /**
456     * Return the job Id.
457     *
458     * @return the job Id.
459     */
460    public String getJobId() {
461        return wfId;
462    }
463
464    /**
465     * Return the job Id.
466     *
467     * @return the job Id.
468     */
469    public String getWfId() {
470        return wfId;
471    }
472
473    /**
474     * Set the job id.
475     *
476     * @param id jobId;
477     */
478    public void setJobId(String id) {
479        this.wfId = id;
480    }
481
482    public String getSlaXml() {
483        return slaXml;
484    }
485
486    public void setSlaXml(String slaXml) {
487        this.slaXml = slaXml;
488    }
489
490    @Override
491    public void setStatus(Status val) {
492        this.status = val.toString();
493        super.setStatus(val);
494    }
495
496    public String getStatusStr() {
497        return status;
498    }
499
500    @Override
501    public Status getStatus() {
502        return Status.valueOf(this.status);
503    }
504
505    /**
506     * Return the node execution path.
507     *
508     * @return the node execution path.
509     */
510    public String getExecutionPath() {
511        return executionPath;
512    }
513
514    /**
515     * Set the node execution path.
516     *
517     * @param executionPath the node execution path.
518     */
519    public void setExecutionPath(String executionPath) {
520        this.executionPath = executionPath;
521    }
522
523    /**
524     * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
525     * OK or ERROR.
526     *
527     * @return the action signal value.
528     */
529    public String getSignalValue() {
530        return signalValue;
531    }
532
533    /**
534     * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
535     * or ERROR.
536     *
537     * @param signalValue the action signal value.
538     */
539    public void setSignalValue(String signalValue) {
540        this.signalValue = signalValue;
541    }
542
543    /**
544     * Return the job log token.
545     *
546     * @return the job log token.
547     */
548    public String getLogToken() {
549        return logToken;
550    }
551
552    /**
553     * Set the job log token.
554     *
555     * @param logToken the job log token.
556     */
557    public void setLogToken(String logToken) {
558        this.logToken = logToken;
559    }
560
561    /**
562     * Return the action last check time
563     *
564     * @return the last check time
565     */
566    public Date getLastCheckTime() {
567        return DateUtils.toDate(lastCheckTimestamp);
568    }
569
570    /**
571     * Return the action last check time
572     *
573     * @return the last check time
574     */
575    public Timestamp getLastCheckTimestamp() {
576        return lastCheckTimestamp;
577    }
578
579    /**
580     * Return the action last check time
581     *
582     * @return the last check time
583     */
584    public Timestamp getStartTimestamp() {
585        return startTimestamp;
586    }
587
588    /**
589     * Return the action last check time
590     *
591     * @return the last check time
592     */
593    public Timestamp getEndTimestamp() {
594        return endTimestamp;
595    }
596
597
598    /**
599     * Return the action last check time
600     *
601     * @return the last check time
602     */
603    public Timestamp getPendingAgeTimestamp() {
604        return pendingAgeTimestamp;
605    }
606
607    /**
608     * Sets the action last check time
609     *
610     * @param lastCheckTime the last check time to set.
611     */
612    public void setLastCheckTime(Date lastCheckTime) {
613        this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
614    }
615
616    public boolean getPending() {
617        return this.pending == 1 ? true : false;
618    }
619
620    @Override
621    public Date getStartTime() {
622        return DateUtils.toDate(startTimestamp);
623    }
624
625    @Override
626    public void setStartTime(Date startTime) {
627        super.setStartTime(startTime);
628        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
629    }
630
631    @Override
632    public Date getEndTime() {
633        return DateUtils.toDate(endTimestamp);
634    }
635
636    @Override
637    public void setEndTime(Date endTime) {
638        super.setEndTime(endTime);
639        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
640    }
641
642}