001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.sql.Timestamp;
024import java.util.Date;
025
026import javax.persistence.Basic;
027import javax.persistence.Column;
028import javax.persistence.ColumnResult;
029import javax.persistence.Entity;
030import javax.persistence.Lob;
031import javax.persistence.NamedNativeQueries;
032import javax.persistence.NamedNativeQuery;
033import javax.persistence.NamedQueries;
034import javax.persistence.NamedQuery;
035import javax.persistence.SqlResultSetMapping;
036
037import org.apache.hadoop.io.Writable;
038import org.apache.oozie.client.CoordinatorAction;
039import org.apache.oozie.client.rest.JsonCoordinatorAction;
040import org.apache.oozie.util.DateUtils;
041import org.apache.oozie.util.WritableUtils;
042import org.apache.openjpa.persistence.jdbc.Index;
043
044@SqlResultSetMapping(
045        name = "CoordActionJobIdLmt",
046        columns = {@ColumnResult(name = "job_id"),
047            @ColumnResult(name = "min_lmt")})
048
049@Entity
050@NamedQueries({
051
052        @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
053
054        @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"),
055        // Query to update the action status, pending status and last modified time stamp of a Coordinator action
056        @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
057        // Update query for InputCheck
058        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
059        // Update query for Push-based missing dependency check
060        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime,  w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
061        // Update query for Start
062        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
063
064        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
065
066        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
067
068        @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"),
069
070        @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.status = 'WAITING' OR a.status = 'READY')"),
071
072        // Query used by XTestcase to setup tables
073        @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
074        // Select query used only by test cases
075        @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
076
077        // Select query used by SLAService on restart
078        @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.status, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
079        // Select query used by ActionInfo command
080        @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
081        // Select Query used by Timeout command
082        @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
083        // Select query used by InputCheck command
084        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
085        // Select query used by CoordActionUpdate command
086        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
087        // Select query used by Check command
088        @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
089        // Select query used by Start command
090        @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
091
092        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
093
094        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
095
096        @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
097
098        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
099
100        @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"),
101
102        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
103
104        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"),
105
106        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
107        // Query to retrieve Coordinator actions sorted by nominal time
108        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
109        // Query to maintain backward compatibility for coord job info command
110        @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
111        // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
112        @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
113
114        // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
115        @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"),
116
117        // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
118        @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'SUSPENDED'"),
119
120        // Query to retrieve count of Coordinator actions which are pending
121        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"),
122
123        // Query to retrieve status of Coordinator actions
124        @NamedQuery(name = "GET_COORD_ACTIONS_STATUS", query = "select a.status from CoordinatorActionBean a where a.jobId = :jobId"),
125
126        @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
127
128        @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
129
130        //Used by coordinator store only
131        @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"),
132
133        @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
134
135        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
136
137        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
138        // Select query used by rerun, requires almost all columns so select * is used
139        @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
140        // Select query used by log
141        @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
142        // Select query used by rerun, requires almost all columns so select * is used
143        @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
144
145        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")})
146
147@NamedNativeQueries({
148
149    @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt")
150        })
151public class CoordinatorActionBean extends JsonCoordinatorAction implements
152        Writable {
153    @Basic
154    @Index
155    @Column(name = "job_id")
156    private String jobId;
157
158    @Basic
159    @Index
160    @Column(name = "status")
161    private String status = null;
162
163    @Basic
164    @Index
165    @Column(name = "nominal_time")
166    private java.sql.Timestamp nominalTimestamp = null;
167
168    @Basic
169    @Index
170    @Column(name = "last_modified_time")
171    private java.sql.Timestamp lastModifiedTimestamp = null;
172
173    @Basic
174    @Index
175    @Column(name = "created_time")
176    private java.sql.Timestamp createdTimestamp = null;
177
178    @Basic
179    @Index
180    @Column(name = "rerun_time")
181    private java.sql.Timestamp rerunTimestamp = null;
182
183    @Basic
184    @Index
185    @Column(name = "external_id")
186    private String externalId;
187
188    @Column(name = "sla_xml")
189    @Lob
190    private String slaXml = null;
191
192    @Basic
193    @Column(name = "pending")
194    private int pending = 0;
195
196    public CoordinatorActionBean() {
197    }
198
199    /**
200     * Serialize the coordinator bean to a data output.
201     *
202     * @param dataOutput data output.
203     * @throws IOException thrown if the coordinator bean could not be
204     *         serialized.
205     */
206    @Override
207    public void write(DataOutput dataOutput) throws IOException {
208        WritableUtils.writeStr(dataOutput, getJobId());
209        WritableUtils.writeStr(dataOutput, getType());
210        WritableUtils.writeStr(dataOutput, getId());
211        WritableUtils.writeStr(dataOutput, getCreatedConf());
212        WritableUtils.writeStr(dataOutput, getStatus().toString());
213        dataOutput.writeInt(getActionNumber());
214        WritableUtils.writeStr(dataOutput, getRunConf());
215        WritableUtils.writeStr(dataOutput, getExternalStatus());
216        WritableUtils.writeStr(dataOutput, getTrackerUri());
217        WritableUtils.writeStr(dataOutput, getConsoleUrl());
218        WritableUtils.writeStr(dataOutput, getErrorCode());
219        WritableUtils.writeStr(dataOutput, getErrorMessage());
220        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
221        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
222    }
223
224    /**
225     * Deserialize a coordinator bean from a data input.
226     *
227     * @param dataInput data input.
228     * @throws IOException thrown if the workflow bean could not be
229     *         deserialized.
230     */
231    @Override
232    public void readFields(DataInput dataInput) throws IOException {
233        setJobId(WritableUtils.readStr(dataInput));
234        setType(WritableUtils.readStr(dataInput));
235        setId(WritableUtils.readStr(dataInput));
236        setCreatedConf(WritableUtils.readStr(dataInput));
237        setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
238        setActionNumber(dataInput.readInt());
239        setRunConf(WritableUtils.readStr(dataInput));
240        setExternalStatus(WritableUtils.readStr(dataInput));
241        setTrackerUri(WritableUtils.readStr(dataInput));
242        setConsoleUrl(WritableUtils.readStr(dataInput));
243        setErrorCode(WritableUtils.readStr(dataInput));
244        setErrorMessage(WritableUtils.readStr(dataInput));
245        long d = dataInput.readLong();
246        if (d != -1) {
247            setCreatedTime(new Date(d));
248        }
249        d = dataInput.readLong();
250        if (d != -1) {
251            setLastModifiedTime(new Date(d));
252        }
253    }
254
255    @Override
256    public String getJobId() {
257        return this.jobId;
258    }
259
260    @Override
261    public void setJobId(String id) {
262        super.setJobId(id);
263        this.jobId = id;
264    }
265
266    @Override
267    public Status getStatus() {
268        return Status.valueOf(status);
269    }
270
271    /**
272     * Return the status in string
273     * @return
274     */
275    public String getStatusStr() {
276        return status;
277    }
278
279    @Override
280    public void setStatus(Status status) {
281        super.setStatus(status);
282        this.status = status.toString();
283    }
284
285    @Override
286    public void setCreatedTime(Date createdTime) {
287        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
288        super.setCreatedTime(createdTime);
289    }
290
291    public void setRerunTime(Date rerunTime) {
292        this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
293    }
294
295    @Override
296    public void setNominalTime(Date nominalTime) {
297        this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
298        super.setNominalTime(nominalTime);
299    }
300
301    @Override
302    public void setLastModifiedTime(Date lastModifiedTime) {
303        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
304        super.setLastModifiedTime(lastModifiedTime);
305    }
306
307    @Override
308    public Date getCreatedTime() {
309        return DateUtils.toDate(createdTimestamp);
310    }
311
312    public Timestamp getCreatedTimestamp() {
313        return createdTimestamp;
314    }
315
316    public Date getRerunTime() {
317        return DateUtils.toDate(rerunTimestamp);
318    }
319
320    public Timestamp getRerunTimestamp() {
321        return rerunTimestamp;
322    }
323
324    @Override
325    public Date getLastModifiedTime() {
326        return DateUtils.toDate(lastModifiedTimestamp);
327    }
328
329    public Timestamp getLastModifiedTimestamp() {
330        return lastModifiedTimestamp;
331    }
332
333    @Override
334    public Date getNominalTime() {
335        return DateUtils.toDate(nominalTimestamp);
336    }
337
338    public Timestamp getNominalTimestamp() {
339        return nominalTimestamp;
340    }
341
342    @Override
343    public String getExternalId() {
344        return externalId;
345    }
346
347    @Override
348    public void setExternalId(String externalId) {
349        super.setExternalId(externalId);
350        this.externalId = externalId;
351    }
352
353    public String getSlaXml() {
354        return slaXml;
355    }
356
357    public void setSlaXml(String slaXml) {
358        this.slaXml = slaXml;
359    }
360
361    /**
362     * @return true if in terminal status
363     */
364    public boolean isTerminalStatus() {
365        boolean isTerminal = true;
366        switch (getStatus()) {
367            case WAITING:
368            case READY:
369            case SUBMITTED:
370            case RUNNING:
371            case SUSPENDED:
372                isTerminal = false;
373                break;
374            default:
375                isTerminal = true;
376                break;
377        }
378        return isTerminal;
379    }
380
381    /**
382     * Return if the action is complete with failure.
383     *
384     * @return if the action is complete with failure.
385     */
386    public boolean isTerminalWithFailure() {
387        boolean result = false;
388        switch (getStatus()) {
389            case FAILED:
390            case KILLED:
391            case TIMEDOUT:
392                result = true;
393        }
394        return result;
395    }
396
397    /**
398     * Set some actions are in progress for particular coordinator action.
399     *
400     * @param pending set pending to true
401     */
402    public void setPending(int pending) {
403        this.pending = pending;
404    }
405
406    /**
407     * increment pending and return it
408     *
409     * @return pending
410     */
411    public int incrementAndGetPending() {
412        this.pending++;
413        return pending;
414    }
415
416    /**
417     * decrement pending and return it
418     *
419     * @return pending
420     */
421    public int decrementAndGetPending() {
422        this.pending = Math.max(this.pending - 1, 0);
423        return pending;
424    }
425
426    /**
427     * Get some actions are in progress for particular bundle action.
428     *
429     * @return pending
430     */
431    public int getPending() {
432        return this.pending;
433    }
434
435    /**
436     * Return if the action is pending.
437     *
438     * @return if the action is pending.
439     */
440    public boolean isPending() {
441        return pending > 0 ? true : false;
442    }
443}