001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.store;
019
020import java.sql.Connection;
021import java.sql.SQLException;
022import java.sql.Timestamp;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.Callable;
028
029import javax.persistence.EntityManager;
030import javax.persistence.Query;
031
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.WorkflowActionBean;
034import org.apache.oozie.WorkflowJobBean;
035import org.apache.oozie.WorkflowsInfo;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.WorkflowJob.Status;
038import org.apache.oozie.service.InstrumentationService;
039import org.apache.oozie.service.SchemaService;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.service.SchemaService.SchemaName;
042import org.apache.oozie.util.Instrumentation;
043import org.apache.oozie.util.ParamChecker;
044import org.apache.oozie.util.XLog;
045import org.apache.oozie.workflow.WorkflowException;
046import org.apache.openjpa.persistence.OpenJPAEntityManager;
047import org.apache.openjpa.persistence.OpenJPAPersistence;
048import org.apache.openjpa.persistence.OpenJPAQuery;
049import org.apache.openjpa.persistence.jdbc.FetchDirection;
050import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
051import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
052import org.apache.openjpa.persistence.jdbc.ResultSetType;
053
054/**
055 * DB Implementation of Workflow Store
056 */
057public class WorkflowStore extends Store {
058    private Connection conn;
059    private EntityManager entityManager;
060    private boolean selectForUpdate;
061    private static final String INSTR_GROUP = "db";
062    public static final int LOCK_TIMEOUT = 50000;
063    private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
064            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
065    private static final String countStr = "Select count(w) from WorkflowJobBean w";
066
067    public WorkflowStore() {
068    }
069
070    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
071        super();
072        conn = ParamChecker.notNull(connection, "conn");
073        entityManager = getEntityManager();
074        this.selectForUpdate = selectForUpdate;
075    }
076
077    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
078        super(store);
079        conn = ParamChecker.notNull(connection, "conn");
080        entityManager = getEntityManager();
081        this.selectForUpdate = selectForUpdate;
082    }
083
084    public WorkflowStore(boolean selectForUpdate) throws StoreException {
085        super();
086        entityManager = getEntityManager();
087        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
088        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
089        conn = (Connection) kem.getConnection();
090        this.selectForUpdate = selectForUpdate;
091    }
092
093    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
094        super(store);
095        entityManager = getEntityManager();
096        this.selectForUpdate = selectForUpdate;
097    }
098
099    /**
100     * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job.
101     *
102     * @param workflow workflow bean
103     * @throws StoreException
104     */
105
106    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
107        ParamChecker.notNull(workflow, "workflow");
108
109        doOperation("insertWorkflow", new Callable<Void>() {
110            public Void call() throws SQLException, StoreException, WorkflowException {
111                entityManager.persist(workflow);
112                return null;
113            }
114        });
115    }
116
117    /**
118     * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
119     * depending on the locking parameter.
120     *
121     * @param id Workflow ID
122     * @param locking true if Workflow is to be locked
123     * @return WorkflowJobBean
124     * @throws StoreException
125     */
126    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
127        ParamChecker.notEmpty(id, "WorkflowID");
128        WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
129            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
130                WorkflowJobBean wfBean = null;
131                wfBean = getWorkflowOnly(id, locking);
132                if (wfBean == null) {
133                    throw new StoreException(ErrorCode.E0604, id);
134                }
135                /*
136                 * WorkflowInstance wfInstance; //krishna and next line
137                 * wfInstance = workflowLib.get(id); wfInstance =
138                 * wfBean.get(wfBean.getWfInstance());
139                 * wfBean.setWorkflowInstance(wfInstance);
140                 * wfBean.setWfInstance(wfInstance);
141                 */
142                return wfBean;
143            }
144        });
145        return wfBean;
146    }
147
148    /**
149     * Get the number of Workflows with the given status.
150     *
151     * @param status Workflow Status.
152     * @return number of Workflows with given status.
153     * @throws StoreException
154     */
155    public int getWorkflowCountWithStatus(final String status) throws StoreException {
156        ParamChecker.notEmpty(status, "status");
157        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
158            public Integer call() throws SQLException {
159                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
160                q.setParameter("status", status);
161                Long count = (Long) q.getSingleResult();
162                return Integer.valueOf(count.intValue());
163            }
164        });
165        return cnt.intValue();
166    }
167
168    /**
169     * Get the number of Workflows with the given status which was modified in given time limit.
170     *
171     * @param status Workflow Status.
172     * @param secs No. of seconds within which the workflow got modified.
173     * @return number of Workflows modified within given time with given status.
174     * @throws StoreException
175     */
176    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
177        ParamChecker.notEmpty(status, "status");
178        ParamChecker.notEmpty(status, "secs");
179        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
180            public Integer call() throws SQLException {
181                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
182                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
183                q.setParameter("status", status);
184                q.setParameter("lastModTime", ts);
185                Long count = (Long) q.getSingleResult();
186                return Integer.valueOf(count.intValue());
187            }
188        });
189        return cnt.intValue();
190    }
191
192    /**
193     * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
194     *
195     * @param wfBean Workflow Bean
196     * @throws StoreException If Workflow doesn't exist
197     */
198    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
199        ParamChecker.notNull(wfBean, "WorkflowJobBean");
200        doOperation("updateWorkflow", new Callable<Void>() {
201            public Void call() throws SQLException, StoreException, WorkflowException {
202                Query q = entityManager.createNamedQuery("UPDATE_WORKFLOW");
203                q.setParameter("id", wfBean.getId());
204                setWFQueryParameters(wfBean, q);
205                q.executeUpdate();
206                return null;
207            }
208        });
209    }
210
211    /**
212     * Create a new Action record in the ACTIONS table with the given Bean.
213     *
214     * @param action WorkflowActionBean
215     * @throws StoreException If the action is already present
216     */
217    public void insertAction(final WorkflowActionBean action) throws StoreException {
218        ParamChecker.notNull(action, "WorkflowActionBean");
219        doOperation("insertAction", new Callable<Void>() {
220            public Void call() throws SQLException, StoreException, WorkflowException {
221                entityManager.persist(action);
222                return null;
223            }
224        });
225    }
226
227    /**
228     * Load the action data and returns a bean.
229     *
230     * @param id Action Id
231     * @param locking true if the action is to be locked
232     * @return Action Bean
233     * @throws StoreException If action doesn't exist
234     */
235    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
236        ParamChecker.notEmpty(id, "ActionID");
237        WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
238            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
239                    InterruptedException {
240                Query q = entityManager.createNamedQuery("GET_ACTION");
241                /*
242                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
243                 * FetchPlan fetch = oq.getFetchPlan();
244                 * fetch.setReadLockMode(LockModeType.WRITE);
245                 * fetch.setLockTimeout(1000); // 1 seconds }
246                 */
247                WorkflowActionBean action = null;
248                q.setParameter("id", id);
249                List<WorkflowActionBean> actions = q.getResultList();
250                // action = (WorkflowActionBean) q.getSingleResult();
251                if (actions.size() > 0) {
252                    action = actions.get(0);
253                }
254                else {
255                    throw new StoreException(ErrorCode.E0605, id);
256                }
257
258                /*
259                 * if (locking) return action; else
260                 */
261                // return action;
262                return getBeanForRunningAction(action);
263            }
264        });
265        return action;
266    }
267
268    /**
269     * Update the given action bean to DB.
270     *
271     * @param action Action Bean
272     * @throws StoreException if action doesn't exist
273     */
274    public void updateAction(final WorkflowActionBean action) throws StoreException {
275        ParamChecker.notNull(action, "WorkflowActionBean");
276        doOperation("updateAction", new Callable<Void>() {
277            public Void call() throws SQLException, StoreException, WorkflowException {
278                Query q = entityManager.createNamedQuery("UPDATE_ACTION");
279                q.setParameter("id", action.getId());
280                setActionQueryParameters(action, q);
281                q.executeUpdate();
282                return null;
283            }
284        });
285    }
286
287    /**
288     * Delete the Action with given id.
289     *
290     * @param id Action ID
291     * @throws StoreException if Action doesn't exist
292     */
293    public void deleteAction(final String id) throws StoreException {
294        ParamChecker.notEmpty(id, "ActionID");
295        doOperation("deleteAction", new Callable<Void>() {
296            public Void call() throws SQLException, StoreException, WorkflowException {
297                /*
298                 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
299                 * q.setParameter("id", id); q.executeUpdate();
300                 */
301                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
302                if (action != null) {
303                    entityManager.remove(action);
304                }
305                return null;
306            }
307        });
308    }
309
310    /**
311     * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
312     *
313     * @param wfId Workflow ID
314     * @param locking true if Actions are to be locked
315     * @return A List of WorkflowActionBean
316     * @throws StoreException
317     */
318    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
319            throws StoreException {
320        ParamChecker.notEmpty(wfId, "WorkflowID");
321        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
322                                                       new Callable<List<WorkflowActionBean>>() {
323                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
324                                                                   InterruptedException {
325                                                               List<WorkflowActionBean> actions;
326                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
327                                                               try {
328                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
329
330                                                                   /*
331                                                                   * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
332                                                                   * if (locking) { //
333                                                                   * q.setHint("openjpa.FetchPlan.ReadLockMode"
334                                                                   * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
335                                                                   * fetch.setReadLockMode(LockModeType.WRITE);
336                                                                   * fetch.setLockTimeout(1000); // 1 seconds }
337                                                                   */
338                                                                   q.setParameter("wfId", wfId);
339                                                                   actions = q.getResultList();
340                                                                   for (WorkflowActionBean a : actions) {
341                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
342                                                                       actionList.add(aa);
343                                                                   }
344                                                               }
345                                                               catch (IllegalStateException e) {
346                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
347                                                               }
348                                                               /*
349                                                               * if (locking) { return actions; } else {
350                                                               */
351                                                               return actionList;
352                                                               // }
353                                                           }
354                                                       });
355        return actions;
356    }
357
358    /**
359     * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
360     *
361     * @param wfId Workflow ID
362     * @param start offset for select statement
363     * @param len number of Workflow Actions to be returned
364     * @param locking true if Actions are to be locked
365     * @return A List of WorkflowActionBean
366     * @throws StoreException
367     */
368    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
369            throws StoreException {
370        ParamChecker.notEmpty(wfId, "WorkflowID");
371        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
372                                                       new Callable<List<WorkflowActionBean>>() {
373                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
374                                                                   InterruptedException {
375                                                               List<WorkflowActionBean> actions;
376                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
377                                                               try {
378                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
379                                                                   OpenJPAQuery oq = OpenJPAPersistence.cast(q);
380                                                                   q.setParameter("wfId", wfId);
381                                                                   q.setFirstResult(start - 1);
382                                                                   q.setMaxResults(len);
383                                                                   actions = q.getResultList();
384                                                                   for (WorkflowActionBean a : actions) {
385                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
386                                                                       actionList.add(aa);
387                                                                   }
388                                                               }
389                                                               catch (IllegalStateException e) {
390                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
391                                                               }
392                                                               return actionList;
393                                                           }
394                                                       });
395        return actions;
396    }
397
398    /**
399     * Load All the actions that are pending for more than given time.
400     *
401     * @param minimumPendingAgeSecs Minimum Pending age in seconds
402     * @return List of action beans
403     * @throws StoreException
404     */
405    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
406        List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
407            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
408                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
409                List<WorkflowActionBean> actionList = null;
410                try {
411                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
412                    q.setParameter("pendingAge", ts);
413                    actionList = q.getResultList();
414                }
415                catch (IllegalStateException e) {
416                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
417                }
418                return actionList;
419            }
420        });
421        return actions;
422    }
423
424    /**
425     * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs
426     *
427     * @param checkAgeSecs check age in seconds.
428     * @return List of action beans.
429     * @throws StoreException
430     */
431    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
432        List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
433
434            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
435                List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
436                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
437                try {
438                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
439                    q.setParameter("lastCheckTime", ts);
440                    actions = q.getResultList();
441                }
442                catch (IllegalStateException e) {
443                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
444                }
445
446                return actions;
447            }
448        });
449        return actions;
450    }
451
452    /**
453     * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
454     * 
455     * @param wfId String
456     * @return List of action beans
457     * @throws StoreException
458     */
459    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
460        List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
461                new Callable<List<WorkflowActionBean>>() {
462                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
463                        List<WorkflowActionBean> actionList = null;
464                        try {
465                            Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
466                            q.setParameter("wfId", wfId);
467                            actionList = q.getResultList();
468                        }
469                        catch (IllegalStateException e) {
470                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
471                        }
472
473                        return actionList;
474                    }
475                });
476        return actions;
477    }
478
479    /**
480     * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
481     * appName, status.
482     *
483     * @param filter Filter condition
484     * @param start offset for select statement
485     * @param len number of Workflows to be returned
486     * @return A list of workflows
487     * @throws StoreException
488     */
489    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
490            throws StoreException {
491
492        WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
493            @SuppressWarnings("unchecked")
494            public WorkflowsInfo call() throws SQLException, StoreException {
495
496                List<String> orArray = new ArrayList<String>();
497                List<String> colArray = new ArrayList<String>();
498                List<String> valArray = new ArrayList<String>();
499                StringBuilder sb = new StringBuilder("");
500                boolean isStatus = false;
501                boolean isGroup = false;
502                boolean isAppName = false;
503                boolean isUser = false;
504                boolean isEnabled = false;
505                int index = 0;
506                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
507                    String colName = null;
508                    String colVar = null;
509                    if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
510                        List<String> values = filter.get(OozieClient.FILTER_GROUP);
511                        colName = "group";
512                        for (int i = 0; i < values.size(); i++) {
513                            colVar = "group";
514                            colVar = colVar + index;
515                            if (!isEnabled && !isGroup) {
516                                sb.append(seletStr).append(" where w.group IN (:group" + index);
517                                isGroup = true;
518                                isEnabled = true;
519                            }
520                            else {
521                                if (isEnabled && !isGroup) {
522                                    sb.append(" and w.group IN (:group" + index);
523                                    isGroup = true;
524                                }
525                                else {
526                                    if (isGroup) {
527                                        sb.append(", :group" + index);
528                                    }
529                                }
530                            }
531                            if (i == values.size() - 1) {
532                                sb.append(")");
533                            }
534                            index++;
535                            valArray.add(values.get(i));
536                            orArray.add(colName);
537                            colArray.add(colVar);
538                        }
539                    }
540                    else {
541                        if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
542                            List<String> values = filter.get(OozieClient.FILTER_STATUS);
543                            colName = "status";
544                            for (int i = 0; i < values.size(); i++) {
545                                colVar = "status";
546                                colVar = colVar + index;
547                                if (!isEnabled && !isStatus) {
548                                    sb.append(seletStr).append(" where w.status IN (:status" + index);
549                                    isStatus = true;
550                                    isEnabled = true;
551                                }
552                                else {
553                                    if (isEnabled && !isStatus) {
554                                        sb.append(" and w.status IN (:status" + index);
555                                        isStatus = true;
556                                    }
557                                    else {
558                                        if (isStatus) {
559                                            sb.append(", :status" + index);
560                                        }
561                                    }
562                                }
563                                if (i == values.size() - 1) {
564                                    sb.append(")");
565                                }
566                                index++;
567                                valArray.add(values.get(i));
568                                orArray.add(colName);
569                                colArray.add(colVar);
570                            }
571                        }
572                        else {
573                            if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
574                                List<String> values = filter.get(OozieClient.FILTER_NAME);
575                                colName = "appName";
576                                for (int i = 0; i < values.size(); i++) {
577                                    colVar = "appName";
578                                    colVar = colVar + index;
579                                    if (!isEnabled && !isAppName) {
580                                        sb.append(seletStr).append(" where w.appName IN (:appName" + index);
581                                        isAppName = true;
582                                        isEnabled = true;
583                                    }
584                                    else {
585                                        if (isEnabled && !isAppName) {
586                                            sb.append(" and w.appName IN (:appName" + index);
587                                            isAppName = true;
588                                        }
589                                        else {
590                                            if (isAppName) {
591                                                sb.append(", :appName" + index);
592                                            }
593                                        }
594                                    }
595                                    if (i == values.size() - 1) {
596                                        sb.append(")");
597                                    }
598                                    index++;
599                                    valArray.add(values.get(i));
600                                    orArray.add(colName);
601                                    colArray.add(colVar);
602                                }
603                            }
604                            else {
605                                if (entry.getKey().equals(OozieClient.FILTER_USER)) {
606                                    List<String> values = filter.get(OozieClient.FILTER_USER);
607                                    colName = "user";
608                                    for (int i = 0; i < values.size(); i++) {
609                                        colVar = "user";
610                                        colVar = colVar + index;
611                                        if (!isEnabled && !isUser) {
612                                            sb.append(seletStr).append(" where w.user IN (:user" + index);
613                                            isUser = true;
614                                            isEnabled = true;
615                                        }
616                                        else {
617                                            if (isEnabled && !isUser) {
618                                                sb.append(" and w.user IN (:user" + index);
619                                                isUser = true;
620                                            }
621                                            else {
622                                                if (isUser) {
623                                                    sb.append(", :user" + index);
624                                                }
625                                            }
626                                        }
627                                        if (i == values.size() - 1) {
628                                            sb.append(")");
629                                        }
630                                        index++;
631                                        valArray.add(values.get(i));
632                                        orArray.add(colName);
633                                        colArray.add(colVar);
634                                    }
635                                }
636                            }
637                        }
638                    }
639                }
640
641                int realLen = 0;
642
643                Query q = null;
644                Query qTotal = null;
645                if (orArray.size() == 0) {
646                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
647                    q.setFirstResult(start - 1);
648                    q.setMaxResults(len);
649                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
650                }
651                else {
652                    if (orArray.size() > 0) {
653                        StringBuilder sbTotal = new StringBuilder(sb);
654                        sb.append(" order by w.startTimestamp desc ");
655                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
656                        q = entityManager.createQuery(sb.toString());
657                        q.setFirstResult(start - 1);
658                        q.setMaxResults(len);
659                        qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
660                        for (int i = 0; i < orArray.size(); i++) {
661                            q.setParameter(colArray.get(i), valArray.get(i));
662                            qTotal.setParameter(colArray.get(i), valArray.get(i));
663                        }
664                    }
665                }
666
667                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
668                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
669                fetch.setFetchBatchSize(20);
670                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
671                fetch.setFetchDirection(FetchDirection.FORWARD);
672                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
673                List<?> resultList = q.getResultList();
674                List<Object[]> objectArrList = (List<Object[]>) resultList;
675                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
676
677                for (Object[] arr : objectArrList) {
678                    WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
679                    wfBeansList.add(ww);
680                }
681
682                realLen = ((Long) qTotal.getSingleResult()).intValue();
683
684                return new WorkflowsInfo(wfBeansList, start, len, realLen);
685            }
686        });
687        return workFlowsInfo;
688
689    }
690
691    /**
692     * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
693     *
694     * @param id Workflow Id
695     * @return Workflow Bean
696     * @throws StoreException If Workflow doesn't exist
697     */
698    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
699        ParamChecker.notEmpty(id, "WorkflowID");
700        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
701            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
702                WorkflowJobBean wfBean = null;
703                wfBean = getWorkflowforInfo(id, false);
704                if (wfBean == null) {
705                    throw new StoreException(ErrorCode.E0604, id);
706                }
707                else {
708                    wfBean.setActions(getActionsForWorkflow(id, false));
709                }
710                return wfBean;
711            }
712        });
713        return wfBean;
714    }
715
716    /**
717     * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
718     *
719     * @param id Workflow Id
720     * @param start offset for select statement for actions
721     * @param len number of Workflow Actions to be returned
722     * @return Workflow Bean
723     * @throws StoreException If Workflow doesn't exist
724     */
725    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
726        ParamChecker.notEmpty(id, "WorkflowID");
727        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
728            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
729                WorkflowJobBean wfBean = null;
730                wfBean = getWorkflowforInfo(id, false);
731                if (wfBean == null) {
732                    throw new StoreException(ErrorCode.E0604, id);
733                }
734                else {
735                    wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
736                }
737                return wfBean;
738            }
739        });
740        return wfBean;
741    }
742
743    /**
744     * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
745     *
746     * @param externalId external ID
747     * @return Workflow ID
748     * @throws StoreException if there is no job with external ID
749     */
750    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
751        ParamChecker.notEmpty(externalId, "externalId");
752        String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
753            public String call() throws SQLException, StoreException {
754                String id = "";
755                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
756                q.setParameter("externalId", externalId);
757                List<String> w = q.getResultList();
758                if (w.size() == 0) {
759                    id = "";
760                }
761                else {
762                    int index = w.size() - 1;
763                    id = w.get(index);
764                }
765                return id;
766            }
767        });
768        return wfId;
769    }
770
771    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
772
773    /**
774     * Purge the Workflows Completed older than given days.
775     *
776     * @param olderThanDays number of days for which to preserve the workflows
777     * @throws StoreException
778     */
779    public void purge(final long olderThanDays, final int limit) throws StoreException {
780        doOperation("purge", new Callable<Void>() {
781            public Void call() throws SQLException, StoreException, WorkflowException {
782                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
783                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
784                q.setParameter("endTime", maxEndTime);
785                q.setMaxResults(limit);
786                List<WorkflowJobBean> workflows = q.getResultList();
787                int actionDeleted = 0;
788                if (workflows.size() != 0) {
789                    for (WorkflowJobBean w : workflows) {
790                        String wfId = w.getId();
791                        entityManager.remove(w);
792                        Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
793                        g.setParameter("wfId", wfId);
794                        actionDeleted += g.executeUpdate();
795                    }
796                }
797                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
798                return null;
799            }
800        });
801    }
802
803    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
804        try {
805            Instrumentation.Cron cron = new Instrumentation.Cron();
806            cron.start();
807            V retVal;
808            try {
809                retVal = command.call();
810            }
811            finally {
812                cron.stop();
813            }
814            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
815            return retVal;
816        }
817        catch (StoreException ex) {
818            throw ex;
819        }
820        catch (SQLException ex) {
821            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
822        }
823        catch (Exception e) {
824            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
825        }
826    }
827
828    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
829            InterruptedException, StoreException {
830        WorkflowJobBean wfBean = null;
831        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
832        /*
833         * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
834         * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
835         * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
836         * fetch.setLockTimeout(-1); // unlimited }
837         */
838        q.setParameter("id", id);
839        List<WorkflowJobBean> w = q.getResultList();
840        if (w.size() > 0) {
841            wfBean = w.get(0);
842        }
843        return wfBean;
844        // return getBeanForRunningWorkflow(wfBean);
845    }
846
847    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
848            InterruptedException, StoreException {
849        WorkflowJobBean wfBean = null;
850        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
851        q.setParameter("id", id);
852        List<WorkflowJobBean> w = q.getResultList();
853        if (w.size() > 0) {
854            wfBean = w.get(0);
855            return getBeanForRunningWorkflow(wfBean);
856        }
857        return null;
858    }
859
860    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
861        WorkflowJobBean wfBean = new WorkflowJobBean();
862        wfBean.setId(w.getId());
863        wfBean.setAppName(w.getAppName());
864        wfBean.setAppPath(w.getAppPath());
865        wfBean.setConf(w.getConf());
866        wfBean.setGroup(w.getGroup());
867        wfBean.setRun(w.getRun());
868        wfBean.setUser(w.getUser());
869        wfBean.setCreatedTime(w.getCreatedTime());
870        wfBean.setEndTime(w.getEndTime());
871        wfBean.setExternalId(w.getExternalId());
872        wfBean.setLastModifiedTime(w.getLastModifiedTime());
873        wfBean.setLogToken(w.getLogToken());
874        wfBean.setProtoActionConf(w.getProtoActionConf());
875        wfBean.setSlaXml(w.getSlaXml());
876        wfBean.setStartTime(w.getStartTime());
877        wfBean.setStatus(w.getStatus());
878        wfBean.setWfInstance(w.getWfInstance());
879        return wfBean;
880    }
881
882    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
883
884        WorkflowJobBean wfBean = new WorkflowJobBean();
885        wfBean.setId((String) arr[0]);
886        if (arr[1] != null) {
887            wfBean.setAppName((String) arr[1]);
888        }
889        if (arr[2] != null) {
890            wfBean.setStatus(Status.valueOf((String) arr[2]));
891        }
892        if (arr[3] != null) {
893            wfBean.setRun((Integer) arr[3]);
894        }
895        if (arr[4] != null) {
896            wfBean.setUser((String) arr[4]);
897        }
898        if (arr[5] != null) {
899            wfBean.setGroup((String) arr[5]);
900        }
901        if (arr[6] != null) {
902            wfBean.setCreatedTime((Timestamp) arr[6]);
903        }
904        if (arr[7] != null) {
905            wfBean.setStartTime((Timestamp) arr[7]);
906        }
907        if (arr[8] != null) {
908            wfBean.setLastModifiedTime((Timestamp) arr[8]);
909        }
910        if (arr[9] != null) {
911            wfBean.setEndTime((Timestamp) arr[9]);
912        }
913        return wfBean;
914    }
915
916    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
917        if (a != null) {
918            WorkflowActionBean action = new WorkflowActionBean();
919            action.setId(a.getId());
920            action.setConf(a.getConf());
921            action.setConsoleUrl(a.getConsoleUrl());
922            action.setData(a.getData());
923            action.setStats(a.getStats());
924            action.setExternalChildIDs(a.getExternalChildIDs());
925            action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
926            action.setExternalId(a.getExternalId());
927            action.setExternalStatus(a.getExternalStatus());
928            action.setName(a.getName());
929            action.setCred(a.getCred());
930            action.setRetries(a.getRetries());
931            action.setTrackerUri(a.getTrackerUri());
932            action.setTransition(a.getTransition());
933            action.setType(a.getType());
934            action.setEndTime(a.getEndTime());
935            action.setExecutionPath(a.getExecutionPath());
936            action.setLastCheckTime(a.getLastCheckTime());
937            action.setLogToken(a.getLogToken());
938            if (a.getPending() == true) {
939                action.setPending();
940            }
941            action.setPendingAge(a.getPendingAge());
942            action.setSignalValue(a.getSignalValue());
943            action.setSlaXml(a.getSlaXml());
944            action.setStartTime(a.getStartTime());
945            action.setStatus(a.getStatus());
946            action.setJobId(a.getWfId());
947            action.setUserRetryCount(a.getUserRetryCount());
948            action.setUserRetryInterval(a.getUserRetryInterval());
949            action.setUserRetryMax(a.getUserRetryMax());
950            return action;
951        }
952        return null;
953    }
954
955    private void setWFQueryParameters(WorkflowJobBean wfBean, Query q) {
956        q.setParameter("appName", wfBean.getAppName());
957        q.setParameter("appPath", wfBean.getAppPath());
958        q.setParameter("conf", wfBean.getConf());
959        q.setParameter("groupName", wfBean.getGroup());
960        q.setParameter("run", wfBean.getRun());
961        q.setParameter("user", wfBean.getUser());
962        q.setParameter("createdTime", wfBean.getCreatedTimestamp());
963        q.setParameter("endTime", wfBean.getEndTimestamp());
964        q.setParameter("externalId", wfBean.getExternalId());
965        q.setParameter("lastModTime", new Date());
966        q.setParameter("logToken", wfBean.getLogToken());
967        q.setParameter("protoActionConf", wfBean.getProtoActionConf());
968        q.setParameter("slaXml", wfBean.getSlaXml());
969        q.setParameter("startTime", wfBean.getStartTimestamp());
970        q.setParameter("status", wfBean.getStatusStr());
971        q.setParameter("wfInstance", wfBean.getWfInstance());
972    }
973
974    private void setActionQueryParameters(WorkflowActionBean aBean, Query q) {
975        q.setParameter("conf", aBean.getConf());
976        q.setParameter("consoleUrl", aBean.getConsoleUrl());
977        q.setParameter("data", aBean.getData());
978        q.setParameter("stats", aBean.getStats());
979        q.setParameter("externalChildIDs", aBean.getExternalChildIDs());
980        q.setParameter("errorCode", aBean.getErrorCode());
981        q.setParameter("errorMessage", aBean.getErrorMessage());
982        q.setParameter("externalId", aBean.getExternalId());
983        q.setParameter("externalStatus", aBean.getExternalStatus());
984        q.setParameter("name", aBean.getName());
985        q.setParameter("cred", aBean.getCred());
986        q.setParameter("retries", aBean.getRetries());
987        q.setParameter("trackerUri", aBean.getTrackerUri());
988        q.setParameter("transition", aBean.getTransition());
989        q.setParameter("type", aBean.getType());
990        q.setParameter("endTime", aBean.getEndTimestamp());
991        q.setParameter("executionPath", aBean.getExecutionPath());
992        q.setParameter("lastCheckTime", aBean.getLastCheckTimestamp());
993        q.setParameter("logToken", aBean.getLogToken());
994        q.setParameter("pending", aBean.isPending() ? 1 : 0);
995        q.setParameter("pendingAge", aBean.getPendingAgeTimestamp());
996        q.setParameter("signalValue", aBean.getSignalValue());
997        q.setParameter("slaXml", aBean.getSlaXml());
998        q.setParameter("startTime", aBean.getStartTimestamp());
999        q.setParameter("status", aBean.getStatusStr());
1000        q.setParameter("wfId", aBean.getWfId());
1001    }
1002}