001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.store;
019
020import java.sql.SQLException;
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.Callable;
027
028import javax.persistence.EntityManager;
029import javax.persistence.Query;
030
031import org.apache.oozie.CoordinatorActionBean;
032import org.apache.oozie.CoordinatorJobBean;
033import org.apache.oozie.CoordinatorJobInfo;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.client.Job.Status;
036import org.apache.oozie.client.CoordinatorJob.Timeunit;
037import org.apache.oozie.service.InstrumentationService;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.util.DateUtils;
040import org.apache.oozie.util.Instrumentation;
041import org.apache.oozie.util.ParamChecker;
042import org.apache.oozie.util.XLog;
043import org.apache.oozie.workflow.WorkflowException;
044import org.apache.openjpa.persistence.OpenJPAPersistence;
045import org.apache.openjpa.persistence.OpenJPAQuery;
046import org.apache.openjpa.persistence.jdbc.FetchDirection;
047import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
048import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
049import org.apache.openjpa.persistence.jdbc.ResultSetType;
050
051/**
052 * DB Implementation of Coord Store
053 */
054public class CoordinatorStore extends Store {
055    private final XLog log = XLog.getLog(getClass());
056
057    private EntityManager entityManager;
058    private static final String INSTR_GROUP = "db";
059    public static final int LOCK_TIMEOUT = 50000;
060    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
061
062    public CoordinatorStore(boolean selectForUpdate) throws StoreException {
063        super();
064        entityManager = getEntityManager();
065    }
066
067    public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
068        super(store);
069        entityManager = getEntityManager();
070    }
071
072    /**
073     * Create a CoordJobBean. It also creates the process instance for the job.
074     *
075     * @param workflow workflow bean
076     * @throws StoreException
077     */
078
079    public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
080        ParamChecker.notNull(coordinatorJob, "coordinatorJob");
081
082        doOperation("insertCoordinatorJob", new Callable<Void>() {
083            public Void call() throws StoreException {
084                entityManager.persist(coordinatorJob);
085                return null;
086            }
087        });
088    }
089
090    /**
091     * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
092     * Workflow depending on the locking parameter.
093     *
094     * @param id Job ID
095     * @param locking Flag for Table Lock
096     * @return CoordinatorJobBean
097     * @throws StoreException
098     */
099    public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
100        ParamChecker.notEmpty(id, "CoordJobId");
101        CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
102            @SuppressWarnings("unchecked")
103            public CoordinatorJobBean call() throws StoreException {
104                Query q = entityManager.createNamedQuery("GET_COORD_JOB");
105                q.setParameter("id", id);
106                /*
107                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
108                 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
109                 * FetchPlan fetch = oq.getFetchPlan();
110                 * fetch.setReadLockMode(LockModeType.WRITE);
111                 * fetch.setLockTimeout(-1); // 1 second }
112                 */
113                List<CoordinatorJobBean> cjBeans = q.getResultList();
114
115                if (cjBeans.size() > 0) {
116                    return cjBeans.get(0);
117                }
118                else {
119                    throw new StoreException(ErrorCode.E0604, id);
120                }
121            }
122        });
123
124        cjBean.setStatus(cjBean.getStatus());
125        return cjBean;
126    }
127
128    /**
129     * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
130     * argument will be returned.
131     *
132     * @param d Date
133     * @return List of Coordinator Jobs that have a last materialized time older than input date
134     * @throws StoreException
135     */
136    public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
137            throws StoreException {
138
139        ParamChecker.notNull(d, "Coord Job Materialization Date");
140        List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
141                                                                                  new Callable<List<CoordinatorJobBean>>() {
142                                                                                      public List<CoordinatorJobBean> call() throws StoreException {
143
144                                                                                          List<CoordinatorJobBean> cjBeans;
145                                                                                          List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
146                                                                                          try {
147                                                                                              Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
148                                                                                              q.setParameter("matTime", new Timestamp(d.getTime()));
149                                                                                              if (limit > 0) {
150                                                                                                  q.setMaxResults(limit);
151                                                                                              }
152                                                                                              /*
153                                                                                              OpenJPAQuery oq = OpenJPAPersistence.cast(q);
154                                                                                              FetchPlan fetch = oq.getFetchPlan();
155                                                                                              fetch.setReadLockMode(LockModeType.WRITE);
156                                                                                              fetch.setLockTimeout(-1); // no limit
157                                                                                              */
158                                                                                              cjBeans = q.getResultList();
159                                                                                              // copy results to a new object
160                                                                                              for (CoordinatorJobBean j : cjBeans) {
161                                                                                                  jobList.add(j);
162                                                                                              }
163                                                                                          }
164                                                                                          catch (IllegalStateException e) {
165                                                                                              throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
166                                                                                          }
167                                                                                          return jobList;
168
169                                                                                      }
170                                                                                  });
171        return cjBeans;
172    }
173
174    /**
175     * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
176     * checkAgeSecs will be returned.
177     *
178     * @param checkAgeSecs Job age in Seconds
179     * @param status Coordinator Job Status
180     * @param limit Number of results to return
181     * @param locking Flag for Table Lock
182     * @return List of Coordinator Jobs that are matched with the parameters.
183     * @throws StoreException
184     */
185    public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
186                                                                      final int limit, final boolean locking) throws StoreException {
187
188        ParamChecker.notNull(status, "Coord Job Status");
189        List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
190                                                                                  new Callable<List<CoordinatorJobBean>>() {
191                                                                                      public List<CoordinatorJobBean> call() throws StoreException {
192
193                                                                                          List<CoordinatorJobBean> cjBeans;
194                                                                                          List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
195                                                                                          try {
196                                                                                              Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
197                                                                                              Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
198                                                                                              q.setParameter("lastModTime", ts);
199                                                                                              q.setParameter("status", status);
200                                                                                              if (limit > 0) {
201                                                                                                  q.setMaxResults(limit);
202                                                                                              }
203                                                                                              /*
204                                                                                              * if (locking) { OpenJPAQuery oq =
205                                                                                              * OpenJPAPersistence.cast(q); FetchPlan fetch =
206                                                                                              * oq.getFetchPlan();
207                                                                                              * fetch.setReadLockMode(LockModeType.WRITE);
208                                                                                              * fetch.setLockTimeout(-1); // no limit }
209                                                                                              */
210                                                                                              cjBeans = q.getResultList();
211                                                                                              for (CoordinatorJobBean j : cjBeans) {
212                                                                                                  jobList.add(j);
213                                                                                              }
214                                                                                          }
215                                                                                          catch (Exception e) {
216                                                                                              throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
217                                                                                          }
218                                                                                          return jobList;
219
220                                                                                      }
221                                                                                  });
222        return cjBeans;
223    }
224
225    /**
226     * Load the CoordinatorAction into a Bean and return it.
227     *
228     * @param id action ID
229     * @return CoordinatorActionBean
230     * @throws StoreException
231     */
232    public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
233        ParamChecker.notEmpty(id, "actionID");
234        CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
235            public CoordinatorActionBean call() throws StoreException {
236                Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
237                q.setParameter("id", id);
238                OpenJPAQuery oq = OpenJPAPersistence.cast(q);
239                /*
240                 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
241                 * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
242                 * fetch.setReadLockMode(LockModeType.WRITE);
243                 * fetch.setLockTimeout(-1); // no limit }
244                 */
245
246                CoordinatorActionBean action = null;
247                List<CoordinatorActionBean> actions = q.getResultList();
248                if (actions.size() > 0) {
249                    action = actions.get(0);
250                }
251                else {
252                    throw new StoreException(ErrorCode.E0605, id);
253                }
254
255                /*
256                 * if (locking) return action; else
257                 */
258                return getBeanForRunningCoordAction(action);
259            }
260        });
261        return caBean;
262    }
263
264    /**
265     * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
266     * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY)
267     *
268     * @param id job ID
269     * @param numResults number of results to return
270     * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
271     * @return List of CoordinatorActionBean
272     * @throws StoreException
273     */
274    public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
275                                                                   final String executionOrder) throws StoreException {
276        ParamChecker.notEmpty(id, "jobID");
277        List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
278                                                          new Callable<List<CoordinatorActionBean>>() {
279                                                              public List<CoordinatorActionBean> call() throws StoreException {
280
281                                                                  List<CoordinatorActionBean> caBeans;
282                                                                  Query q;
283                                                                  // check if executionOrder is FIFO, LIFO, or LAST_ONLY
284                                                                  if (executionOrder.equalsIgnoreCase("FIFO")) {
285                                                                      q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
286                                                                  }
287                                                                  else {
288                                                                      q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
289                                                                  }
290                                                                  q.setParameter("jobId", id);
291                                                                  // if executionOrder is LAST_ONLY, only retrieve first
292                                                                  // record in LIFO,
293                                                                  // otherwise, use numResults if it is positive.
294                                                                  if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
295                                                                      q.setMaxResults(1);
296                                                                  }
297                                                                  else {
298                                                                      if (numResults > 0) {
299                                                                          q.setMaxResults(numResults);
300                                                                      }
301                                                                  }
302                                                                  caBeans = q.getResultList();
303                                                                  return caBeans;
304                                                              }
305                                                          });
306        return caBeans;
307    }
308
309    /**
310     * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
311     * concurrency number.
312     *
313     * @param id job ID
314     * @return Number of running actions
315     * @throws StoreException
316     */
317    public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
318        ParamChecker.notEmpty(id, "jobID");
319        Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
320            public Integer call() throws SQLException {
321
322                Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
323
324                q.setParameter("jobId", id);
325                Long count = (Long) q.getSingleResult();
326                return Integer.valueOf(count.intValue());
327            }
328        });
329        return cnt.intValue();
330    }
331
332    /**
333     * Create a new Action record in the ACTIONS table with the given Bean.
334     *
335     * @param action WorkflowActionBean
336     * @throws StoreException If the action is already present
337     */
338    public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
339        ParamChecker.notNull(action, "CoordinatorActionBean");
340        doOperation("insertCoordinatorAction", new Callable<Void>() {
341            public Void call() throws StoreException {
342                entityManager.persist(action);
343                return null;
344            }
345        });
346    }
347
348    /**
349     * Update the given action bean to DB.
350     *
351     * @param action Action Bean
352     * @throws StoreException if action doesn't exist
353     */
354    public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
355        ParamChecker.notNull(action, "CoordinatorActionBean");
356        doOperation("updateCoordinatorAction", new Callable<Void>() {
357            public Void call() throws StoreException {
358                Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
359                q.setParameter("id", action.getId());
360                setActionQueryParameters(action, q);
361                q.executeUpdate();
362                return null;
363            }
364        });
365    }
366
367    /**
368     * Update the given action bean to DB.
369     *
370     * @param action Action Bean
371     * @throws StoreException if action doesn't exist
372     */
373    public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
374        ParamChecker.notNull(action, "CoordinatorActionBean");
375        doOperation("updateCoordinatorAction", new Callable<Void>() {
376            public Void call() throws StoreException {
377                Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
378                q.setParameter("id", action.getId());
379                q.setParameter("missingDependencies", action.getMissingDependencies());
380                q.setParameter("lastModifiedTime", new Date());
381                q.setParameter("status", action.getStatus().toString());
382                q.setParameter("actionXml", action.getActionXml());
383                q.executeUpdate();
384                return null;
385            }
386        });
387    }
388
389    /**
390     * Update the given coordinator job bean to DB.
391     *
392     * @param jobbean Coordinator Job Bean
393     * @throws StoreException if action doesn't exist
394     */
395    public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
396        ParamChecker.notNull(job, "CoordinatorJobBean");
397        doOperation("updateJob", new Callable<Void>() {
398            public Void call() throws StoreException {
399                Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
400                q.setParameter("id", job.getId());
401                setJobQueryParameters(job, q);
402                q.executeUpdate();
403                return null;
404            }
405        });
406    }
407
408    public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
409        ParamChecker.notNull(job, "CoordinatorJobBean");
410        doOperation("updateJobStatus", new Callable<Void>() {
411            public Void call() throws StoreException {
412                Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
413                q.setParameter("id", job.getId());
414                q.setParameter("status", job.getStatus().toString());
415                q.setParameter("lastModifiedTime", new Date());
416                q.executeUpdate();
417                return null;
418            }
419        });
420    }
421
422    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
423        try {
424            Instrumentation.Cron cron = new Instrumentation.Cron();
425            cron.start();
426            V retVal;
427            try {
428                retVal = command.call();
429            }
430            finally {
431                cron.stop();
432            }
433            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
434            return retVal;
435        }
436        catch (StoreException ex) {
437            throw ex;
438        }
439        catch (SQLException ex) {
440            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
441        }
442        catch (Exception e) {
443            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
444        }
445    }
446
447    private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) {
448        q.setParameter("appName", jBean.getAppName());
449        q.setParameter("appPath", jBean.getAppPath());
450        q.setParameter("concurrency", jBean.getConcurrency());
451        q.setParameter("conf", jBean.getConf());
452        q.setParameter("externalId", jBean.getExternalId());
453        q.setParameter("frequency", jBean.getFrequency());
454        q.setParameter("lastActionNumber", jBean.getLastActionNumber());
455        q.setParameter("timeOut", jBean.getTimeout());
456        q.setParameter("timeZone", jBean.getTimeZone());
457        q.setParameter("createdTime", jBean.getCreatedTimestamp());
458        q.setParameter("endTime", jBean.getEndTimestamp());
459        q.setParameter("execution", jBean.getExecution());
460        q.setParameter("jobXml", jBean.getJobXml());
461        q.setParameter("lastAction", jBean.getLastActionTimestamp());
462        q.setParameter("lastModifiedTime", new Date());
463        q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp());
464        q.setParameter("origJobXml", jBean.getOrigJobXml());
465        q.setParameter("slaXml", jBean.getSlaXml());
466        q.setParameter("startTime", jBean.getStartTimestamp());
467        q.setParameter("status", jBean.getStatus().toString());
468        q.setParameter("timeUnit", jBean.getTimeUnitStr());
469    }
470
471    private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) {
472        q.setParameter("actionNumber", aBean.getActionNumber());
473        q.setParameter("actionXml", aBean.getActionXml());
474        q.setParameter("consoleUrl", aBean.getConsoleUrl());
475        q.setParameter("createdConf", aBean.getCreatedConf());
476        q.setParameter("errorCode", aBean.getErrorCode());
477        q.setParameter("errorMessage", aBean.getErrorMessage());
478        q.setParameter("externalStatus", aBean.getExternalStatus());
479        q.setParameter("missingDependencies", aBean.getMissingDependencies());
480        q.setParameter("runConf", aBean.getRunConf());
481        q.setParameter("timeOut", aBean.getTimeOut());
482        q.setParameter("trackerUri", aBean.getTrackerUri());
483        q.setParameter("type", aBean.getType());
484        q.setParameter("createdTime", aBean.getCreatedTimestamp());
485        q.setParameter("externalId", aBean.getExternalId());
486        q.setParameter("jobId", aBean.getJobId());
487        q.setParameter("lastModifiedTime", new Date());
488        q.setParameter("nominalTime", aBean.getNominalTimestamp());
489        q.setParameter("slaXml", aBean.getSlaXml());
490        q.setParameter("status", aBean.getStatus().toString());
491    }
492
493
494    /**
495     * Purge the coordinators completed older than given days.
496     *
497     * @param olderThanDays number of days for which to preserve the coordinators
498     * @param limit maximum number of coordinator jobs to be purged
499     * @throws StoreException
500     */
501    public void purge(final long olderThanDays, final int limit) throws StoreException {
502        doOperation("coord-purge", new Callable<Void>() {
503            public Void call() throws SQLException, StoreException, WorkflowException {
504                Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
505                Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
506                jobQ.setParameter("lastModTime", lastModTm);
507                jobQ.setMaxResults(limit);
508                List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
509
510                int actionDeleted = 0;
511                if (coordJobs.size() != 0) {
512                    for (CoordinatorJobBean coord : coordJobs) {
513                        String jobId = coord.getId();
514                        entityManager.remove(coord);
515                        Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
516                        g.setParameter("jobId", jobId);
517                        actionDeleted += g.executeUpdate();
518                    }
519                }
520
521                XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
522                return null;
523            }
524        });
525    }
526
527    public void commit() throws StoreException {
528    }
529
530    public void close() throws StoreException {
531    }
532
533    public CoordinatorJobBean getCoordinatorJobs(String id) {
534        // TODO Auto-generated method stub
535        return null;
536    }
537
538    public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
539            throws StoreException {
540
541        CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
542            public CoordinatorJobInfo call() throws SQLException, StoreException {
543                List<String> orArray = new ArrayList<String>();
544                List<String> colArray = new ArrayList<String>();
545                List<String> valArray = new ArrayList<String>();
546                StringBuilder sb = new StringBuilder("");
547
548                StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
549                                         StoreStatusFilter.coordCountStr);
550
551                int realLen = 0;
552
553                Query q = null;
554                Query qTotal = null;
555                if (orArray.size() == 0) {
556                    q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
557                    q.setFirstResult(start - 1);
558                    q.setMaxResults(len);
559                    qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
560                }
561                else {
562                    StringBuilder sbTotal = new StringBuilder(sb);
563                    sb.append(" order by w.createdTimestamp desc ");
564                    XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
565                    q = entityManager.createQuery(sb.toString());
566                    q.setFirstResult(start - 1);
567                    q.setMaxResults(len);
568                    qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
569                                                                                  StoreStatusFilter.coordCountStr));
570                }
571
572                for (int i = 0; i < orArray.size(); i++) {
573                    q.setParameter(colArray.get(i), valArray.get(i));
574                    qTotal.setParameter(colArray.get(i), valArray.get(i));
575                }
576
577                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
578                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
579                fetch.setFetchBatchSize(20);
580                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
581                fetch.setFetchDirection(FetchDirection.FORWARD);
582                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
583                List<?> resultList = q.getResultList();
584                List<Object[]> objectArrList = (List<Object[]>) resultList;
585                List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
586
587                for (Object[] arr : objectArrList) {
588                    CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
589                    coordBeansList.add(ww);
590                }
591
592                realLen = ((Long) qTotal.getSingleResult()).intValue();
593
594                return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
595            }
596        });
597        return coordJobInfo;
598    }
599
600    private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
601        CoordinatorJobBean bean = new CoordinatorJobBean();
602        bean.setId((String) arr[0]);
603        if (arr[1] != null) {
604            bean.setAppName((String) arr[1]);
605        }
606        if (arr[2] != null) {
607            bean.setStatus(Status.valueOf((String) arr[2]));
608        }
609        if (arr[3] != null) {
610            bean.setUser((String) arr[3]);
611        }
612        if (arr[4] != null) {
613            bean.setGroup((String) arr[4]);
614        }
615        if (arr[5] != null) {
616            bean.setStartTime((Timestamp) arr[5]);
617        }
618        if (arr[6] != null) {
619            bean.setEndTime((Timestamp) arr[6]);
620        }
621        if (arr[7] != null) {
622            bean.setAppPath((String) arr[7]);
623        }
624        if (arr[8] != null) {
625            bean.setConcurrency(((Integer) arr[8]).intValue());
626        }
627        if (arr[9] != null) {
628            bean.setFrequency((String) arr[9]);
629        }
630        if (arr[10] != null) {
631            bean.setLastActionTime((Timestamp) arr[10]);
632        }
633        if (arr[11] != null) {
634            bean.setNextMaterializedTime((Timestamp) arr[11]);
635        }
636        if (arr[13] != null) {
637            bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
638        }
639        if (arr[14] != null) {
640            bean.setTimeZone((String) arr[14]);
641        }
642        if (arr[15] != null) {
643            bean.setTimeout((Integer) arr[15]);
644        }
645        return bean;
646    }
647
648    /**
649     * Loads all actions for the given Coordinator job.
650     *
651     * @param jobId coordinator job id
652     * @param locking true if Actions are to be locked
653     * @return A List of CoordinatorActionBean
654     * @throws StoreException
655     */
656    public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking)
657            throws StoreException {
658        ParamChecker.notEmpty(jobId, "CoordinatorJobID");
659        Integer actionsCount = doOperation("getActionsForCoordinatorJob",
660                                                          new Callable<Integer>() {
661                                                              @SuppressWarnings("unchecked")
662                                                              public Integer call() throws StoreException {
663                                                                  List<CoordinatorActionBean> actions;
664                                                                  List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
665                                                                  try {
666                                                                      Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
667                                                                      q.setParameter("jobId", jobId);
668                                                                      /*
669                                                                      * if (locking) { //
670                                                                      * q.setHint("openjpa.FetchPlan.ReadLockMode", //
671                                                                      * "READ"); OpenJPAQuery oq =
672                                                                      * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
673                                                                      * (JDBCFetchPlan) oq.getFetchPlan();
674                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
675                                                                      * fetch.setLockTimeout(-1); // 1 second }
676                                                                      */
677                                                                      Long count = (Long) q.getSingleResult();
678                                                                      return Integer.valueOf(count.intValue());
679                                                                      /*actions = q.getResultList();
680                                                                      for (CoordinatorActionBean a : actions) {
681                                                                          CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
682                                                                          actionList.add(aa);
683                                                                      }*/
684                                                                  }
685                                                                  catch (IllegalStateException e) {
686                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
687                                                                  }
688                                                                  /*
689                                                                  * if (locking) { return actions; } else {
690                                                                  */
691
692                                                                  // }
693                                                              }
694                                                          });
695        return actionsCount;
696    }
697
698    /**
699     * Loads given number of actions for the given Coordinator job.
700     *
701     * @param jobId coordinator job id
702     * @param start offset for select statement
703     * @param len number of Workflow Actions to be returned
704     * @return A List of CoordinatorActionBean
705     * @throws StoreException
706     */
707    public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
708                                                                         final int len) throws StoreException {
709        ParamChecker.notEmpty(jobId, "CoordinatorJobID");
710        List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
711                                                          new Callable<List<CoordinatorActionBean>>() {
712                                                              @SuppressWarnings("unchecked")
713                                                              public List<CoordinatorActionBean> call() throws StoreException {
714                                                                  List<CoordinatorActionBean> actions;
715                                                                  List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
716                                                                  try {
717                                                                      Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
718                                                                      q.setParameter("jobId", jobId);
719                                                                      q.setFirstResult(start - 1);
720                                                                      q.setMaxResults(len);
721                                                                      actions = q.getResultList();
722                                                                      for (CoordinatorActionBean a : actions) {
723                                                                          CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
724                                                                          actionList.add(aa);
725                                                                      }
726                                                                  }
727                                                                  catch (IllegalStateException e) {
728                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
729                                                                  }
730                                                                  return actionList;
731                                                              }
732                                                          });
733        return actions;
734    }
735
736    protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
737        if (a != null) {
738            CoordinatorActionBean action = new CoordinatorActionBean();
739            action.setId(a.getId());
740            action.setActionNumber(a.getActionNumber());
741            action.setActionXml(a.getActionXml());
742            action.setConsoleUrl(a.getConsoleUrl());
743            action.setCreatedConf(a.getCreatedConf());
744            //action.setErrorCode(a.getErrorCode());
745            //action.setErrorMessage(a.getErrorMessage());
746            action.setExternalStatus(a.getExternalStatus());
747            action.setMissingDependencies(a.getMissingDependencies());
748            action.setRunConf(a.getRunConf());
749            action.setTimeOut(a.getTimeOut());
750            action.setTrackerUri(a.getTrackerUri());
751            action.setType(a.getType());
752            action.setCreatedTime(a.getCreatedTime());
753            action.setExternalId(a.getExternalId());
754            action.setJobId(a.getJobId());
755            action.setLastModifiedTime(a.getLastModifiedTime());
756            action.setNominalTime(a.getNominalTime());
757            action.setSlaXml(a.getSlaXml());
758            action.setStatus(a.getStatus());
759            return action;
760        }
761        return null;
762    }
763
764    public CoordinatorActionBean getAction(String id, boolean b) {
765        return null;
766    }
767
768
769    public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
770            throws StoreException {
771        ParamChecker.notEmpty(jobId, "CoordinatorJobID");
772        List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
773                                                          new Callable<List<CoordinatorActionBean>>() {
774                                                              @SuppressWarnings("unchecked")
775                                                              public List<CoordinatorActionBean> call() throws StoreException {
776                                                                  List<CoordinatorActionBean> actions;
777                                                                  try {
778                                                                      Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
779                                                                      q.setParameter("jobId", jobId);
780                                                                      /*
781                                                                      * if (locking) {
782                                                                      * q.setHint("openjpa.FetchPlan.ReadLockMode",
783                                                                      * "READ"); OpenJPAQuery oq =
784                                                                      * OpenJPAPersistence.cast(q); FetchPlan fetch =
785                                                                      * oq.getFetchPlan();
786                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
787                                                                      * fetch.setLockTimeout(-1); // no limit }
788                                                                      */
789                                                                      actions = q.getResultList();
790                                                                      return actions;
791                                                                  }
792                                                                  catch (IllegalStateException e) {
793                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
794                                                                  }
795                                                              }
796                                                          });
797        return actions;
798    }
799
800    public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
801            throws StoreException {
802        List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
803                                                          new Callable<List<CoordinatorActionBean>>() {
804                                                              @SuppressWarnings("unchecked")
805                                                              public List<CoordinatorActionBean> call() throws StoreException {
806                                                                  List<CoordinatorActionBean> actions;
807                                                                  Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
808                                                                  try {
809                                                                      Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
810                                                                      q.setParameter("lastModifiedTime", ts);
811                                                                      /*
812                                                                      * if (locking) { OpenJPAQuery oq =
813                                                                      * OpenJPAPersistence.cast(q); FetchPlan fetch =
814                                                                      * oq.getFetchPlan();
815                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
816                                                                      * fetch.setLockTimeout(-1); // no limit }
817                                                                      */
818                                                                      actions = q.getResultList();
819                                                                      return actions;
820                                                                  }
821                                                                  catch (IllegalStateException e) {
822                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
823                                                                  }
824                                                              }
825                                                          });
826        return actions;
827    }
828
829    public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
830            throws StoreException {
831        List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
832                                                          new Callable<List<CoordinatorActionBean>>() {
833                                                              @SuppressWarnings("unchecked")
834                                                              public List<CoordinatorActionBean> call() throws StoreException {
835                                                                  List<CoordinatorActionBean> actions;
836                                                                  try {
837                                                                      Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
838                                                                      Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
839                                                                      q.setParameter("lastModifiedTime", ts);
840                                                                      /*
841                                                                      * if (locking) { OpenJPAQuery oq =
842                                                                      * OpenJPAPersistence.cast(q); FetchPlan fetch =
843                                                                      * oq.getFetchPlan();
844                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
845                                                                      * fetch.setLockTimeout(-1); // no limit }
846                                                                      */
847                                                                      actions = q.getResultList();
848                                                                      return actions;
849                                                                  }
850                                                                  catch (IllegalStateException e) {
851                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
852                                                                  }
853                                                              }
854                                                          });
855        return actions;
856    }
857
858    /**
859     * Get coordinator action beans for given start date and end date
860     *
861     * @param startDate
862     * @param endDate
863     * @return list of coordinator action beans
864     * @throws StoreException
865     */
866    public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
867            final Date endDate)
868            throws StoreException {
869        List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
870                new Callable<List<CoordinatorActionBean>>() {
871                    @SuppressWarnings("unchecked")
872                    public List<CoordinatorActionBean> call() throws StoreException {
873                        List<CoordinatorActionBean> actions;
874                        try {
875                            Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
876                            q.setParameter("jobId", jobId);
877                            q.setParameter("startTime", new Timestamp(startDate.getTime()));
878                            q.setParameter("endTime", new Timestamp(endDate.getTime()));
879                            actions = q.getResultList();
880
881                            List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
882                            for (CoordinatorActionBean a : actions) {
883                                CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
884                                actionList.add(aa);
885                            }
886                            return actionList;
887                        }
888                        catch (IllegalStateException e) {
889                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
890                        }
891                    }
892                });
893        return actions;
894    }
895
896    /**
897     * Get coordinator action bean for given date
898     *
899     * @param nominalTime
900     * @return CoordinatorActionBean
901     * @throws StoreException
902     */
903    public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
904            throws StoreException {
905        CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
906                new Callable<CoordinatorActionBean>() {
907            @SuppressWarnings("unchecked")
908            public CoordinatorActionBean call() throws StoreException {
909                List<CoordinatorActionBean> actions;
910                Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
911                q.setParameter("jobId", jobId);
912                q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
913                actions = q.getResultList();
914
915                CoordinatorActionBean action = null;
916                if (actions.size() > 0) {
917                    action = actions.get(0);
918                }
919                else {
920                    throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime));
921                }
922                return getBeanForRunningCoordAction(action);
923            }
924        });
925        return action;
926    }
927
928    public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
929        List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
930            @SuppressWarnings("unchecked")
931            public List<String> call() throws StoreException {
932                List<String> jobids = new ArrayList<String>();
933                try {
934                    Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
935                    Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
936                    q.setParameter(1, ts);
937                    List<Object[]> list = q.getResultList();
938
939                    for (Object[] arr : list) {
940                        if (arr != null && arr[0] != null) {
941                            jobids.add((String) arr[0]);
942                        }
943                    }
944
945                    return jobids;
946                }
947                catch (IllegalStateException e) {
948                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
949                }
950            }
951        });
952        return jobids;
953    }
954}