/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test cases that ensure that file system level errors are bubbled up
 * appropriately to clients, rather than swallowed.
 */
@Category(MediumTests.class)
public class TestFSErrorsExposed {
  private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class);

  HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
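    // Wrap the backing filesystem in a FaultyFileSystem so that every stream
    // opened for the store file is tracked and can later be made to throw.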
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFile.Writer writer = new StoreFile.WriterBuilder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFile sf = new StoreFile(fs, writer.getPath(),
      util.getConfiguration(), cacheConf, BloomType.NONE);

    StoreFile.Reader reader = sf.createReader();
    HFileScanner scanner = reader.getScanner(false, true);

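    // The reader should have opened its stream through the faulty filesystem;
    // make sure the wrapper was actually tracked.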
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFile.Writer writer = new StoreFile.WriterBuilder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
      cacheConf, BloomType.NONE);

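    // Unlike the test above, read through the StoreFileScanner layer rather
    // than the raw HFileScanner; the injected fault should surface as an
    // IOException from the scanner.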
    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
        Collections.singletonList(sf), false, true, false,
        // 0 is passed as readpoint because this test operates on StoreFile directly
        0);
    KeyValueScanner scanner = scanners.get(0);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then
   * removes the data from HDFS underneath it, and ensures that
   * errors are bubbled up to the client.
   */
  @Test(timeout=5 * 60 * 1000)
  public void testFullSystemBubblesFSErrors() throws Exception {
    // We won't get an error if the datanode is missing while short circuit
    // reads are on; it's a known 'feature'.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Keep the client retry count low, or it will trigger server shutdown
      // while sync'ing because all the datanodes are bad.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);

      util.startMiniCluster(1);
      byte[] tableName = Bytes.toBytes("table");
      byte[] fam = Bytes.toBytes("fam");

      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
      desc.addFamily(new HColumnDescriptor(fam)
          .setMaxVersions(1)
          .setBlockCacheEnabled(false)
      );
      admin.createTable(desc);
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one with 10 as default.
      HTable table = new HTable(new Configuration(util.getConfiguration()), tableName);

      // Load some data
      util.loadTable(table, fam, false);
      table.flushCommits();
      util.flush();
      util.countRows(table);

      // Kill the DFS cluster
      util.getDFSCluster().shutdownDataNodes();

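      // With the datanodes gone, any read that has to go to disk should now
      // surface an error to the client.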
      try {
        util.countRows(table);
        fail("Did not fail to count after removing data");
      } catch (Exception e) {
        LOG.info("Got expected error", e);
        assertTrue(e.getMessage().contains("Could not seek"));
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      util.getMiniHBaseCluster().killAll();
      util.shutdownMiniCluster();
    }
  }

  static class FaultyFileSystem extends FilterFileSystem {
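    // Streams handed out by this filesystem, held as soft references so the
    // test can switch them all into fault mode at once.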
    List<SoftReference<FaultyInputStream>> inStreams =
      new ArrayList<SoftReference<FaultyInputStream>>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

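    // Opens the real stream, wraps it in a FaultyInputStream, and remembers
    // the wrapper so startFaults() can reach it later.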
    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<FaultyInputStream>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is: inStreams) {
        is.get().startFaults();
      }
    }
  }

  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

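    /**
     * Positional read: injects a fault (if faults have been started) before
     * delegating to the wrapped stream.
     */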
    public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
      injectFault();
      return ((PositionedReadable)in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}