/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that FileLink switches between alternate locations
 * when the current location moves or gets deleted.
 */
@Category(MediumTests.class)
public class TestFileLink {
  /**
   * Test, on HDFS, that the FileLink is still readable
   * even when the current file gets renamed.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
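    // Use a small block size and prefetch window so a long read quickly
    // exhausts the cached block locations and must go back to the NameNode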
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Test, on a local filesystem, that the FileLink is still readable
   * even when the current file gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Test that the link is still readable even when the current file gets renamed.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte)2);

    List<Path> files = new ArrayList<Path>();
    files.add(originalPath);
    files.add(archivedPath);

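    // The link resolves to the first existing path in 'files'; if that file
    // disappears mid-read, the stream fails over to the next location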
    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read from origin
      int n = in.read(data);
      dataVerify(data, n, (byte)2);
      size += n;

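      // On Windows the local filesystem cannot rename a file that still has
      // an open handle, so close the stream before the rename below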
      if (FSUtils.WINDOWS) {
        in.close();
      }

      // Move origin to archive
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

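      // The stream was closed above on Windows, so reopen the link; it now
      // resolves to the archived location and restarts from the beginning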
      if (FSUtils.WINDOWS) {
        in = link.open(fs); // re-read from the beginning
        in.read(data);
      }

      // Try to read to the end
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte)2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) fs.delete(originalPath, true);
      if (fs.exists(archivedPath)) fs.delete(archivedPath, true);
    }
  }

  /**
   * Test that the link is still readable even when the current file gets deleted.
   *
   * NOTE: This test is valid only on HDFS.
   * When a file is deleted from a local file-system, it is simply 'unlinked':
   * the inode, which contains the file's data, is not removed until every
   * process holding it open has finished with it.
   * On HDFS, once a read request goes past the cached block locations,
   * a query to the NameNode is performed using the filename,
   * and the deleted file no longer exists (FileNotFoundException).
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
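      // Create three 1MB files, each filled with a distinct byte value,
      // and chain them together as alternate locations of a single FileLink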
      List<Path> files = new ArrayList<Path>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte)i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

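        // For each location: read a chunk, delete the file under the reader,
        // then skip to its end so the link fails over to the next location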
        // Switch to file 1
        n = in.read(data);
        dataVerify(data, n, (byte)0);
        fs.delete(files.get(0), true);
        skipBuffer(in, (byte)0);

        // Switch to file 2
        n = in.read(data);
        dataVerify(data, n, (byte)1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte)1);

        // Read the last file
        n = in.read(data);
        dataVerify(data, n, (byte)2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte)2);

        // No more locations are available: the read must either report
        // end-of-stream or fail with a FileNotFoundException
        try {
          n = in.read(data);
          assertTrue(n <= 0);
        } catch (FileNotFoundException e) {
          // expected: every location has been deleted
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Write at least 'size' bytes with value 'v' into a new file called 'path'
   * (data is written in 4KB chunks, so the file may be slightly larger).
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }

  /**
   * Verify that all bytes in 'data' have 'v' as value.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

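  /**
   * Skip the rest of the current file: keep reading while full buffers of
   * value 'v' come back. The loop ends when the data changes (the link has
   * switched to the next file), the stream ends, or the read fails because
   * the deleted file has no replacement.
   */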
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v) {
            // The link has already switched to the next file
            return;
          }
        }
      }
    } catch (IOException e) {
      // expected when the current file is gone and no alternate is left
    }
  }
}