001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3native;
020
021import java.io.BufferedOutputStream;
022import java.io.EOFException;
023import java.io.File;
024import java.io.FileNotFoundException;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.net.URI;
030import java.security.DigestOutputStream;
031import java.security.MessageDigest;
032import java.security.NoSuchAlgorithmException;
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.TimeUnit;
040
041import com.google.common.base.Preconditions;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.classification.InterfaceStability;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.BufferedFSInputStream;
046import org.apache.hadoop.fs.FSDataInputStream;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FSExceptionMessages;
049import org.apache.hadoop.fs.FSInputStream;
050import org.apache.hadoop.fs.FileAlreadyExistsException;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.LocalDirAllocator;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.fs.s3.S3Exception;
057import org.apache.hadoop.io.retry.RetryPolicies;
058import org.apache.hadoop.io.retry.RetryPolicy;
059import org.apache.hadoop.io.retry.RetryProxy;
060import org.apache.hadoop.util.Progressable;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * <p>
066 * A {@link FileSystem} for reading and writing files stored on
067 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
068 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
069 * stores files on S3 in their
070 * native form so they can be read by other S3 tools.
071 *
072 * A note about directories. S3 of course has no "native" support for them.
073 * The idiom we choose then is: for any directory created by this class,
074 * we use an empty object "#{dirpath}_$folder$" as a marker.
075 * Further, to interoperate with other S3 tools, we also accept the following:
 *  - an object "#{dirpath}/" denoting a directory marker
 *  - if any objects exist with the prefix "#{dirpath}/", then the
 *    directory is said to exist
 *  - if both a file with the name of a directory and a marker for that
 *    directory exist, then the *file masks the directory*, and the directory
 *    is never returned.
