/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
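 * <p>A minimal usage sketch (the path and the key/value types here are
 * illustrative, not prescribed; the <code>Reader</code> used below is
 * defined further down in this file):</p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   Path path = new Path("/tmp/example.seq");
 *
 *   // Write a few records.
 *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *       SequenceFile.Writer.file(path),
 *       SequenceFile.Writer.keyClass(IntWritable.class),
 *       SequenceFile.Writer.valueClass(Text.class));
 *   try {
 *     writer.append(new IntWritable(1), new Text("one"));
 *     writer.append(new IntWritable(2), new Text("two"));
 *   } finally {
 *     writer.close();
 *   }
 *
 *   // Read them back.
 *   SequenceFile.Reader reader =
 *       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 *   try {
 *     IntWritable key = new IntWritable();
 *     Text value = new Text();
 *     while (reader.next(key, value)) {
 *       System.out.println(key + "\t" + value);
 *     }
 *   } finally {
 *     reader.close();
 *   }
 * </pre>
 *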
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every few hundred bytes or so (see {@link #SYNC_INTERVAL}).
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every few hundred bytes or so (see {@link #SYNC_INTERVAL}).
 * </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in zero-compressed
 * integer (VInt) format.</p>
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static final byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points (100 * SYNC_SIZE = 2,000 bytes). */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
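   *
   * <p>For example (a sketch; equivalent to setting the
   * <code>io.seqfile.compression.type</code> key directly):</p>
   * <pre>
   *   SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);
   *   // ...has the same effect as:
   *   conf.set("io.seqfile.compression.type", "BLOCK");
   * </pre>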
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
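   *
   * <p>A sketch of combining options (the path, types and values shown are
   * illustrative):</p>
   * <pre>
   *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *       Writer.file(new Path("/tmp/data.seq")),
   *       Writer.keyClass(LongWritable.class),
   *       Writer.valueClass(BytesWritable.class),
   *       Writer.compression(CompressionType.BLOCK, new DefaultCodec()),
   *       Writer.blockSize(128 * 1024 * 1024),
   *       Writer.replication((short) 3));
   * </pre>
   *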
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Writes the compressed bytes to the outStream.
     * Note that this will NOT compress the bytes; they must already be
     * compressed.
     * @param outStream Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
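   * <p>A sketch of attaching metadata when a file is created (the attribute
   * name/value shown are illustrative):</p>
   * <pre>
   *   SequenceFile.Metadata metadata = new SequenceFile.Metadata();
   *   metadata.set(new Text("created-by"), new Text("example"));
   *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *       SequenceFile.Writer.file(path),
   *       SequenceFile.Writer.keyClass(Text.class),
   *       SequenceFile.Writer.valueClass(Text.class),
   *       SequenceFile.Writer.metadata(metadata));
   * </pre>
   *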
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }
      writeFileHeader();
    }

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * flush all currently written data to the file system
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }
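
    // A sketch of copying raw records from a Reader into this writer; the
    // Reader raw-record API (createValueBytes(), nextRawKey(), nextRawValue())
    // is defined further down in this file:
    //
    //   DataOutputBuffer rawKey = new DataOutputBuffer();
    //   ValueBytes rawValue = reader.createValueBytes();
    //   while (reader.nextRawKey(rawKey) != -1) {
    //     reader.nextRawValue(rawValue);
    //     writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
    //     rawKey.reset();
    //   }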

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However,
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }
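
    // A sketch of how a position from getLength() can be used: because it is
    // a synchronized position, a Reader over the same file may seek straight
    // to it (SequenceFile.Reader is defined further down in this file).
    //
    //   long pos = writer.getLength();  // remember a record boundary
    //   ...
    //   reader.seek(pos);               // later, in a reader on the same file
    //   reader.next(key, value);        // reads the next record from there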

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressWriter
1426
1427  /** Write compressed key/value blocks to a sequence-format file. */
1428  static class BlockCompressWriter extends Writer {
1429    
1430    private int noBufferedRecords = 0;
1431    
1432    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1433    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1434
1435    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1436    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1437
1438    private final int compressionBlockSize;
1439    
1440    BlockCompressWriter(Configuration conf,
1441                        Option... options) throws IOException {
1442      super(conf, options);
1443      compressionBlockSize = 
1444        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1445      keySerializer.close();
1446      keySerializer.open(keyBuffer);
1447      uncompressedValSerializer.close();
1448      uncompressedValSerializer.open(valBuffer);
1449    }
1450
1451    /** Workhorse to check and write out compressed data/lengths */
1452    private synchronized 
1453      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1454      throws IOException {
1455      deflateFilter.resetState();
1456      buffer.reset();
1457      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1458                       uncompressedDataBuffer.getLength());
1459      deflateOut.flush();
1460      deflateFilter.finish();
1461      
1462      WritableUtils.writeVInt(out, buffer.getLength());
1463      out.write(buffer.getData(), 0, buffer.getLength());
1464    }
1465    
1466    /** Compress and flush buffered records to the underlying stream. */
1467    @Override
1468    public synchronized void sync() throws IOException {
1469      if (noBufferedRecords > 0) {
1470        super.sync();
1471        
1472        // No. of records
1473        WritableUtils.writeVInt(out, noBufferedRecords);
1474        
1475        // Write 'keys' and lengths
1476        writeBuffer(keyLenBuffer);
1477        writeBuffer(keyBuffer);
1478        
1479        // Write 'values' and lengths
1480        writeBuffer(valLenBuffer);
1481        writeBuffer(valBuffer);
1482        
1483        // Flush the file-stream
1484        out.flush();
1485        
1486        // Reset internal states
1487        keyLenBuffer.reset();
1488        keyBuffer.reset();
1489        valLenBuffer.reset();
1490        valBuffer.reset();
1491        noBufferedRecords = 0;
1492      }
1493      
1494    }
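    // For reference, the layout sync() above writes for each flushed block;
    // each writeBuffer() call emits a vint compressed size followed by the
    // compressed bytes:
    //
    //   [sync marker]
    //   [number of records        : vint]
    //   [compressed key lengths   : vint size + bytes]
    //   [compressed keys          : vint size + bytes]
    //   [compressed value lengths : vint size + bytes]
    //   [compressed values        : vint size + bytes]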
1495    
1496    /** Close the file. */
1497    @Override
1498    public synchronized void close() throws IOException {
1499      if (out != null) {
1500        sync();
1501      }
1502      super.close();
1503    }
1504
1505    /** Append a key/value pair. */
1506    @Override
1507    @SuppressWarnings("unchecked")
1508    public synchronized void append(Object key, Object val)
1509      throws IOException {
1510      if (key.getClass() != keyClass)
1511        throw new IOException("wrong key class: "+key.getClass().getName()+" is not "+keyClass);
1512      if (val.getClass() != valClass)
1513        throw new IOException("wrong value class: "+val.getClass().getName()+" is not "+valClass);
1514
1515      // Save key/value into respective buffers 
1516      int oldKeyLength = keyBuffer.getLength();
1517      keySerializer.serialize(key);
1518      int keyLength = keyBuffer.getLength() - oldKeyLength;
1519      if (keyLength < 0)
1520        throw new IOException("negative length keys not allowed: " + key);
1521      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1522
1523      int oldValLength = valBuffer.getLength();
1524      uncompressedValSerializer.serialize(val);
1525      int valLength = valBuffer.getLength() - oldValLength;
1526      WritableUtils.writeVInt(valLenBuffer, valLength);
1527      
1528      // Added another key/value pair
1529      ++noBufferedRecords;
1530      
1531      // Compress and flush?
1532      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1533      if (currentBlockSize >= compressionBlockSize) {
1534        sync();
1535      }
1536    }
1537    
1538    /** Append a key/value pair: raw key bytes plus value bytes, buffered uncompressed until the block is flushed. */
1539    @Override
1540    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1541        int keyLength, ValueBytes val) throws IOException {
1542      
1543      if (keyLength < 0)
1544        throw new IOException("negative length keys not allowed");
1545
1546      int valLength = val.getSize();
1547      
1548      // Save key/value data in relevant buffers
1549      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1550      keyBuffer.write(keyData, keyOffset, keyLength);
1551      WritableUtils.writeVInt(valLenBuffer, valLength);
1552      val.writeUncompressedBytes(valBuffer);
1553
1554      // Added another key/value pair
1555      ++noBufferedRecords;
1556
1557      // Compress and flush?
1558      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1559      if (currentBlockSize >= compressionBlockSize) {
1560        sync();
1561      }
1562    }
1563  
1564  } // BlockCompressWriter
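  // Both compressed writers above are obtained through the createWriter()
  // factories rather than constructed directly. A minimal sketch, assuming
  // the Writer option factories defined earlier in this file; the path and
  // key/value types are hypothetical:
  //
  //   Configuration conf = new Configuration();
  //   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(new Path("/tmp/example.seq")),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(IntWritable.class),
  //       SequenceFile.Writer.compression(CompressionType.BLOCK, new GzipCodec()));
  //   writer.append(new Text("key"), new IntWritable(1));
  //   writer.close();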
1565
1566  /** Get the configured buffer size */
1567  private static int getBufferSize(Configuration conf) {
1568    return conf.getInt("io.file.buffer.size", 4096);
1569  }
1570
1571  /** Reads key/value pairs from a sequence-format file. */
1572  public static class Reader implements java.io.Closeable {
1573    private String filename;
1574    private FSDataInputStream in;
1575    private DataOutputBuffer outBuf = new DataOutputBuffer();
1576
1577    private byte version;
1578
1579    private String keyClassName;
1580    private String valClassName;
1581    private Class keyClass;
1582    private Class valClass;
1583
1584    private CompressionCodec codec = null;
1585    private Metadata metadata = null;
1586    
1587    private byte[] sync = new byte[SYNC_HASH_SIZE];
1588    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1589    private boolean syncSeen;
1590
1591    private long headerEnd;
1592    private long end;
1593    private int keyLength;
1594    private int recordLength;
1595
1596    private boolean decompress;
1597    private boolean blockCompressed;
1598    
1599    private Configuration conf;
1600
1601    private int noBufferedRecords = 0;
1602    private boolean lazyDecompress = true;
1603    private boolean valuesDecompressed = true;
1604    
1605    private int noBufferedKeys = 0;
1606    private int noBufferedValues = 0;
1607    
1608    private DataInputBuffer keyLenBuffer = null;
1609    private CompressionInputStream keyLenInFilter = null;
1610    private DataInputStream keyLenIn = null;
1611    private Decompressor keyLenDecompressor = null;
1612    private DataInputBuffer keyBuffer = null;
1613    private CompressionInputStream keyInFilter = null;
1614    private DataInputStream keyIn = null;
1615    private Decompressor keyDecompressor = null;
1616
1617    private DataInputBuffer valLenBuffer = null;
1618    private CompressionInputStream valLenInFilter = null;
1619    private DataInputStream valLenIn = null;
1620    private Decompressor valLenDecompressor = null;
1621    private DataInputBuffer valBuffer = null;
1622    private CompressionInputStream valInFilter = null;
1623    private DataInputStream valIn = null;
1624    private Decompressor valDecompressor = null;
1625    
1626    private Deserializer keyDeserializer;
1627    private Deserializer valDeserializer;
1628
1629    /**
1630     * A tag interface for all of the Reader options
1631     */
1632    public static interface Option {}
1633    
1634    /**
1635     * Create an option to specify the path name of the sequence file.
1636     * @param value the path to read
1637     * @return a new option
1638     */
1639    public static Option file(Path value) {
1640      return new FileOption(value);
1641    }
1642    
1643    /**
1644     * Create an option to specify the stream containing the sequence file.
1645     * @param value the stream to read.
1646     * @return a new option
1647     */
1648    public static Option stream(FSDataInputStream value) {
1649      return new InputStreamOption(value);
1650    }
1651    
1652    /**
1653     * Create an option to specify the starting byte to read.
1654     * @param value the number of bytes to skip over
1655     * @return a new option
1656     */
1657    public static Option start(long value) {
1658      return new StartOption(value);
1659    }
1660    
1661    /**
1662     * Create an option to specify the number of bytes to read.
1663     * @param value the number of bytes to read
1664     * @return a new option
1665     */
1666    public static Option length(long value) {
1667      return new LengthOption(value);
1668    }
1669    
1670    /**
1671     * Create an option with the buffer size for reading the given pathname.
1672     * @param value the number of bytes to buffer
1673     * @return a new option
1674     */
1675    public static Option bufferSize(int value) {
1676      return new BufferSizeOption(value);
1677    }
1678
1679    private static class FileOption extends Options.PathOption 
1680                                    implements Option {
1681      private FileOption(Path value) {
1682        super(value);
1683      }
1684    }
1685    
1686    private static class InputStreamOption
1687        extends Options.FSDataInputStreamOption 
1688        implements Option {
1689      private InputStreamOption(FSDataInputStream value) {
1690        super(value);
1691      }
1692    }
1693
1694    private static class StartOption extends Options.LongOption
1695                                     implements Option {
1696      private StartOption(long value) {
1697        super(value);
1698      }
1699    }
1700
1701    private static class LengthOption extends Options.LongOption
1702                                      implements Option {
1703      private LengthOption(long value) {
1704        super(value);
1705      }
1706    }
1707
1708    private static class BufferSizeOption extends Options.IntegerOption
1709                                      implements Option {
1710      private BufferSizeOption(int value) {
1711        super(value);
1712      }
1713    }
1714
1715    // constructed directly by SequenceFile internals; no public factory method
1716    private static class OnlyHeaderOption extends Options.BooleanOption 
1717                                          implements Option {
1718      private OnlyHeaderOption() {
1719        super(true);
1720      }
1721    }
1722
1723    public Reader(Configuration conf, Option... opts) throws IOException {
1724      // Look up the options, these are null if not set
1725      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1726      InputStreamOption streamOpt = 
1727        Options.getOption(InputStreamOption.class, opts);
1728      StartOption startOpt = Options.getOption(StartOption.class, opts);
1729      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1730      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1731      OnlyHeaderOption headerOnly = 
1732        Options.getOption(OnlyHeaderOption.class, opts);
1733      // check for consistency
1734      if ((fileOpt == null) == (streamOpt == null)) {
1735        throw new 
1736          IllegalArgumentException("File or stream option must be specified");
1737      }
1738      if (fileOpt == null && bufOpt != null) {
1739        throw new IllegalArgumentException("buffer size can only be set when" +
1740                                           " a file is specified.");
1741      }
1742      // figure out the real values
1743      Path filename = null;
1744      FSDataInputStream file;
1745      final long len;
1746      if (fileOpt != null) {
1747        filename = fileOpt.getValue();
1748        FileSystem fs = filename.getFileSystem(conf);
1749        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1750        len = null == lenOpt
1751          ? fs.getFileStatus(filename).getLen()
1752          : lenOpt.getValue();
1753        file = openFile(fs, filename, bufSize, len);
1754      } else {
1755        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1756        file = streamOpt.getValue();
1757      }
1758      long start = startOpt == null ? 0 : startOpt.getValue();
1759      // really set up
1760      initialize(filename, file, start, len, conf, headerOnly != null);
1761    }
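    // A minimal usage sketch of the option factories handled above; the path
    // is hypothetical. file() and stream() are mutually exclusive, and
    // bufferSize() is only valid together with file():
    //
    //   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
    //       SequenceFile.Reader.file(new Path("/tmp/example.seq")),
    //       SequenceFile.Reader.bufferSize(8192));
    //   try {
    //     Text key = new Text();
    //     IntWritable val = new IntWritable();
    //     while (reader.next(key, val)) {
    //       // process key/val
    //     }
    //   } finally {
    //     reader.close();
    //   }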
1762
1763    /**
1764     * Construct a reader by opening a file from the given file system.
1765     * @param fs The file system used to open the file.
1766     * @param file The file being read.
1767     * @param conf Configuration
1768     * @throws IOException
1769     * @deprecated Use Reader(Configuration, Option...) instead.
1770     */
1771    @Deprecated
1772    public Reader(FileSystem fs, Path file, 
1773                  Configuration conf) throws IOException {
1774      this(conf, file(file.makeQualified(fs)));
1775    }
1776
1777    /**
1778     * Construct a reader by the given input stream.
1779     * @param in An input stream.
1780     * @param buffersize unused
1781     * @param start The starting position.
1782     * @param length The length being read.
1783     * @param conf Configuration
1784     * @throws IOException
1785     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1786     */
1787    @Deprecated
1788    public Reader(FSDataInputStream in, int buffersize,
1789        long start, long length, Configuration conf) throws IOException {
1790      this(conf, stream(in), start(start), length(length));
1791    }
1792
1793    /** Common work of the constructors. */
1794    private void initialize(Path filename, FSDataInputStream in,
1795                            long start, long length, Configuration conf,
1796                            boolean tempReader) throws IOException {
1797      if (in == null) {
1798        throw new IllegalArgumentException("in == null");
1799      }
1800      this.filename = filename == null ? "<unknown>" : filename.toString();
1801      this.in = in;
1802      this.conf = conf;
1803      boolean succeeded = false;
1804      try {
1805        seek(start);
1806        this.end = this.in.getPos() + length;
1807        // if it wrapped around, use the max
1808        if (end < length) {
1809          end = Long.MAX_VALUE;
1810        }
1811        init(tempReader);
1812        succeeded = true;
1813      } finally {
1814        if (!succeeded) {
1815          IOUtils.cleanup(LOG, this.in);
1816        }
1817      }
1818    }
1819
1820    /**
1821     * Override this method to specialize the type of
1822     * {@link FSDataInputStream} returned.
1823     * @param fs The file system used to open the file.
1824     * @param file The file being read.
1825     * @param bufferSize The buffer size used to read the file.
1826     * @param length The length being read if it is >= 0.  Otherwise,
1827     *               the length is not available.
1828     * @return The opened stream.
1829     * @throws IOException
1830     */
1831    protected FSDataInputStream openFile(FileSystem fs, Path file,
1832        int bufferSize, long length) throws IOException {
1833      return fs.open(file, bufferSize);
1834    }
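    // A hypothetical subclass illustrating this specialization point; the
    // override body is a placeholder, not part of the library:
    //
    //   class MeteredReader extends SequenceFile.Reader {
    //     MeteredReader(Configuration conf, Option... opts) throws IOException {
    //       super(conf, opts);
    //     }
    //     @Override
    //     protected FSDataInputStream openFile(FileSystem fs, Path file,
    //         int bufferSize, long length) throws IOException {
    //       // e.g. record metrics or wrap the stream before returning it
    //       return super.openFile(fs, file, bufferSize, length);
    //     }
    //   }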
1835    
1836    /**
1837     * Initialize the {@link Reader}.
1838     * @param tempReader <code>true</code> if we are constructing a temporary
1839     *                   reader (see {@link SequenceFile.Sorter#cloneFileAttributes}),
1840     *                   and hence do not initialize every component;
1841     *                   <code>false</code> otherwise.
1842     * @throws IOException
1843     */
1844    private void init(boolean tempReader) throws IOException {
1845      byte[] versionBlock = new byte[VERSION.length];
1846      in.readFully(versionBlock);
1847
1848      if ((versionBlock[0] != VERSION[0]) ||
1849          (versionBlock[1] != VERSION[1]) ||
1850          (versionBlock[2] != VERSION[2]))
1851        throw new IOException(this + " not a SequenceFile");
1852
1853      // Set 'version'
1854      version = versionBlock[3];
1855      if (version > VERSION[3])
1856        throw new VersionMismatchException(VERSION[3], version);
1857
1858      if (version < BLOCK_COMPRESS_VERSION) {
1859        UTF8 className = new UTF8();
1860
1861        className.readFields(in);
1862        keyClassName = className.toStringChecked(); // key class name
1863
1864        className.readFields(in);
1865        valClassName = className.toStringChecked(); // val class name
1866      } else {
1867        keyClassName = Text.readString(in);
1868        valClassName = Text.readString(in);
1869      }
1870
1871      if (version > 2) {                          // if version > 2
1872        this.decompress = in.readBoolean();       // is compressed?
1873      } else {
1874        decompress = false;
1875      }
1876
1877      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1878        this.blockCompressed = in.readBoolean();  // is block-compressed?
1879      } else {
1880        blockCompressed = false;
1881      }
1882      
1883      // if version >= 5
1884      // setup the compression codec
1885      if (decompress) {
1886        if (version >= CUSTOM_COMPRESS_VERSION) {
1887          String codecClassname = Text.readString(in);
1888          try {
1889            Class<? extends CompressionCodec> codecClass
1890              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1891            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1892          } catch (ClassNotFoundException cnfe) {
1893            throw new IllegalArgumentException("Unknown codec: " + 
1894                                               codecClassname, cnfe);
1895          }
1896        } else {
1897          codec = new DefaultCodec();
1898          ((Configurable)codec).setConf(conf);
1899        }
1900      }
1901      
1902      this.metadata = new Metadata();
1903      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1904        this.metadata.readFields(in);
1905      }
1906      
1907      if (version > 1) {                          // if version > 1
1908        in.readFully(sync);                       // read sync bytes
1909        headerEnd = in.getPos();                  // record end of header
1910      }
1911      
1912      // Initialize... but *not* if we are constructing a temporary Reader
1913      if (!tempReader) {
1914        valBuffer = new DataInputBuffer();
1915        if (decompress) {
1916          valDecompressor = CodecPool.getDecompressor(codec);
1917          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1918          valIn = new DataInputStream(valInFilter);
1919        } else {
1920          valIn = valBuffer;
1921        }
1922
1923        if (blockCompressed) {
1924          keyLenBuffer = new DataInputBuffer();
1925          keyBuffer = new DataInputBuffer();
1926          valLenBuffer = new DataInputBuffer();
1927
1928          keyLenDecompressor = CodecPool.getDecompressor(codec);
1929          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1930                                                   keyLenDecompressor);
1931          keyLenIn = new DataInputStream(keyLenInFilter);
1932
1933          keyDecompressor = CodecPool.getDecompressor(codec);
1934          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1935          keyIn = new DataInputStream(keyInFilter);
1936
1937          valLenDecompressor = CodecPool.getDecompressor(codec);
1938          valLenInFilter = codec.createInputStream(valLenBuffer, 
1939                                                   valLenDecompressor);
1940          valLenIn = new DataInputStream(valLenInFilter);
1941        }
1942        
1943        SerializationFactory serializationFactory =
1944          new SerializationFactory(conf);
1945        this.keyDeserializer =
1946          getDeserializer(serializationFactory, getKeyClass());
1947        if (this.keyDeserializer == null) {
1948          throw new IOException(
1949              "Could not find a deserializer for the Key class: '"
1950                  + getKeyClass().getCanonicalName() + "'. "
1951                  + "Please ensure that the configuration '" +
1952                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1953                  + "properly configured, if you're using "
1954                  + "custom serialization.");
1955        }
1956        if (!blockCompressed) {
1957          this.keyDeserializer.open(valBuffer);
1958        } else {
1959          this.keyDeserializer.open(keyIn);
1960        }
1961        this.valDeserializer =
1962          getDeserializer(serializationFactory, getValueClass());
1963        if (this.valDeserializer == null) {
1964          throw new IOException(
1965              "Could not find a deserializer for the Value class: '"
1966                  + getValueClass().getCanonicalName() + "'. "
1967                  + "Please ensure that the configuration '" +
1968                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1969                  + "properly configured, if you're using "
1970                  + "custom serialization.");
1971        }
1972        this.valDeserializer.open(valIn);
1973      }
1974    }
1975    
1976    @SuppressWarnings("unchecked")
1977    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1978      return sf.getDeserializer(c);
1979    }
1980    
1981    /** Close the file. */
1982    @Override
1983    public synchronized void close() throws IOException {
1984      // Return the decompressors to the pool
1985      CodecPool.returnDecompressor(keyLenDecompressor);
1986      CodecPool.returnDecompressor(keyDecompressor);
1987      CodecPool.returnDecompressor(valLenDecompressor);
1988      CodecPool.returnDecompressor(valDecompressor);
1989      keyLenDecompressor = keyDecompressor = null;
1990      valLenDecompressor = valDecompressor = null;
1991      
1992      if (keyDeserializer != null) {
1993        keyDeserializer.close();
1994      }
1995      if (valDeserializer != null) {
1996        valDeserializer.close();
1997      }
1998      
1999      // Close the input-stream
2000      in.close();
2001    }
2002
2003    /** Returns the name of the key class. */
2004    public String getKeyClassName() {
2005      return keyClassName;
2006    }
2007
2008    /** Returns the class of keys in this file. */
2009    public synchronized Class<?> getKeyClass() {
2010      if (null == keyClass) {
2011        try {
2012          keyClass = WritableName.getClass(getKeyClassName(), conf);
2013        } catch (IOException e) {
2014          throw new RuntimeException(e);
2015        }
2016      }
2017      return keyClass;
2018    }
2019
2020    /** Returns the name of the value class. */
2021    public String getValueClassName() {
2022      return valClassName;
2023    }
2024
2025    /** Returns the class of values in this file. */
2026    public synchronized Class<?> getValueClass() {
2027      if (null == valClass) {
2028        try {
2029          valClass = WritableName.getClass(getValueClassName(), conf);
2030        } catch (IOException e) {
2031          throw new RuntimeException(e);
2032        }
2033      }
2034      return valClass;
2035    }
2036
2037    /** Returns true if values are compressed. */
2038    public boolean isCompressed() { return decompress; }
2039    
2040    /** Returns true if records are block-compressed. */
2041    public boolean isBlockCompressed() { return blockCompressed; }
2042    
2043    /** Returns the compression codec of data in this file. */
2044    public CompressionCodec getCompressionCodec() { return codec; }
2045    
2046    /**
2047     * Get the compression type for this file.
2048     * @return the compression type
2049     */
2050    public CompressionType getCompressionType() {
2051      if (decompress) {
2052        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2053      } else {
2054        return CompressionType.NONE;
2055      }
2056    }
2057
2058    /** Returns the metadata object of the file */
2059    public Metadata getMetadata() {
2060      return this.metadata;
2061    }
2062    
2063    /** Returns the configuration used for this file. */
2064    Configuration getConf() { return conf; }
2065    
2066    /** Read a compressed buffer */
2067    private synchronized void readBuffer(DataInputBuffer buffer, 
2068                                         CompressionInputStream filter) throws IOException {
2069      // Read data into a temporary buffer
2070      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2071
2072      try {
2073        int dataBufferLength = WritableUtils.readVInt(in);
2074        dataBuffer.write(in, dataBufferLength);
2075      
2076        // Set up 'buffer' connected to the input-stream
2077        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2078      } finally {
2079        dataBuffer.close();
2080      }
2081
2082      // Reset the codec
2083      filter.resetState();
2084    }
2085    
2086    /** Read the next 'compressed' block */
2087    private synchronized void readBlock() throws IOException {
2088      // Check if we need to throw away a whole block of 
2089      // 'values' due to 'lazy decompression' 
2090      if (lazyDecompress && !valuesDecompressed) {
2091        in.seek(WritableUtils.readVInt(in)+in.getPos());
2092        in.seek(WritableUtils.readVInt(in)+in.getPos());
2093      }
2094      
2095      // Reset internal states
2096      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2097      valuesDecompressed = false;
2098
2099      //Process sync
2100      if (sync != null) {
2101        in.readInt();
2102        in.readFully(syncCheck);                // read syncCheck
2103        if (!Arrays.equals(sync, syncCheck))    // check it
2104          throw new IOException("File is corrupt!");
2105      }
2106      syncSeen = true;
2107
2108      // Read number of records in this block
2109      noBufferedRecords = WritableUtils.readVInt(in);
2110      
2111      // Read key lengths and keys
2112      readBuffer(keyLenBuffer, keyLenInFilter);
2113      readBuffer(keyBuffer, keyInFilter);
2114      noBufferedKeys = noBufferedRecords;
2115      
2116      // Read value lengths and values
2117      if (!lazyDecompress) {
2118        readBuffer(valLenBuffer, valLenInFilter);
2119        readBuffer(valBuffer, valInFilter);
2120        noBufferedValues = noBufferedRecords;
2121        valuesDecompressed = true;
2122      }
2123    }
2124
2125    /** 
2126     * Position valLenIn/valIn to the 'value' 
2127     * corresponding to the 'current' key 
2128     */
2129    private synchronized void seekToCurrentValue() throws IOException {
2130      if (!blockCompressed) {
2131        if (decompress) {
2132          valInFilter.resetState();
2133        }
2134        valBuffer.reset();
2135      } else {
2136        // Check if this is the first value in the 'block' to be read
2137        if (lazyDecompress && !valuesDecompressed) {
2138          // Read the value lengths and values
2139          readBuffer(valLenBuffer, valLenInFilter);
2140          readBuffer(valBuffer, valInFilter);
2141          noBufferedValues = noBufferedRecords;
2142          valuesDecompressed = true;
2143        }
2144        
2145        // Calculate the no. of bytes to skip
2146        // Note: 'current' key has already been read!
2147        int skipValBytes = 0;
2148        int currentKey = noBufferedKeys + 1;          
2149        for (int i=noBufferedValues; i > currentKey; --i) {
2150          skipValBytes += WritableUtils.readVInt(valLenIn);
2151          --noBufferedValues;
2152        }
2153        
2154        // Skip to the 'val' corresponding to 'current' key
2155        if (skipValBytes > 0) {
2156          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2157            throw new IOException("Failed to seek to " + currentKey + 
2158                                  "(th) value!");
2159          }
2160        }
2161      }
2162    }
2163
2164    /**
2165     * Get the 'value' corresponding to the last read 'key'.
2166     * @param val : The 'value' to be read.
2167     * @throws IOException
2168     */
2169    public synchronized void getCurrentValue(Writable val) 
2170      throws IOException {
2171      if (val instanceof Configurable) {
2172        ((Configurable) val).setConf(this.conf);
2173      }
2174
2175      // Position stream to 'current' value
2176      seekToCurrentValue();
2177
2178      if (!blockCompressed) {
2179        val.readFields(valIn);
2180        
2181        if (valIn.read() > 0) {
2182          LOG.info("available bytes: " + valIn.available());
2183          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2184                                + " bytes, should read " +
2185                                (valBuffer.getLength()-keyLength));
2186        }
2187      } else {
2188        // Get the value
2189        int valLength = WritableUtils.readVInt(valLenIn);
2190        val.readFields(valIn);
2191        
2192        // Read another compressed 'value'
2193        --noBufferedValues;
2194        
2195        // Sanity check
2196        if ((valLength < 0) && LOG.isDebugEnabled()) {
2197          LOG.debug(val + " has a negative length");
2198        }
2199      }
2200
2201    }
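    // A minimal sketch of the intended two-step read: next(key) fetches only
    // the key, and getCurrentValue() is invoked only when the value is
    // actually needed, so skipped values are never deserialized (and, for
    // block-compressed files, are decompressed lazily). isInteresting() is a
    // hypothetical filter:
    //
    //   Text key = new Text();
    //   Text val = new Text();
    //   while (reader.next(key)) {
    //     if (isInteresting(key)) {
    //       reader.getCurrentValue(val);
    //     }
    //   }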
2202    
2203    /**
2204     * Get the 'value' corresponding to the last read 'key'.
2205     * @param val : The 'value' to be read.
2206     * @throws IOException
2207     */
2208    public synchronized Object getCurrentValue(Object val) 
2209      throws IOException {
2210      if (val instanceof Configurable) {
2211        ((Configurable) val).setConf(this.conf);
2212      }
2213
2214      // Position stream to 'current' value
2215      seekToCurrentValue();
2216
2217      if (!blockCompressed) {
2218        val = deserializeValue(val);
2219        
2220        if (valIn.read() > 0) {
2221          LOG.info("available bytes: " + valIn.available());
2222          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2223                                + " bytes, should read " +
2224                                (valBuffer.getLength()-keyLength));
2225        }
2226      } else {
2227        // Get the value
2228        int valLength = WritableUtils.readVInt(valLenIn);
2229        val = deserializeValue(val);
2230        
2231        // Read another compressed 'value'
2232        --noBufferedValues;
2233        
2234        // Sanity check
2235        if ((valLength < 0) && LOG.isDebugEnabled()) {
2236          LOG.debug(val + " has a negative length");
2237        }
2238      }
2239      return val;
2240
2241    }
2242
2243    @SuppressWarnings("unchecked")
2244    private Object deserializeValue(Object val) throws IOException {
2245      return valDeserializer.deserialize(val);
2246    }
2247    
2248    /** Read the next key in the file into <code>key</code>, skipping its
2249     * value.  Returns true if another entry exists, and false at end of file. */
2250    public synchronized boolean next(Writable key) throws IOException {
2251      if (key.getClass() != getKeyClass())
2252        throw new IOException("wrong key class: "+key.getClass().getName()
2253                              +" is not "+keyClass);
2254
2255      if (!blockCompressed) {
2256        outBuf.reset();
2257        
2258        keyLength = next(outBuf);
2259        if (keyLength < 0)
2260          return false;
2261        
2262        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2263        
2264        key.readFields(valBuffer);
2265        valBuffer.mark(0);
2266        if (valBuffer.getPosition() != keyLength)
2267          throw new IOException(key + " read " + valBuffer.getPosition()
2268                                + " bytes, should read " + keyLength);
2269      } else {
2270        //Reset syncSeen
2271        syncSeen = false;
2272        
2273        if (noBufferedKeys == 0) {
2274          try {
2275            readBlock();
2276          } catch (EOFException eof) {
2277            return false;
2278          }
2279        }
2280        
2281        int keyLength = WritableUtils.readVInt(keyLenIn);
2282        
2283        // Sanity check
2284        if (keyLength < 0) {
2285          return false;
2286        }
2287        
2288        //Read another compressed 'key'
2289        key.readFields(keyIn);
2290        --noBufferedKeys;
2291      }
2292
2293      return true;
2294    }
2295
2296    /** Read the next key/value pair in the file into <code>key</code> and
2297     * <code>val</code>.  Returns true if such a pair exists and false when at
2298     * end of file. */
2299    public synchronized boolean next(Writable key, Writable val)
2300      throws IOException {
2301      if (val.getClass() != getValueClass())
2302        throw new IOException("wrong value class: "+val.getClass().getName()+" is not "+valClass);
2303
2304      boolean more = next(key);
2305      
2306      if (more) {
2307        getCurrentValue(val);
2308      }
2309
2310      return more;
2311    }
2312    
2313    /**
2314     * Read and return the next record length, potentially skipping over 
2315     * a sync block.
2316     * @return the length of the next record or -1 if there is no next record
2317     * @throws IOException
2318     */
2319    private synchronized int readRecordLength() throws IOException {
2320      if (in.getPos() >= end) {
2321        return -1;
2322      }      
2323      int length = in.readInt();
2324      if (version > 1 && sync != null &&
2325          length == SYNC_ESCAPE) {              // process a sync entry
2326        in.readFully(syncCheck);                // read syncCheck
2327        if (!Arrays.equals(sync, syncCheck))    // check it
2328          throw new IOException("File is corrupt!");
2329        syncSeen = true;
2330        if (in.getPos() >= end) {
2331          return -1;
2332        }
2333        length = in.readInt();                  // re-read length
2334      } else {
2335        syncSeen = false;
2336      }
2337      
2338      return length;
2339    }
2340    
2341    /** Read the next key/value pair in the file into <code>buffer</code>.
2342     * Returns the length of the key read, or -1 if at end of file.  The length
2343     * of the value may be computed by calling buffer.getLength() before and
2344     * after calls to this method.
2345     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2346    @Deprecated
2347    synchronized int next(DataOutputBuffer buffer) throws IOException {
2348      // Unsupported for block-compressed sequence files
2349      if (blockCompressed) {
2350        throw new IOException("Unsupported call for block-compressed" +
2351                              " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
2352      }
2353      try {
2354        int length = readRecordLength();
2355        if (length == -1) {
2356          return -1;
2357        }
2358        int keyLength = in.readInt();
2359        buffer.write(in, length);
2360        return keyLength;
2361      } catch (ChecksumException e) {             // checksum failure
2362        handleChecksumException(e);
2363        return next(buffer);
2364      }
2365    }
2366
2367    public ValueBytes createValueBytes() {
2368      ValueBytes val = null;
2369      if (!decompress || blockCompressed) {
2370        val = new UncompressedBytes();
2371      } else {
2372        val = new CompressedBytes(codec);
2373      }
2374      return val;
2375    }
2376
2377    /**
2378     * Read 'raw' records.
2379     * @param key - The buffer into which the key is read
2380     * @param val - The 'raw' value
2381     * @return Returns the total record length or -1 for end of file
2382     * @throws IOException
2383     */
2384    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2385      throws IOException {
2386      if (!blockCompressed) {
2387        int length = readRecordLength();
2388        if (length == -1) {
2389          return -1;
2390        }
2391        int keyLength = in.readInt();
2392        int valLength = length - keyLength;
2393        key.write(in, keyLength);
2394        if (decompress) {
2395          CompressedBytes value = (CompressedBytes)val;
2396          value.reset(in, valLength);
2397        } else {
2398          UncompressedBytes value = (UncompressedBytes)val;
2399          value.reset(in, valLength);
2400        }
2401        
2402        return length;
2403      } else {
2404        //Reset syncSeen
2405        syncSeen = false;
2406        
2407        // Read 'key'
2408        if (noBufferedKeys == 0) {
2409          if (in.getPos() >= end) 
2410            return -1;
2411
2412          try { 
2413            readBlock();
2414          } catch (EOFException eof) {
2415            return -1;
2416          }
2417        }
2418        int keyLength = WritableUtils.readVInt(keyLenIn);
2419        if (keyLength < 0) {
2420          throw new IOException("negative length key found!");
2421        }
2422        key.write(keyIn, keyLength);
2423        --noBufferedKeys;
2424        
2425        // Read raw 'value'
2426        seekToCurrentValue();
2427        int valLength = WritableUtils.readVInt(valLenIn);
2428        UncompressedBytes rawValue = (UncompressedBytes)val;
2429        rawValue.reset(valIn, valLength);
2430        --noBufferedValues;
2431        
2432        return (keyLength+valLength);
2433      }
2434      
2435    }
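    // A minimal sketch of a raw copy loop built on nextRaw(); keys and values
    // are moved as bytes without deserialization. It assumes 'writer' was
    // created with the same compression type as this reader:
    //
    //   DataOutputBuffer rawKey = new DataOutputBuffer();
    //   ValueBytes rawVal = reader.createValueBytes();
    //   while (reader.nextRaw(rawKey, rawVal) != -1) {
    //     writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawVal);
    //     rawKey.reset();
    //   }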
2436
2437    /**
2438     * Read 'raw' keys.
2439     * @param key - The buffer into which the key is read
2440     * @return Returns the key length or -1 for end of file
2441     * @throws IOException
2442     */
2443    public synchronized int nextRawKey(DataOutputBuffer key) 
2444      throws IOException {
2445      if (!blockCompressed) {
2446        recordLength = readRecordLength();
2447        if (recordLength == -1) {
2448          return -1;
2449        }
2450        keyLength = in.readInt();
2451        key.write(in, keyLength);
2452        return keyLength;
2453      } else {
2454        //Reset syncSeen
2455        syncSeen = false;
2456        
2457        // Read 'key'
2458        if (noBufferedKeys == 0) {
2459          if (in.getPos() >= end) 
2460            return -1;
2461
2462          try { 
2463            readBlock();
2464          } catch (EOFException eof) {
2465            return -1;
2466          }
2467        }
2468        int keyLength = WritableUtils.readVInt(keyLenIn);
2469        if (keyLength < 0) {
2470          throw new IOException("negative length key found!");
2471        }
2472        key.write(keyIn, keyLength);
2473        --noBufferedKeys;
2474        
2475        return keyLength;
2476      }
2477      
2478    }
2479
2480    /** Read the next key in the file, skipping its
2481     * value.  Returns null at end of file. */
2482    public synchronized Object next(Object key) throws IOException {
2483      if (key != null && key.getClass() != getKeyClass()) {
2484        throw new IOException("wrong key class: "+key.getClass().getName()
2485                              +" is not "+keyClass);
2486      }
2487
2488      if (!blockCompressed) {
2489        outBuf.reset();
2490        
2491        keyLength = next(outBuf);
2492        if (keyLength < 0)
2493          return null;
2494        
2495        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2496        
2497        key = deserializeKey(key);
2498        valBuffer.mark(0);
2499        if (valBuffer.getPosition() != keyLength)
2500          throw new IOException(key + " read " + valBuffer.getPosition()
2501                                + " bytes, should read " + keyLength);
2502      } else {
2503        //Reset syncSeen
2504        syncSeen = false;
2505        
2506        if (noBufferedKeys == 0) {
2507          try {
2508            readBlock();
2509          } catch (EOFException eof) {
2510            return null;
2511          }
2512        }
2513        
2514        int keyLength = WritableUtils.readVInt(keyLenIn);
2515        
2516        // Sanity check
2517        if (keyLength < 0) {
2518          return null;
2519        }
2520        
2521        //Read another compressed 'key'
2522        key = deserializeKey(key);
2523        --noBufferedKeys;
2524      }
2525
2526      return key;
2527    }
2528
2529    @SuppressWarnings("unchecked")
2530    private Object deserializeKey(Object key) throws IOException {
2531      return keyDeserializer.deserialize(key);
2532    }
2533
2534    /**
2535     * Read 'raw' values.
2536     * @param val - The 'raw' value
2537     * @return Returns the value length
2538     * @throws IOException
2539     */
2540    public synchronized int nextRawValue(ValueBytes val) 
2541      throws IOException {
2542      
2543      // Position stream to current value
2544      seekToCurrentValue();
2545 
2546      if (!blockCompressed) {
2547        int valLength = recordLength - keyLength;
2548        if (decompress) {
2549          CompressedBytes value = (CompressedBytes)val;
2550          value.reset(in, valLength);
2551        } else {
2552          UncompressedBytes value = (UncompressedBytes)val;
2553          value.reset(in, valLength);
2554        }
2555         
2556        return valLength;
2557      } else {
2558        int valLength = WritableUtils.readVInt(valLenIn);
2559        UncompressedBytes rawValue = (UncompressedBytes)val;
2560        rawValue.reset(valIn, valLength);
2561        --noBufferedValues;
2562        return valLength;
2563      }
2564      
2565    }
2566
2567    private void handleChecksumException(ChecksumException e)
2568      throws IOException {
2569      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2570        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2571        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2572      } else {
2573        throw e;
2574      }
2575    }
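    // A sketch of opting into the skip-on-checksum-failure path above; by
    // default the ChecksumException simply propagates:
    //
    //   conf.setBoolean("io.skip.checksum.errors", true);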
2576
2577    /** Disables sync checking; often invoked for temporary files. */
2578    synchronized void ignoreSync() {
2579      sync = null;
2580    }
2581    
2582    /** Set the current byte position in the input file.
2583     *
2584     * <p>The position passed must be a position returned by {@link
2585     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2586     * position, use {@link SequenceFile.Reader#sync(long)}.
2587     */
2588    public synchronized void seek(long position) throws IOException {
2589      in.seek(position);
2590      if (blockCompressed) {                      // trigger block read
2591        noBufferedKeys = 0;
2592        valuesDecompressed = true;
2593      }
2594    }
2595
2596    /** Seek to the next sync mark past a given position.*/
2597    public synchronized void sync(long position) throws IOException {
2598      if (position+SYNC_SIZE >= end) {
2599        seek(end);
2600        return;
2601      }
2602
2603      if (position < headerEnd) {
2604        // seek directly to first record
2605        in.seek(headerEnd);
2606        // note the sync marker "seen" in the header
2607        syncSeen = true;
2608        return;
2609      }
2610
2611      try {
2612        seek(position+4);                         // skip escape
2613        in.readFully(syncCheck);
2614        int syncLen = sync.length;
2615        for (int i = 0; in.getPos() < end; i++) {
2616          int j = 0;
2617          for (; j < syncLen; j++) {
2618            if (sync[j] != syncCheck[(i+j)%syncLen])
2619              break;
2620          }
2621          if (j == syncLen) {
2622            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2623            return;
2624          }
2625          syncCheck[i%syncLen] = in.readByte();
2626        }
2627      } catch (ChecksumException e) {             // checksum failure
2628        handleChecksumException(e);
2629      }
2630    }
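    // A minimal sketch of split reading built on sync(); splitStart and
    // splitEnd are hypothetical byte offsets. sync() leaves the reader
    // positioned just before the next sync mark, i.e. at a record boundary:
    //
    //   reader.sync(splitStart);
    //   Text key = new Text();
    //   Text val = new Text();
    //   while (reader.getPosition() < splitEnd && reader.next(key, val)) {
    //     // process the record; syncSeen() reports sync marks passed by next()
    //   }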
2631
2632    /** Returns true iff the previous call to next passed a sync mark.*/
2633    public synchronized boolean syncSeen() { return syncSeen; }
2634
2635    /** Return the current byte position in the input file. */
2636    public synchronized long getPosition() throws IOException {
2637      return in.getPos();
2638    }
2639
2640    /** Returns the name of the file. */
2641    @Override
2642    public String toString() {
2643      return filename;
2644    }
2645
2646  }
2647
2648  /** Sorts key/value pairs in a sequence-format file.
2649   *
2650   * <p>For best performance, applications should make sure that the {@link
2651   * Writable#readFields(DataInput)} implementation of their keys is
2652   * very efficient.  In particular, it should avoid allocating memory.
2653   */
2654  public static class Sorter {
2655
2656    private RawComparator comparator;
2657
2658    private MergeSort mergeSort; //the implementation of merge sort
2659    
2660    private Path[] inFiles;                     // when merging or sorting
2661
2662    private Path outFile;
2663
2664    private int memory; // bytes
2665    private int factor; // merged per pass
2666
2667    private FileSystem fs = null;
2668
2669    private Class keyClass;
2670    private Class valClass;
2671
2672    private Configuration conf;
2673    private Metadata metadata;
2674    
2675    private Progressable progressable = null;
2676
2677    /** Sort and merge files containing the named classes. */
2678    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2679                  Class valClass, Configuration conf)  {
2680      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2681    }
2682
2683    /** Sort and merge using an arbitrary {@link RawComparator}. */
2684    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2685                  Class valClass, Configuration conf) {
2686      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2687    }
2688
2689    /** Sort and merge using an arbitrary {@link RawComparator}. */
2690    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2691                  Class valClass, Configuration conf, Metadata metadata) {
2692      this.fs = fs;
2693      this.comparator = comparator;
2694      this.keyClass = keyClass;
2695      this.valClass = valClass;
2696      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2697      this.factor = conf.getInt("io.sort.factor", 100);
2698      this.conf = conf;
2699      this.metadata = metadata;
2700    }
2701
2702    /** Set the number of streams to merge at once.*/
2703    public void setFactor(int factor) { this.factor = factor; }
2704
2705    /** Get the number of streams to merge at once.*/
2706    public int getFactor() { return factor; }
2707
2708    /** Set the total amount of buffer memory, in bytes.*/
2709    public void setMemory(int memory) { this.memory = memory; }
2710
2711    /** Get the total amount of buffer memory, in bytes.*/
2712    public int getMemory() { return memory; }
2713
2714    /** Set the progressable object in order to report progress. */
2715    public void setProgressable(Progressable progressable) {
2716      this.progressable = progressable;
2717    }
2718    
2719    /** 
2720     * Perform a file sort from a set of input files into an output file.
2721     * @param inFiles the files to be sorted
2722     * @param outFile the sorted output file
2723     * @param deleteInput should the input files be deleted as they are read?
2724     */
2725    public void sort(Path[] inFiles, Path outFile,
2726                     boolean deleteInput) throws IOException {
2727      if (fs.exists(outFile)) {
2728        throw new IOException("already exists: " + outFile);
2729      }
2730
2731      this.inFiles = inFiles;
2732      this.outFile = outFile;
2733
2734      int segments = sortPass(deleteInput);
2735      if (segments > 1) {
2736        mergePass(outFile.getParent());
2737      }
2738    }
2739
2740    /** 
2741     * Perform a file sort from a set of input files and return an iterator.
2742     * @param inFiles the files to be sorted
2743     * @param tempDir the directory where temp files are created during sort
2744     * @param deleteInput should the input files be deleted as they are read?
2745     * @return the RawKeyValueIterator over the sorted data
2746     */
2747    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2748                                              boolean deleteInput) throws IOException {
2749      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2750      if (fs.exists(outFile)) {
2751        throw new IOException("already exists: " + outFile);
2752      }
2753      this.inFiles = inFiles;
2754      // outFile is used as a prefix for temp files in the cases where the
2755      // sort outputs multiple sorted segments. For the single-segment
2756      // case, outFile itself will contain the sorted data for that
2757      // segment.
2758      this.outFile = outFile;
2759
2760      int segments = sortPass(deleteInput);
2761      if (segments > 1)
2762        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2763                     tempDir);
2764      else if (segments == 1)
2765        return merge(new Path[]{outFile}, true, tempDir);
2766      else return null;
2767    }
2768
2769    /**
2770     * The backwards compatible interface to sort.
2771     * @param inFile the input file to sort
2772     * @param outFile the sorted output file
2773     */
2774    public void sort(Path inFile, Path outFile) throws IOException {
2775      sort(new Path[]{inFile}, outFile, false);
2776    }
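    // A minimal usage sketch; the paths are hypothetical and the key class
    // must be a WritableComparable:
    //
    //   SequenceFile.Sorter sorter =
    //       new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
    //   sorter.sort(new Path("/tmp/in.seq"), new Path("/tmp/sorted.seq"));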
2777    
2778    private int sortPass(boolean deleteInput) throws IOException {
2779      if(LOG.isDebugEnabled()) {
2780        LOG.debug("running sort pass");
2781      }
2782      SortPass sortPass = new SortPass();         // make the SortPass
2783      sortPass.setProgressable(progressable);
2784      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2785      try {
2786        return sortPass.run(deleteInput);         // run it
2787      } finally {
2788        sortPass.close();                         // close it
2789      }
2790    }
2791
2792    private class SortPass {
2793      private int memoryLimit = memory/4;
2794      private int recordLimit = 1000000;
2795      
2796      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2797      private byte[] rawBuffer;
2798
2799      private int[] keyOffsets = new int[1024];
2800      private int[] pointers = new int[keyOffsets.length];
2801      private int[] pointersCopy = new int[keyOffsets.length];
2802      private int[] keyLengths = new int[keyOffsets.length];
2803      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2804      
2805      private ArrayList segmentLengths = new ArrayList();
2806      
2807      private Reader in = null;
2808      private FSDataOutputStream out = null;
2809      private FSDataOutputStream indexOut = null;
2810      private Path outName;
2811
2812      private Progressable progressable = null;
2813
2814      public int run(boolean deleteInput) throws IOException {
2815        int segments = 0;
2816        int currentFile = 0;
2817        boolean atEof = (currentFile >= inFiles.length);
2818        CompressionType compressionType;
2819        CompressionCodec codec = null;
2820        segmentLengths.clear();
2821        if (atEof) {
2822          return 0;
2823        }
2824        
2825        // Initialize
2826        in = new Reader(fs, inFiles[currentFile], conf);
2827        compressionType = in.getCompressionType();
2828        codec = in.getCompressionCodec();
2829        
2830        for (int i=0; i < rawValues.length; ++i) {
2831          rawValues[i] = null;
2832        }
2833        
2834        while (!atEof) {
2835          int count = 0;
2836          int bytesProcessed = 0;
2837          rawKeys.reset();
2838          while (!atEof && 
2839                 bytesProcessed < memoryLimit && count < recordLimit) {
2840
2841            // Read a record into buffer
2842            // Note: Attempt to re-use 'rawValue' as far as possible
2843            int keyOffset = rawKeys.getLength();       
2844            ValueBytes rawValue = 
2845              (count == keyOffsets.length || rawValues[count] == null) ? 
2846              in.createValueBytes() : 
2847              rawValues[count];
2848            int recordLength = in.nextRaw(rawKeys, rawValue);
2849            if (recordLength == -1) {
2850              in.close();
2851              if (deleteInput) {
2852                fs.delete(inFiles[currentFile], true);
2853              }
2854              currentFile += 1;
2855              atEof = currentFile >= inFiles.length;
2856              if (!atEof) {
2857                in = new Reader(fs, inFiles[currentFile], conf);
2858              } else {
2859                in = null;
2860              }
2861              continue;
2862            }
2863
2864            int keyLength = rawKeys.getLength() - keyOffset;
2865
2866            if (count == keyOffsets.length)
2867              grow();
2868
2869            keyOffsets[count] = keyOffset;                // update pointers
2870            pointers[count] = count;
2871            keyLengths[count] = keyLength;
2872            rawValues[count] = rawValue;
2873
2874            bytesProcessed += recordLength; 
2875            count++;
2876          }
2877
2878          // buffer is full -- sort & flush it
2879          if(LOG.isDebugEnabled()) {
2880            LOG.debug("flushing segment " + segments);
2881          }
2882          rawBuffer = rawKeys.getData();
2883          sort(count);
2884          // indicate we're making progress
2885          if (progressable != null) {
2886            progressable.progress();
2887          }
2888          flush(count, bytesProcessed, compressionType, codec, 
2889                segments==0 && atEof);
2890          segments++;
2891        }
2892        return segments;
2893      }
2894
2895      public void close() throws IOException {
2896        if (in != null) {
2897          in.close();
2898        }
2899        if (out != null) {
2900          out.close();
2901        }
2902        if (indexOut != null) {
2903          indexOut.close();
2904        }
2905      }
2906
2907      private void grow() {
2908        int newLength = keyOffsets.length * 3 / 2;
2909        keyOffsets = grow(keyOffsets, newLength);
2910        pointers = grow(pointers, newLength);
2911        pointersCopy = new int[newLength];
2912        keyLengths = grow(keyLengths, newLength);
2913        rawValues = grow(rawValues, newLength);
2914      }
2915
2916      private int[] grow(int[] old, int newLength) {
2917        int[] result = new int[newLength];
2918        System.arraycopy(old, 0, result, 0, old.length);
2919        return result;
2920      }
2921      
2922      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2923        ValueBytes[] result = new ValueBytes[newLength];
2924        System.arraycopy(old, 0, result, 0, old.length);
2925        for (int i=old.length; i < newLength; ++i) {
2926          result[i] = null;
2927        }
2928        return result;
2929      }
2930
2931      private void flush(int count, int bytesProcessed, 
2932                         CompressionType compressionType, 
2933                         CompressionCodec codec, 
2934                         boolean done) throws IOException {
2935        if (out == null) {
2936          outName = done ? outFile : outFile.suffix(".0");
2937          out = fs.create(outName);
2938          if (!done) {
2939            indexOut = fs.create(outName.suffix(".index"));
2940          }
2941        }
2942
2943        long segmentStart = out.getPos();
2944        Writer writer = createWriter(conf, Writer.stream(out), 
2945            Writer.keyClass(keyClass), Writer.valueClass(valClass),
2946            Writer.compression(compressionType, codec),
2947            Writer.metadata(done ? metadata : new Metadata()));
2948        
2949        if (!done) {
2950          writer.sync = null;                     // disable sync on temp files
2951        }
2952
2953        for (int i = 0; i < count; i++) {         // write in sorted order
2954          int p = pointers[i];
2955          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2956        }
2957        writer.close();
2958        
2959        if (!done) {
2960          // Save the segment length
2961          WritableUtils.writeVLong(indexOut, segmentStart);
2962          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2963          indexOut.flush();
2964        }
2965      }
2966
2967      private void sort(int count) {
2968        System.arraycopy(pointers, 0, pointersCopy, 0, count);
2969        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2970      }
2971      class SeqFileComparator implements Comparator<IntWritable> {
2972        @Override
2973        public int compare(IntWritable I, IntWritable J) {
2974          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
2975                                    keyLengths[I.get()], rawBuffer, 
2976                                    keyOffsets[J.get()], keyLengths[J.get()]);
2977        }
2978      }
2979      
2980      /** Set the progressable object in order to report progress. */
2981      public void setProgressable(Progressable progressable) {
2982        this.progressable = progressable;
2983      }
2984
2985      
2986    } // SequenceFile.Sorter.SortPass
2987
2988    /** The interface to iterate over raw keys/values of SequenceFiles. */
2989    public static interface RawKeyValueIterator {
      /** Gets the current raw key.
       * @return the current raw key as a DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value.
       * @return the current raw value as ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue).
       * @return true if there exists a key/value pair, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator so that the underlying streams can be closed.
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; this has a float (0.0 - 1.0) 
       * indicating the fraction of bytes processed by the iterator so far.
       */
      Progress getProgress();
3013    }    
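
    /* A minimal consumption sketch for a RawKeyValueIterator; the iterator
     * here would typically come from one of the merge() overloads below, and
     * the variable names are illustrative only:
     *
     *   RawKeyValueIterator iter = ...;           // e.g. merge(segments, tmpDir)
     *   try {
     *     while (iter.next()) {
     *       DataOutputBuffer key = iter.getKey();  // raw key bytes
     *       ValueBytes value = iter.getValue();    // raw (possibly compressed) value
     *       // consume key.getData()[0 .. key.getLength()) and value here;
     *       // both are only valid until the next call to next()
     *     }
     *   } finally {
     *     iter.close();                            // closes the underlying streams
     *   }
     */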
3014    
3015    /**
3016     * Merges the list of segments of type <code>SegmentDescriptor</code>
3017     * @param segments the list of SegmentDescriptors
3018     * @param tmpDir the directory to write temporary files into
3019     * @return RawKeyValueIterator
3020     * @throws IOException
3021     */
3022    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3023                                     Path tmpDir) 
3024      throws IOException {
3025      // pass in object to report progress, if present
3026      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3027      return mQueue.merge();
3028    }
3029
    /**
     * Merges the contents of files passed in Path[] using the merge
     * factor that is already set
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
3040    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3041                                     Path tmpDir) 
3042      throws IOException {
3043      return merge(inNames, deleteInputs, 
3044                   (inNames.length < factor) ? inNames.length : factor,
3045                   tmpDir);
3046    }
3047
    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
3058    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3059                                     int factor, Path tmpDir) 
3060      throws IOException {
3061      //get the segments from inNames
3062      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3063      for (int i = 0; i < inNames.length; i++) {
3064        SegmentDescriptor s = new SegmentDescriptor(0,
3065            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3066        s.preserveInput(!deleteInputs);
3067        s.doSync();
3068        a.add(s);
3069      }
3070      this.factor = factor;
3071      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3072      return mQueue.merge();
3073    }
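
    /* An illustrative call of the overload above, for a Sorter instance
     * named sorter; the paths and the factor of 10 are examples only:
     *
     *   Path[] runs = { new Path("run-0"), new Path("run-1"), new Path("run-2") };
     *   RawKeyValueIterator it =
     *       sorter.merge(runs, false, 10, new Path("/tmp/seq-merge"));
     *   // deleteInputs == false, so the sorted input runs are preserved
     */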
3074
    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @return RawKeyValueIterator
     * @throws IOException
     */
3084    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3085                                     boolean deleteInputs) 
3086      throws IOException {
      //outFile is used as the prefix for the temp files produced by
      //intermediate merge passes
3089      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3090      //get the segments from inNames
3091      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3092      for (int i = 0; i < inNames.length; i++) {
3093        SegmentDescriptor s = new SegmentDescriptor(0,
3094            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3095        s.preserveInput(!deleteInputs);
3096        s.doSync();
3097        a.add(s);
3098      }
3099      factor = (inNames.length < factor) ? inNames.length : factor;
3100      // pass in object to report progress, if present
3101      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3102      return mQueue.merge();
3103    }
3104
    /**
     * Clones the attributes (like compression) of the input file and creates 
     * a corresponding Writer
     * @param inputFile the path of the input file whose attributes should be 
     * cloned
     * @param outputFile the path of the output file 
     * @param prog the Progressable to report status during the file write
     * @return Writer
     * @throws IOException
     */
3115    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3116                                      Progressable prog) throws IOException {
3117      Reader reader = new Reader(conf,
3118                                 Reader.file(inputFile),
3119                                 new Reader.OnlyHeaderOption());
3120      CompressionType compress = reader.getCompressionType();
3121      CompressionCodec codec = reader.getCompressionCodec();
3122      reader.close();
3123
3124      Writer writer = createWriter(conf, 
3125                                   Writer.file(outputFile), 
3126                                   Writer.keyClass(keyClass), 
3127                                   Writer.valueClass(valClass), 
3128                                   Writer.compression(compress, codec), 
3129                                   Writer.progressable(prog));
3130      return writer;
3131    }
3132
3133    /**
3134     * Writes records from RawKeyValueIterator into a file represented by the 
3135     * passed writer
3136     * @param records the RawKeyValueIterator
3137     * @param writer the Writer created earlier 
3138     * @throws IOException
3139     */
3140    public void writeFile(RawKeyValueIterator records, Writer writer) 
3141      throws IOException {
3142      while(records.next()) {
3143        writer.appendRaw(records.getKey().getData(), 0, 
3144                         records.getKey().getLength(), records.getValue());
3145      }
3146      writer.sync();
3147    }
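
    // Note that writeFile() leaves the Writer open: it appends the records
    // and writes a final sync marker, but callers (such as merge() below)
    // are responsible for calling writer.close().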
3148        
3149    /** Merge the provided files.
3150     * @param inFiles the array of input path names
3151     * @param outFile the final output file
3152     * @throws IOException
3153     */
3154    public void merge(Path[] inFiles, Path outFile) throws IOException {
3155      if (fs.exists(outFile)) {
3156        throw new IOException("already exists: " + outFile);
3157      }
3158      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3159      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3160      
3161      writeFile(r, writer);
3162
3163      writer.close();
3164    }
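
    /* Typical one-shot use of the overload above (file names illustrative):
     *
     *   Path[] parts = { new Path("part-00000"), new Path("part-00001") };
     *   sorter.merge(parts, new Path("merged-output"));
     *
     * The output inherits the compression settings of parts[0] via
     * cloneFileAttributes(), and the inputs are preserved because
     * deleteInputs is passed as false internally.
     */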
3165
3166    /** sort calls this to generate the final merged output */
3167    private int mergePass(Path tmpDir) throws IOException {
3168      if(LOG.isDebugEnabled()) {
3169        LOG.debug("running merge pass");
3170      }
3171      Writer writer = cloneFileAttributes(
3172                                          outFile.suffix(".0"), outFile, null);
3173      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3174                                    outFile.suffix(".0.index"), tmpDir);
3175      writeFile(r, writer);
3176
3177      writer.close();
3178      return 0;
3179    }
3180
3181    /** Used by mergePass to merge the output of the sort
3182     * @param inName the name of the input file containing sorted segments
3183     * @param indexIn the offsets of the sorted segments
3184     * @param tmpDir the relative directory to store intermediate results in
3185     * @return RawKeyValueIterator
3186     * @throws IOException
3187     */
3188    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3189      throws IOException {
      //get the segments from indexIn
      //a SegmentContainer tracks the segments belonging to inName so that
      //inName can be deleted as soon as all of its contained segments have
      //been consumed during the merge and are no longer needed
3195      SegmentContainer container = new SegmentContainer(inName, indexIn);
3196      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3197      return mQueue.merge();
3198    }
3199    
3200    /** This class implements the core of the merge logic */
3201    private class MergeQueue extends PriorityQueue 
3202      implements RawKeyValueIterator {
3203      private boolean compress;
3204      private boolean blockCompress;
3205      private DataOutputBuffer rawKey = new DataOutputBuffer();
3206      private ValueBytes rawValue;
3207      private long totalBytesProcessed;
3208      private float progPerByte;
3209      private Progress mergeProgress = new Progress();
3210      private Path tmpDir;
3211      private Progressable progress = null; //handle to the progress reporting object
3212      private SegmentDescriptor minSegment;
3213      
      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name are used to break ties between segments of the same size)
3216      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3217        new TreeMap<SegmentDescriptor, Void>();
3218            
3219      @SuppressWarnings("unchecked")
3220      public void put(SegmentDescriptor stream) throws IOException {
3221        if (size() == 0) {
3222          compress = stream.in.isCompressed();
3223          blockCompress = stream.in.isBlockCompressed();
3224        } else if (compress != stream.in.isCompressed() || 
3225                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException(
              "All merged files must use the same compression settings.");
3227        } 
3228        super.put(stream);
3229      }
3230      
      /**
       * Constructs a queue over the given file segments
       * @param segments the file segments to merge
       * @param tmpDir a relative local directory to save intermediate files in
       * @param progress the reference to the Progressable object
       */
3237      public MergeQueue(List <SegmentDescriptor> segments,
3238          Path tmpDir, Progressable progress) {
3239        int size = segments.size();
3240        for (int i = 0; i < size; i++) {
3241          sortedSegmentSizes.put(segments.get(i), null);
3242        }
3243        this.tmpDir = tmpDir;
3244        this.progress = progress;
3245      }
3246      @Override
3247      protected boolean lessThan(Object a, Object b) {
3248        // indicate we're making progress
3249        if (progress != null) {
3250          progress.progress();
3251        }
3252        SegmentDescriptor msa = (SegmentDescriptor)a;
3253        SegmentDescriptor msb = (SegmentDescriptor)b;
3254        return comparator.compare(msa.getKey().getData(), 0, 
3255                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3256                                  msb.getKey().getLength()) < 0;
3257      }
3258      @Override
3259      public void close() throws IOException {
3260        SegmentDescriptor ms;                           // close inputs
3261        while ((ms = (SegmentDescriptor)pop()) != null) {
3262          ms.cleanup();
3263        }
3264        minSegment = null;
3265      }
3266      @Override
3267      public DataOutputBuffer getKey() throws IOException {
3268        return rawKey;
3269      }
3270      @Override
3271      public ValueBytes getValue() throws IOException {
3272        return rawValue;
3273      }
3274      @Override
3275      public boolean next() throws IOException {
3276        if (size() == 0)
3277          return false;
3278        if (minSegment != null) {
3279          //minSegment is non-null for all invocations of next except the first
3280          //one. For the first invocation, the priority queue is ready for use
3281          //but for the subsequent invocations, first adjust the queue 
3282          adjustPriorityQueue(minSegment);
3283          if (size() == 0) {
3284            minSegment = null;
3285            return false;
3286          }
3287        }
3288        minSegment = (SegmentDescriptor)top();
3289        long startPos = minSegment.in.getPosition(); // Current position in stream
3290        //save the raw key reference
3291        rawKey = minSegment.getKey();
3292        //load the raw value. Re-use the existing rawValue buffer
3293        if (rawValue == null) {
3294          rawValue = minSegment.in.createValueBytes();
3295        }
3296        minSegment.nextRawValue(rawValue);
3297        long endPos = minSegment.in.getPosition(); // End position after reading value
3298        updateProgress(endPos - startPos);
3299        return true;
3300      }
3301      
3302      @Override
3303      public Progress getProgress() {
3304        return mergeProgress; 
3305      }
3306
3307      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3308        long startPos = ms.in.getPosition(); // Current position in stream
3309        boolean hasNext = ms.nextRawKey();
3310        long endPos = ms.in.getPosition(); // End position after reading key
3311        updateProgress(endPos - startPos);
3312        if (hasNext) {
3313          adjustTop();
3314        } else {
3315          pop();
3316          ms.cleanup();
3317        }
3318      }
3319
3320      private void updateProgress(long bytesProcessed) {
3321        totalBytesProcessed += bytesProcessed;
3322        if (progPerByte > 0) {
3323          mergeProgress.set(totalBytesProcessed * progPerByte);
3324        }
3325      }
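
      // Illustrative arithmetic: progPerByte is set to 1/totalBytes before
      // the final pass, so if the remaining segments total 1,000,000 bytes,
      // the reported merge progress after 250,000 bytes is 0.25.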
3326      
      /** Runs single-level merge passes until the number of remaining
       * segments is at most the merge factor, then returns this queue as
       * the iterator over the final merge
       * @return RawKeyValueIterator
       * @throws IOException
       */
3332      public RawKeyValueIterator merge() throws IOException {
3333        //create the MergeStreams from the sorted map created in the constructor
3334        //and dump the final output to a file
3335        int numSegments = sortedSegmentSizes.size();
3336        int origFactor = factor;
3337        int passNo = 1;
3338        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3339        do {
3340          //get the factor for this pass of merge
3341          factor = getPassFactor(passNo, numSegments);
3342          List<SegmentDescriptor> segmentsToMerge =
3343            new ArrayList<SegmentDescriptor>();
3344          int segmentsConsidered = 0;
3345          int numSegmentsToConsider = factor;
3346          while (true) {
3347            //extract the smallest 'factor' number of segment pointers from the 
3348            //TreeMap. Call cleanup on the empty segments (no key/value data)
3349            SegmentDescriptor[] mStream = 
3350              getSegmentDescriptors(numSegmentsToConsider);
3351            for (int i = 0; i < mStream.length; i++) {
3352              if (mStream[i].nextRawKey()) {
3353                segmentsToMerge.add(mStream[i]);
3354                segmentsConsidered++;
3355                // Count the fact that we read some bytes in calling nextRawKey()
3356                updateProgress(mStream[i].in.getPosition());
3357              }
3358              else {
3359                mStream[i].cleanup();
3360                numSegments--; //we ignore this segment for the merge
3361              }
3362            }
3363            //if we have the desired number of segments
3364            //or looked at all available segments, we break
3365            if (segmentsConsidered == factor || 
3366                sortedSegmentSizes.size() == 0) {
3367              break;
3368            }
3369              
3370            numSegmentsToConsider = factor - segmentsConsidered;
3371          }
3372          //feed the streams to the priority queue
3373          initialize(segmentsToMerge.size()); clear();
3374          for (int i = 0; i < segmentsToMerge.size(); i++) {
3375            put(segmentsToMerge.get(i));
3376          }
          //if the number of remaining segments is at most the factor, just
          //return this iterator; otherwise do another single-level merge
3379          if (numSegments <= factor) {
3380            //calculate the length of the remaining segments. Required for 
3381            //calculating the merge progress
3382            long totalBytes = 0;
3383            for (int i = 0; i < segmentsToMerge.size(); i++) {
3384              totalBytes += segmentsToMerge.get(i).segmentLength;
3385            }
3386            if (totalBytes != 0) //being paranoid
3387              progPerByte = 1.0f / (float)totalBytes;
3388            //reset factor to what it originally was
3389            factor = origFactor;
3390            return this;
3391          } else {
3392            //we want to spread the creation of temp files on multiple disks if 
3393            //available under the space constraints
3394            long approxOutputSize = 0; 
3395            for (SegmentDescriptor s : segmentsToMerge) {
3396              approxOutputSize += s.segmentLength + 
3397                                  ChecksumFileSystem.getApproxChkSumLength(
3398                                  s.segmentLength);
3399            }
3400            Path tmpFilename = 
3401              new Path(tmpDir, "intermediate").suffix("." + passNo);
3402
3403            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3404                                                tmpFilename.toString(),
3405                                                approxOutputSize, conf);
3406            if(LOG.isDebugEnabled()) { 
3407              LOG.debug("writing intermediate results to " + outputFile);
3408            }
3409            Writer writer = cloneFileAttributes(
3410                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3411                                                fs.makeQualified(outputFile), null);
3412            writer.sync = null; //disable sync for temp files
3413            writeFile(this, writer);
3414            writer.close();
3415            
3416            //we finished one single level merge; now clean up the priority 
3417            //queue
3418            this.close();
3419            
3420            SegmentDescriptor tempSegment = 
3421              new SegmentDescriptor(0,
3422                  fs.getFileStatus(outputFile).getLen(), outputFile);
3423            //put the segment back in the TreeMap
3424            sortedSegmentSizes.put(tempSegment, null);
3425            numSegments = sortedSegmentSizes.size();
3426            passNo++;
3427          }
          //only the first pass may use a reduced merge factor, so reset the 
          //factor to what it originally was
3430          factor = origFactor;
3431        } while(true);
3432      }
3433  
      /** Computes the merge fan-in for the given pass (see HADOOP-591): the
       * first pass merges just enough segments to leave a segment count of
       * the form k*(factor-1) + 1, so that every subsequent pass can merge
       * a full <code>factor</code> segments
       */
3435      public int getPassFactor(int passNo, int numSegments) {
3436        if (passNo > 1 || numSegments <= factor || factor == 1) 
3437          return factor;
3438        int mod = (numSegments - 1) % (factor - 1);
3439        if (mod == 0)
3440          return factor;
3441        return mod + 1;
3442      }
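
      // Worked example: with numSegments = 25 and factor = 10, the first pass
      // merges (25 - 1) % (10 - 1) + 1 = 7 segments, leaving 25 - 7 + 1 = 19;
      // the next pass merges a full 10 (leaving 10), and the final pass merges
      // exactly 10, so every pass after the first runs at the full factor.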
3443      
3444      /** Return (& remove) the requested number of segment descriptors from the
3445       * sorted map.
3446       */
3447      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3448        if (numDescriptors > sortedSegmentSizes.size())
3449          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] segmentDescriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator<SegmentDescriptor> iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          segmentDescriptors[i++] = iter.next();
          iter.remove();
        }
        return segmentDescriptors;
3459      }
3460    } // SequenceFile.Sorter.MergeQueue
3461
    /** This class defines a merge segment. This class can be subclassed to 
     * provide a customized cleanup method implementation. In this 
     * implementation, cleanup closes the file handle and, unless the input
     * is to be preserved, deletes the file 
     */
3466    public class SegmentDescriptor implements Comparable {
3467      
3468      long segmentOffset; //the start of the segment in the file
3469      long segmentLength; //the length of the segment
3470      Path segmentPathName; //the path name of the file containing the segment
3471      boolean ignoreSync = true; //set to true for temp files
3472      private Reader in = null; 
3473      private DataOutputBuffer rawKey = null; //this will hold the current key
3474      private boolean preserveInput = false; //delete input segment files?
3475      
3476      /** Constructs a segment
3477       * @param segmentOffset the offset of the segment in the file
3478       * @param segmentLength the length of the segment
3479       * @param segmentPathName the path name of the file containing the segment
3480       */
3481      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3482                                Path segmentPathName) {
3483        this.segmentOffset = segmentOffset;
3484        this.segmentLength = segmentLength;
3485        this.segmentPathName = segmentPathName;
3486      }
3487      
      /** Enables sync-marker checks while reading this segment; temporary
       *  merge files skip them */
      public void doSync() { ignoreSync = false; }
3490      
      /** Sets whether the input file should be preserved rather than deleted
       *  when no longer needed */
3492      public void preserveInput(boolean preserve) {
3493        preserveInput = preserve;
3494      }
3495
3496      public boolean shouldPreserveInput() {
3497        return preserveInput;
3498      }
3499      
3500      @Override
3501      public int compareTo(Object o) {
3502        SegmentDescriptor that = (SegmentDescriptor)o;
3503        if (this.segmentLength != that.segmentLength) {
3504          return (this.segmentLength < that.segmentLength ? -1 : 1);
3505        }
3506        if (this.segmentOffset != that.segmentOffset) {
3507          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3508        }
3509        return (this.segmentPathName.toString()).
3510          compareTo(that.segmentPathName.toString());
3511      }
3512
3513      @Override
3514      public boolean equals(Object o) {
3515        if (!(o instanceof SegmentDescriptor)) {
3516          return false;
3517        }
3518        SegmentDescriptor that = (SegmentDescriptor)o;
3519        if (this.segmentLength == that.segmentLength &&
3520            this.segmentOffset == that.segmentOffset &&
3521            this.segmentPathName.toString().equals(
3522              that.segmentPathName.toString())) {
3523          return true;
3524        }
3525        return false;
3526      }
3527
3528      @Override
      public int hashCode() {
        // derived from segmentOffset only; descriptors that are equal always
        // share an offset, so this stays consistent with equals(), if coarse
        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3531      }
3532
3533      /** Fills up the rawKey object with the key returned by the Reader
3534       * @return true if there is a key returned; false, otherwise
3535       * @throws IOException
3536       */
3537      public boolean nextRawKey() throws IOException {
3538        if (in == null) {
3539          int bufferSize = getBufferSize(conf); 
3540          Reader reader = new Reader(conf,
3541                                     Reader.file(segmentPathName), 
3542                                     Reader.bufferSize(bufferSize),
3543                                     Reader.start(segmentOffset), 
3544                                     Reader.length(segmentLength));
3545        
3546          //sometimes we ignore syncs especially for temp merge files
3547          if (ignoreSync) reader.ignoreSync();
3548
3549          if (reader.getKeyClass() != keyClass)
3550            throw new IOException("wrong key class: " + reader.getKeyClass() +
3551                                  " is not " + keyClass);
3552          if (reader.getValueClass() != valClass)
3553            throw new IOException("wrong value class: "+reader.getValueClass()+
3554                                  " is not " + valClass);
3555          this.in = reader;
3556          rawKey = new DataOutputBuffer();
3557        }
3558        rawKey.reset();
3559        int keyLength = 
3560          in.nextRawKey(rawKey);
3561        return (keyLength >= 0);
3562      }
3563
      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier
       * @param rawValue the buffer to fill with the raw value bytes
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
3574      
3575      /** Returns the stored rawKey */
3576      public DataOutputBuffer getKey() {
3577        return rawKey;
3578      }
3579      
3580      /** closes the underlying reader */
3581      private void close() throws IOException {
3582        this.in.close();
3583        this.in = null;
3584      }
3585
3586      /** The default cleanup. Subclasses can override this with a custom 
3587       * cleanup 
3588       */
3589      public void cleanup() throws IOException {
3590        close();
3591        if (!preserveInput) {
3592          fs.delete(segmentPathName, true);
3593        }
3594      }
3595    } // SequenceFile.Sorter.SegmentDescriptor
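
    /* A minimal sketch of the customized cleanup that SegmentDescriptor's
     * javadoc allows; the subclass name is hypothetical
     * (LinkedSegmentsDescriptor below is the real in-file example):
     *
     *   class KeepInputSegmentDescriptor extends SegmentDescriptor {
     *     KeepInputSegmentDescriptor(long off, long len, Path path) {
     *       super(off, len, path);
     *     }
     *     @Override
     *     public void cleanup() throws IOException {
     *       preserveInput(true); // never delete the backing file
     *       super.cleanup();     // still closes the open Reader
     *     }
     *   }
     */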
3596    
    /** This class describes a segment that shares its backing file with
     *  other segments; the shared file can be deleted only once every
     *  contained segment has been cleaned up
     */
3600    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3601
3602      SegmentContainer parentContainer = null;
3603
3604      /** Constructs a segment
3605       * @param segmentOffset the offset of the segment in the file
3606       * @param segmentLength the length of the segment
3607       * @param segmentPathName the path name of the file containing the segment
3608       * @param parent the parent SegmentContainer that holds the segment
3609       */
3610      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3611                                       Path segmentPathName, SegmentContainer parent) {
3612        super(segmentOffset, segmentLength, segmentPathName);
3613        this.parentContainer = parent;
3614      }
      /** Closes the reader and notifies the parent container, which deletes
       * the shared input file once all of its contained segments have been
       * cleaned up
       */
3618      @Override
3619      public void cleanup() throws IOException {
3620        super.close();
3621        if (super.shouldPreserveInput()) return;
3622        parentContainer.cleanup();
3623      }
3624      
3625      @Override
3626      public boolean equals(Object o) {
3627        if (!(o instanceof LinkedSegmentsDescriptor)) {
3628          return false;
3629        }
3630        return super.equals(o);
3631      }
3632    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3633
3634    /** The class that defines a container for segments to be merged. Primarily
3635     * required to delete temp files as soon as all the contained segments
3636     * have been looked at */
3637    private class SegmentContainer {
3638      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3639      private int numSegmentsContained; //# of segments contained
3640      private Path inName; //input file from where segments are created
3641      
3642      //the list of segments read from the file
3643      private ArrayList <SegmentDescriptor> segments = 
3644        new ArrayList <SegmentDescriptor>();
3645      /** This constructor is there primarily to serve the sort routine that 
3646       * generates a single output file with an associated index file */
3647      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        //read (segmentOffset, segmentLength) VLong pairs from indexIn, as
        //written out by SortPass.flush()
3649        FSDataInputStream fsIndexIn = fs.open(indexIn);
3650        long end = fs.getFileStatus(indexIn).getLen();
3651        while (fsIndexIn.getPos() < end) {
3652          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3653          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3654          Path segmentName = inName;
3655          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3656                                                    segmentLength, segmentName, this));
3657        }
3658        fsIndexIn.close();
3659        fs.delete(indexIn, true);
3660        numSegmentsContained = segments.size();
3661        this.inName = inName;
3662      }
3663
3664      public List <SegmentDescriptor> getSegmentList() {
3665        return segments;
3666      }
3667      public void cleanup() throws IOException {
3668        numSegmentsCleanedUp++;
3669        if (numSegmentsCleanedUp == numSegmentsContained) {
3670          fs.delete(inName, true);
3671        }
3672      }
3673    } //SequenceFile.Sorter.SegmentContainer
3674
3675  } // SequenceFile.Sorter
3676
3677} // SequenceFile