/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
           "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
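   * <p>For example, a minimal sketch (the queue name shown is illustrative
   * and assumes the cluster defines such a queue):</p>
   * <p><blockquote><pre>
   *     Configuration conf = new Configuration();
   *     conf.set("mapreduce.job.queuename", "default");
   *     Job job = Job.getInstance(conf, "wordcount");
   * </pre></blockquote></p>
   *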
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored unused cluster parameter, retained for compatibility
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored unused cluster parameter, retained for compatibility
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need an up-to-date status, so refresh
   * it immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: report whatever status information is already cached
    } catch (InterruptedException ie) {
      // ignore: report whatever status information is already cached
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; the reason "
          + "can be found in the logs.";
    }
    // Derive the task id by stripping the "attempt_" prefix and the
    // trailing attempt number from the task attempt id.
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times. For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
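   * <p>For illustration, a minimal sketch that lists map-task progress
   * (assuming the job is already running):</p>
   * <p><blockquote><pre>
   *     for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *       System.out.println(report.getTaskID() + " " + report.getProgress());
   *     }
   * </pre></blockquote></p>
   *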
   * @param type Type of the task
   * @return the list of task reports for the given task type.
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
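   * <p>A minimal polling sketch (the page size of 10 is arbitrary):</p>
   * <p><blockquote><pre>
   *     int from = 0;
   *     TaskCompletionEvent[] events;
   *     do {
   *       events = job.getTaskCompletionEvents(from, 10);
   *       for (TaskCompletionEvent e : events) {
   *         System.out.println(e.getTaskAttemptId() + " " + e.getStatus());
   *       }
   *       from += events.length;
   *     } while (events.length > 0);
   * </pre></blockquote></p>
   *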
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
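   *
   * <p>For example (a sketch; {@code attemptId} is assumed to come from a
   * previously fetched {@link TaskCompletionEvent}):</p>
   * <p><blockquote><pre>
   *     // kill the attempt without counting it against the job's failures
   *     job.killTask(attemptId, false);
   * </pre></blockquote></p>
   *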
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   without affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
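   * <p>For illustration, a hedged sketch that reads one built-in counter
   * (assuming the standard {@link TaskCounter} enum):</p>
   * <p><blockquote><pre>
   *     Counters counters = job.getCounters();
   *     if (counters != null) {
   *       long inputRecords =
   *           counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
   *     }
   * </pre></blockquote></p>
   *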
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
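   *
   * <p>A hedged sketch of the common secondary-sort pattern; the comparator
   * classes named here are hypothetical user-supplied implementations:</p>
   * <p><blockquote><pre>
   *     // sort on the full composite key, group on its natural part
   *     job.setSortComparatorClass(CompositeKeyComparator.class);
   *     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre></blockquote></p>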
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
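   *
   * <p>For example (a sketch; the HDFS path is hypothetical, and the
   * <code>#dict</code> fragment names the symlink created in the task's
   * working directory):</p>
   * <p><blockquote><pre>
   *     job.addCacheFile(URI.create("hdfs:///apps/lookup/dict.txt#dict"));
   * </pre></blockquote></p>
   *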
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. {@link #setProfileEnabled(boolean)}
   * must also be called.
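   *
   * <p>For example, a minimal sketch that profiles the first three map
   * tasks (the range syntax follows {@link IntegerRanges}):</p>
   * <p><blockquote><pre>
   *     job.setProfileEnabled(true);
   *     job.setProfileTaskRange(true, "0-2");
   * </pre></blockquote></p>
   *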
   * @param isMap whether the range applies to map tasks or reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
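   *
   * <p>A minimal sketch of typical driver usage; overriding the poll
   * interval is optional, and the exit-code convention is just one common
   * choice:</p>
   * <p><blockquote><pre>
   *     job.getConfiguration().setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 1000);
   *     boolean success = job.waitForCompletion(true);
   *     System.exit(success ? 0 : 1);
   * </pre></blockquote></p>
   *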
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " +
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String[] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String[] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
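   * <p>For example (a sketch; <code>ALL</code> logs every completion
   * event during monitoring):</p>
   * <p><blockquote><pre>
   *     Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);
   * </pre></blockquote></p>
   *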
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the identifier of the job's reservation, or null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}