082 * </p>
083 * @see org.apache.hadoop.fs.s3.S3FileSystem
084 */
085@InterfaceAudience.Public
086@InterfaceStability.Stable
087public class NativeS3FileSystem extends FileSystem {
088  
089  public static final Logger LOG =
090      LoggerFactory.getLogger(NativeS3FileSystem.class);
091  
092  private static final String FOLDER_SUFFIX = "_$folder$";
093  static final String PATH_DELIMITER = Path.SEPARATOR;
094  private static final int S3_MAX_LISTING_LENGTH = 1000;
095  
096  static class NativeS3FsInputStream extends FSInputStream {
097    
098    private NativeFileSystemStore store;
099    private Statistics statistics;
100    private InputStream in;
101    private final String key;
102    private long pos = 0;
103    
104    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
105      Preconditions.checkNotNull(in, "Null input stream");
106      this.store = store;
107      this.statistics = statistics;
108      this.in = in;
109      this.key = key;
110    }
111    
112    @Override
113    public synchronized int read() throws IOException {
114      int result;
115      try {
116        result = in.read();
117      } catch (IOException e) {
118        LOG.info("Received IOException while reading '{}', attempting to reopen",
119            key);
120        LOG.debug("{}", e, e);
121        try {
122          seek(pos);
123          result = in.read();
124        } catch (EOFException eof) {
125          LOG.debug("EOF on input stream read: {}", eof, eof);
126          result = -1;
127        }
128      } 
129      if (result != -1) {
130        pos++;
131      }
132      if (statistics != null && result != -1) {
133        statistics.incrementBytesRead(1);
134      }
135      return result;
136    }
137    @Override
138    public synchronized int read(byte[] b, int off, int len)
139      throws IOException {
140      if (in == null) {
141        throw new EOFException("Cannot read closed stream");
142      }
143      int result = -1;
144      try {
145        result = in.read(b, off, len);
146      } catch (EOFException eof) {
147        throw eof;
148      } catch (IOException e) {
149        LOG.info( "Received IOException while reading '{}'," +
150                  " attempting to reopen.", key);
151        seek(pos);
152        result = in.read(b, off, len);
153      }
154      if (result > 0) {
155        pos += result;
156      }
157      if (statistics != null && result > 0) {
158        statistics.incrementBytesRead(result);
159      }
160      return result;
161    }
162
163    @Override
164    public synchronized void close() throws IOException {
165      closeInnerStream();
166    }
167
168    /**
169     * Close the inner stream if not null. Even if an exception
170     * is raised during the close, the field is set to null
171     * @throws IOException if raised by the close() operation.
172     */
173    private void closeInnerStream() throws IOException {
174      if (in != null) {
175        try {
176          in.close();
177        } finally {
178          in = null;
179        }
180      }
181    }
182
183    /**
184     * Update inner stream with a new stream and position
185     * @param newStream new stream -must not be null
186     * @param newpos new position
187     * @throws IOException IO exception on a failure to close the existing
188     * stream.
189     */
190    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
191      Preconditions.checkNotNull(newStream, "Null newstream argument");
192      closeInnerStream();
193      in = newStream;
194      this.pos = newpos;
195    }
196
197    @Override
198    public synchronized void seek(long newpos) throws IOException {
199      if (newpos < 0) {
200        throw new EOFException(
201            FSExceptionMessages.NEGATIVE_SEEK);
202      }
203      if (pos != newpos) {
204        // the seek is attempting to move the current position
205        LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
206        InputStream newStream = store.retrieve(key, newpos);
207        updateInnerStream(newStream, newpos);
208      }
209    }
210
211    @Override
212    public synchronized long getPos() throws IOException {
213      return pos;
214    }
215    @Override
216    public boolean seekToNewSource(long targetPos) throws IOException {
217      return false;
218    }
219  }
220  
221  private class NativeS3FsOutputStream extends OutputStream {
222    
223    private Configuration conf;
224    private String key;
225    private File backupFile;
226    private OutputStream backupStream;
227    private MessageDigest digest;
228    private boolean closed;
229    private LocalDirAllocator lDirAlloc;
230    
231    public NativeS3FsOutputStream(Configuration conf,
232        NativeFileSystemStore store, String key, Progressable progress,
233        int bufferSize) throws IOException {
234      this.conf = conf;
235      this.key = key;
236      this.backupFile = newBackupFile();
237      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
238      try {
239        this.digest = MessageDigest.getInstance("MD5");
240        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
241            new FileOutputStream(backupFile), this.digest));
242      } catch (NoSuchAlgorithmException e) {
243        LOG.warn("Cannot load MD5 digest algorithm," +
244            "skipping message integrity check.", e);
245        this.backupStream = new BufferedOutputStream(
246            new FileOutputStream(backupFile));
247      }
248    }
249
250    private File newBackupFile() throws IOException {
251      if (lDirAlloc == null) {
252        lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
253      }
254      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
255      result.deleteOnExit();
256      return result;
257    }
258    
259    @Override
260    public void flush() throws IOException {
261      backupStream.flush();
262    }
263    
264    @Override
265    public synchronized void close() throws IOException {
266      if (closed) {
267        return;
268      }
269
270      backupStream.close();
271      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
272      
273      try {
274        byte[] md5Hash = digest == null ? null : digest.digest();
275        store.storeFile(key, backupFile, md5Hash);
276      } finally {
277        if (!backupFile.delete()) {
278          LOG.warn("Could not delete temporary s3n file: " + backupFile);
279        }
280        super.close();
281        closed = true;
282      } 
283      LOG.info("OutputStream for key '{}' upload complete", key);
284    }
285
286    @Override
287    public void write(int b) throws IOException {
288      backupStream.write(b);
289    }
290
291    @Override
292    public void write(byte[] b, int off, int len) throws IOException {
293      backupStream.write(b, off, len);
294    }
295  }
296  
297  private URI uri;
298  private NativeFileSystemStore store;
299  private Path workingDir;
300  
301  public NativeS3FileSystem() {
302    // set store in initialize()
303  }
304  
305  public NativeS3FileSystem(NativeFileSystemStore store) {
306    this.store = store;
307  }
308
309  /**
310   * Return the protocol scheme for the FileSystem.
311   * <p/>
312   *
313   * @return <code>s3n</code>
314   */
315  @Override
316  public String getScheme() {
317    return "s3n";
318  }
319
320  @Override
321  public void initialize(URI uri, Configuration conf) throws IOException {
322    super.initialize(uri, conf);
323    if (store == null) {
324      store = createDefaultStore(conf);
325    }
326    store.initialize(uri, conf);
327    setConf(conf);
328    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
329    this.workingDir =
330      new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
331  }
332  
333  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
334    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
335    
336    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
337        conf.getInt("fs.s3.maxRetries", 4),
338        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
339    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
340      new HashMap<Class<? extends Exception>, RetryPolicy>();
341    exceptionToPolicyMap.put(IOException.class, basePolicy);
342    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
343    
344    RetryPolicy methodPolicy = RetryPolicies.retryByException(
345        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
346    Map<String, RetryPolicy> methodNameToPolicyMap =
347      new HashMap<String, RetryPolicy>();
348    methodNameToPolicyMap.put("storeFile", methodPolicy);
349    methodNameToPolicyMap.put("rename", methodPolicy);
350    
351    return (NativeFileSystemStore)
352      RetryProxy.create(NativeFileSystemStore.class, store,
353          methodNameToPolicyMap);
354  }
355  
356  private static String pathToKey(Path path) {
357    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
358      // allow uris without trailing slash after bucket to refer to root,
359      // like s3n://mybucket
360      return "";
361    }
362    if (!path.isAbsolute()) {
363      throw new IllegalArgumentException("Path must be absolute: " + path);
364    }
365    String ret = path.toUri().getPath().substring(1); // remove initial slash
366    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
367      ret = ret.substring(0, ret.length() -1);
368  }
369    return ret;
370  }
371  
372  private static Path keyToPath(String key) {
373    return new Path("/" + key);
374  }
375  
376  private Path makeAbsolute(Path path) {
377    if (path.isAbsolute()) {
378      return path;
379    }
380    return new Path(workingDir, path);
381  }
382
383  /** This optional operation is not yet supported. */
384  @Override
385  public FSDataOutputStream append(Path f, int bufferSize,
386      Progressable progress) throws IOException {
387    throw new IOException("Not supported");
388  }
389  
390  @Override
391  public FSDataOutputStream create(Path f, FsPermission permission,
392      boolean overwrite, int bufferSize, short replication, long blockSize,
393      Progressable progress) throws IOException {
394
395    if (exists(f) && !overwrite) {
396      throw new FileAlreadyExistsException("File already exists: " + f);
397    }
398    
399    if(LOG.isDebugEnabled()) {
400      LOG.debug("Creating new file '" + f + "' in S3");
401    }
402    Path absolutePath = makeAbsolute(f);
403    String key = pathToKey(absolutePath);
404    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
405        key, progress, bufferSize), statistics);
406  }
407  
408  @Override
409  public boolean delete(Path f, boolean recurse) throws IOException {
410    FileStatus status;
411    try {
412      status = getFileStatus(f);
413    } catch (FileNotFoundException e) {
414      if(LOG.isDebugEnabled()) {
415        LOG.debug("Delete called for '" + f +
416            "' but file does not exist, so returning false");
417      }
418      return false;
419    }
420    Path absolutePath = makeAbsolute(f);
421    String key = pathToKey(absolutePath);
422    if (status.isDirectory()) {
423      if (!recurse && listStatus(f).length > 0) {
424        throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
425      }
426
427      createParent(f);
428
429      if(LOG.isDebugEnabled()) {
430        LOG.debug("Deleting directory '" + f  + "'");
431      }
432      String priorLastKey = null;
433      do {
434        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
435        for (FileMetadata file : listing.getFiles()) {
436          store.delete(file.getKey());
437        }
438        priorLastKey = listing.getPriorLastKey();
439      } while (priorLastKey != null);
440
441      try {
442        store.delete(key + FOLDER_SUFFIX);
443      } catch (FileNotFoundException e) {
444        //this is fine, we don't require a marker
445      }
446    } else {
447      if(LOG.isDebugEnabled()) {
448        LOG.debug("Deleting file '" + f + "'");
449      }
450      createParent(f);
451      store.delete(key);
452    }
453    return true;
454  }
455
456  @Override
457  public FileStatus getFileStatus(Path f) throws IOException {
458    Path absolutePath = makeAbsolute(f);
459    String key = pathToKey(absolutePath);
460    
461    if (key.length() == 0) { // root always exists
462      return newDirectory(absolutePath);
463    }
464    
465    if(LOG.isDebugEnabled()) {
466      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
467    }
468    FileMetadata meta = store.retrieveMetadata(key);
469    if (meta != null) {
470      if(LOG.isDebugEnabled()) {
471        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
472      }
473      return newFile(meta, absolutePath);
474    }
475    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
476      if(LOG.isDebugEnabled()) {
477        LOG.debug("getFileStatus returning 'directory' for key '" + key +
478            "' as '" + key + FOLDER_SUFFIX + "' exists");
479      }
480      return newDirectory(absolutePath);
481    }
482    
483    if(LOG.isDebugEnabled()) {
484      LOG.debug("getFileStatus listing key '" + key + "'");
485    }
486    PartialListing listing = store.list(key, 1);
487    if (listing.getFiles().length > 0 ||
488        listing.getCommonPrefixes().length > 0) {
489      if(LOG.isDebugEnabled()) {
490        LOG.debug("getFileStatus returning 'directory' for key '" + key +
491            "' as it has contents");
492      }
493      return newDirectory(absolutePath);
494    }
495    
496    if(LOG.isDebugEnabled()) {
497      LOG.debug("getFileStatus could not find key '" + key + "'");
498    }
499    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
500  }
501
502  @Override
503  public URI getUri() {
504    return uri;
505  }
506
507  /**
508   * <p>
509   * If <code>f</code> is a file, this method will make a single call to S3.
510   * If <code>f</code> is a directory, this method will make a maximum of
511   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
512   * files and directories contained directly in <code>f</code>.
513   * </p>
514   */
515  @Override
516  public FileStatus[] listStatus(Path f) throws IOException {
517
518    Path absolutePath = makeAbsolute(f);
519    String key = pathToKey(absolutePath);
520    
521    if (key.length() > 0) {
522      FileMetadata meta = store.retrieveMetadata(key);
523      if (meta != null) {
524        return new FileStatus[] { newFile(meta, absolutePath) };
525      }
526    }
527    
528    URI pathUri = absolutePath.toUri();
529    Set<FileStatus> status = new TreeSet<FileStatus>();
530    String priorLastKey = null;
531    do {
532      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
533      for (FileMetadata fileMetadata : listing.getFiles()) {
534        Path subpath = keyToPath(fileMetadata.getKey());
535        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
536
537        if (fileMetadata.getKey().equals(key + "/")) {
538          // this is just the directory we have been asked to list
539        }
540        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
541          status.add(newDirectory(new Path(
542              absolutePath,
543              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
544        }
545        else {
546          status.add(newFile(fileMetadata, subpath));
547        }
548      }
549      for (String commonPrefix : listing.getCommonPrefixes()) {
550        Path subpath = keyToPath(commonPrefix);
551        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
552        status.add(newDirectory(new Path(absolutePath, relativePath)));
553      }
554      priorLastKey = listing.getPriorLastKey();
555    } while (priorLastKey != null);
556    
557    if (status.isEmpty() &&
558        key.length() > 0 &&
559        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
560      throw new FileNotFoundException("File " + f + " does not exist.");
561    }
562    
563    return status.toArray(new FileStatus[status.size()]);
564  }
565  
566  private FileStatus newFile(FileMetadata meta, Path path) {
567    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
568        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
569  }
570  
571  private FileStatus newDirectory(Path path) {
572    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
573  }
574
575  @Override
576  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
577    Path absolutePath = makeAbsolute(f);
578    List<Path> paths = new ArrayList<Path>();
579    do {
580      paths.add(0, absolutePath);
581      absolutePath = absolutePath.getParent();
582    } while (absolutePath != null);
583    
584    boolean result = true;
585    for (Path path : paths) {
586      result &= mkdir(path);
587    }
588    return result;
589  }
590  
591  private boolean mkdir(Path f) throws IOException {
592    try {
593      FileStatus fileStatus = getFileStatus(f);
594      if (fileStatus.isFile()) {
595        throw new FileAlreadyExistsException(String.format(
596            "Can't make directory for path '%s' since it is a file.", f));
597
598      }
599    } catch (FileNotFoundException e) {
600      if(LOG.isDebugEnabled()) {
601        LOG.debug("Making dir '" + f + "' in S3");
602      }
603      String key = pathToKey(f) + FOLDER_SUFFIX;
604      store.storeEmptyFile(key);    
605    }
606    return true;
607  }
608
609  @Override
610  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
611    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
612    if (fs.isDirectory()) {
613      throw new FileNotFoundException("'" + f + "' is a directory");
614    }
615    LOG.info("Opening '" + f + "' for reading");
616    Path absolutePath = makeAbsolute(f);
617    String key = pathToKey(absolutePath);
618    return new FSDataInputStream(new BufferedFSInputStream(
619        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
620  }
621  
622  // rename() and delete() use this method to ensure that the parent directory
623  // of the source does not vanish.
624  private void createParent(Path path) throws IOException {
625    Path parent = path.getParent();
626    if (parent != null) {
627      String key = pathToKey(makeAbsolute(parent));
628      if (key.length() > 0) {
629          store.storeEmptyFile(key + FOLDER_SUFFIX);
630      }
631    }
632  }
633  
634    
635  @Override
636  public boolean rename(Path src, Path dst) throws IOException {
637
638    String srcKey = pathToKey(makeAbsolute(src));
639
640    if (srcKey.length() == 0) {
641      // Cannot rename root of file system
642      return false;
643    }
644
645    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
646
647    // Figure out the final destination
648    String dstKey;
649    try {
650      boolean dstIsFile = getFileStatus(dst).isFile();
651      if (dstIsFile) {
652        if(LOG.isDebugEnabled()) {
653          LOG.debug(debugPreamble +
654              "returning false as dst is an already existing file");
655        }
656        return false;
657      } else {
658        if(LOG.isDebugEnabled()) {
659          LOG.debug(debugPreamble + "using dst as output directory");
660        }
661        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
662      }
663    } catch (FileNotFoundException e) {
664      if(LOG.isDebugEnabled()) {
665        LOG.debug(debugPreamble + "using dst as output destination");
666      }
667      dstKey = pathToKey(makeAbsolute(dst));
668      try {
669        if (getFileStatus(dst.getParent()).isFile()) {
670          if(LOG.isDebugEnabled()) {
671            LOG.debug(debugPreamble +
672                "returning false as dst parent exists and is a file");
673          }
674          return false;
675        }
676      } catch (FileNotFoundException ex) {
677        if(LOG.isDebugEnabled()) {
678          LOG.debug(debugPreamble +
679              "returning false as dst parent does not exist");
680        }
681        return false;
682      }
683    }
684
685    boolean srcIsFile;
686    try {
687      srcIsFile = getFileStatus(src).isFile();
688    } catch (FileNotFoundException e) {
689      if(LOG.isDebugEnabled()) {
690        LOG.debug(debugPreamble + "returning false as src does not exist");
691      }
692      return false;
693    }
694    if (srcIsFile) {
695      if(LOG.isDebugEnabled()) {
696        LOG.debug(debugPreamble +
697            "src is file, so doing copy then delete in S3");
698      }
699      store.copy(srcKey, dstKey);
700      store.delete(srcKey);
701    } else {
702      if(LOG.isDebugEnabled()) {
703        LOG.debug(debugPreamble + "src is directory, so copying contents");
704      }
705      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
706
707      List<String> keysToDelete = new ArrayList<String>();
708      String priorLastKey = null;
709      do {
710        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
711        for (FileMetadata file : listing.getFiles()) {
712          keysToDelete.add(file.getKey());
713          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
714        }
715        priorLastKey = listing.getPriorLastKey();
716      } while (priorLastKey != null);
717
718      if(LOG.isDebugEnabled()) {
719        LOG.debug(debugPreamble +
720            "all files in src copied, now removing src files");
721      }
722      for (String key: keysToDelete) {
723        store.delete(key);
724      }
725
726      try {
727        store.delete(srcKey + FOLDER_SUFFIX);
728      } catch (FileNotFoundException e) {
729        //this is fine, we don't require a marker
730      }
731      if(LOG.isDebugEnabled()) {
732        LOG.debug(debugPreamble + "done");
733      }
734    }
735
736    return true;
737  }
738  
739  @Override
740  public long getDefaultBlockSize() {
741    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
742  }
743
744  /**
745   * Set the working directory to the given directory.
746   */
747  @Override
748  public void setWorkingDirectory(Path newDir) {
749    workingDir = newDir;
750  }
751  
752  @Override
753  public Path getWorkingDirectory() {
754    return workingDir;
755  }
756
757  @Override
758  public String getCanonicalServiceName() {
759    // Does not support Token
760    return null;
761  }
762}