1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.fs;
20
21
22 import java.lang.reflect.Field;
23 import java.lang.reflect.InvocationTargetException;
24 import java.lang.reflect.Method;
25 import java.net.BindException;
26 import java.net.ServerSocket;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.commons.logging.impl.Log4JLogger;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.BlockLocation;
33 import org.apache.hadoop.fs.FSDataInputStream;
34 import org.apache.hadoop.fs.FSDataOutputStream;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.testclassification.LargeTests;
41 import org.apache.hadoop.hbase.MiniHBaseCluster;
42 import org.apache.hadoop.hbase.client.HTable;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.regionserver.HRegionServer;
45 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
46 import org.apache.hadoop.hbase.util.FSUtils;
47 import org.apache.hadoop.hdfs.DFSClient;
48 import org.apache.hadoop.hdfs.DistributedFileSystem;
49 import org.apache.hadoop.hdfs.MiniDFSCluster;
50 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
51 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
52 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
53 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
54 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
55 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
56 import org.apache.hadoop.hdfs.server.datanode.DataNode;
57 import org.apache.log4j.Level;
58 import org.junit.After;
59 import org.junit.Assert;
60 import org.junit.Before;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63
64
65
66
67 @Category(LargeTests.class)
68 public class TestBlockReorder {
/** Logger used by every test in this class. */
private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);

// Host names given to the three datanodes started by setUp().
private static final String host1 = "host1";
private static final String host2 = "host2";
private static final String host3 = "host3";

static {
  // Log everything emitted by the DFS client and by HFileSystem while the tests run.
  ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
  ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
}

// Per-test fixture: assigned in setUp(); the clusters are stopped in the @After method.
private HBaseTestingUtility htu;
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
// Note: some tests overwrite this with the configuration of the HBase cluster they start.
private Configuration conf;
83
84 @Before
85 public void setUp() throws Exception {
86 htu = new HBaseTestingUtility();
87 htu.getConfiguration().setInt("dfs.block.size", 1024);
88 htu.getConfiguration().setBoolean("dfs.support.append", true);
89 htu.getConfiguration().setInt("dfs.replication", 3);
90 htu.startMiniDFSCluster(3,
91 new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
92
93 conf = htu.getConfiguration();
94 cluster = htu.getDFSCluster();
95 dfs = (DistributedFileSystem) FileSystem.get(conf);
96 }
97
98 @After
99 public void tearDownAfterClass() throws Exception {
100 htu.shutdownMiniCluster();
101 }
102
103
104
105
106 @Test
107 public void testBlockLocationReorder() throws Exception {
108 Path p = new Path("hello");
109
110 Assert.assertTrue((short) cluster.getDataNodes().size() > 1);
111 final int repCount = 2;
112
113
114 FSDataOutputStream fop = dfs.create(p, (short) repCount);
115 final double toWrite = 875.5613;
116 fop.writeDouble(toWrite);
117 fop.close();
118
119
120 long start = System.currentTimeMillis();
121 FSDataInputStream fin = dfs.open(p);
122 Assert.assertTrue(toWrite == fin.readDouble());
123 long end = System.currentTimeMillis();
124 LOG.info("readtime= " + (end - start));
125 fin.close();
126 Assert.assertTrue((end - start) < 30 * 1000);
127
128
129
130 FileStatus f = dfs.getFileStatus(p);
131 BlockLocation[] lbs;
132 do {
133 lbs = dfs.getFileBlockLocations(f, 0, 1);
134 } while (lbs.length != 1 && lbs[0].getLength() != repCount);
135 final String name = lbs[0].getNames()[0];
136 Assert.assertTrue(name.indexOf(':') > 0);
137 String portS = name.substring(name.indexOf(':') + 1);
138 final int port = Integer.parseInt(portS);
139 LOG.info("port= " + port);
140 int ipcPort = -1;
141
142
143
144 boolean ok = false;
145 final String lookup = lbs[0].getHosts()[0];
146 StringBuilder sb = new StringBuilder();
147 for (DataNode dn : cluster.getDataNodes()) {
148 final String dnName = getHostName(dn);
149 sb.append(dnName).append(' ');
150 if (lookup.equals(dnName)) {
151 ok = true;
152 LOG.info("killing datanode " + name + " / " + lookup);
153 ipcPort = dn.ipcServer.getListenerAddress().getPort();
154 dn.shutdown();
155 LOG.info("killed datanode " + name + " / " + lookup);
156 break;
157 }
158 }
159 Assert.assertTrue(
160 "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
161 LOG.info("ipc port= " + ipcPort);
162
163
164 Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
165 new HFileSystem.ReorderBlocks() {
166 @Override
167 public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
168 for (LocatedBlock lb : lbs.getLocatedBlocks()) {
169 if (lb.getLocations().length > 1) {
170 DatanodeInfo[] infos = lb.getLocations();
171 if (infos[0].getHostName().equals(lookup)) {
172 LOG.info("HFileSystem bad host, inverting");
173 DatanodeInfo tmp = infos[0];
174 infos[0] = infos[1];
175 infos[1] = tmp;
176 }
177 }
178 }
179 }
180 }));
181
182
183 final int retries = 10;
184 ServerSocket ss = null;
185 ServerSocket ssI;
186 try {
187 ss = new ServerSocket(port);
188 ssI = new ServerSocket(ipcPort);
189 } catch (BindException be) {
190 LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
191 ", this means that the datanode has not closed the socket or" +
192 " someone else took it. It may happen, skipping this test for this time.", be);
193 if (ss != null) {
194 ss.close();
195 }
196 return;
197 }
198
199
200
201 for (int i = 0; i < retries; i++) {
202 start = System.currentTimeMillis();
203
204 fin = dfs.open(p);
205 Assert.assertTrue(toWrite == fin.readDouble());
206 fin.close();
207 end = System.currentTimeMillis();
208 LOG.info("HFileSystem readtime= " + (end - start));
209 Assert.assertFalse("We took too much time to read", (end - start) > 60000);
210 }
211
212 ss.close();
213 ssI.close();
214 }
215
216
217
218
219 private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
220 Method m;
221 try {
222 m = DataNode.class.getMethod("getDisplayName");
223 } catch (NoSuchMethodException e) {
224 try {
225 m = DataNode.class.getMethod("getHostName");
226 } catch (NoSuchMethodException e1) {
227 throw new RuntimeException(e1);
228 }
229 }
230
231 String res = (String) m.invoke(dn);
232 if (res.contains(":")) {
233 return res.split(":")[0];
234 } else {
235 return res;
236 }
237 }
238
239
240
241
242 @Test()
243 public void testHBaseCluster() throws Exception {
244 byte[] sb = "sb".getBytes();
245 htu.startMiniZKCluster();
246
247 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
248 hbm.waitForActiveAndReadyMaster();
249 hbm.getRegionServer(0).waitForServerOnline();
250
251
252
253 String host4 = hbm.getRegionServer(0).getServerName().getHostname();
254 LOG.info("Starting a new datanode with the name=" + host4);
255 cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
256 cluster.waitClusterUp();
257
258 final int repCount = 3;
259 HRegionServer targetRs = hbm.getRegionServer(0);
260
261
262 conf = targetRs.getConfiguration();
263 HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
264 HTable h = htu.createTable("table".getBytes(), sb);
265
266
267
268
269
270
271
272 String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
273 "/" + targetRs.getServerName().toString()).toUri().getPath();
274
275 DistributedFileSystem mdfs = (DistributedFileSystem)
276 hbm.getMaster().getMasterFileSystem().getFileSystem();
277
278
279 int nbTest = 0;
280 while (nbTest < 10) {
281 htu.getHBaseAdmin().rollHLogWriter(targetRs.getServerName().toString());
282
283
284 Thread.sleep(100);
285
286
287 Put p = new Put(sb);
288 p.add(sb, sb, sb);
289 h.put(p);
290
291 DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
292 HdfsFileStatus[] hfs = dl.getPartialListing();
293
294
295 Assert.assertTrue(hfs.length >= 1);
296 for (HdfsFileStatus hf : hfs) {
297 LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
298 String logFile = rootDir + "/" + hf.getLocalName();
299 FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
300
301 LOG.info("Checking log file: " + logFile);
302
303
304
305
306 BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
307 if (bls.length > 0) {
308 BlockLocation bl = bls[0];
309
310 LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
311 for (int i = 0; i < bl.getHosts().length - 1; i++) {
312 LOG.info(bl.getHosts()[i] + " " + logFile);
313 Assert.assertNotSame(bl.getHosts()[i], host4);
314 }
315 String last = bl.getHosts()[bl.getHosts().length - 1];
316 LOG.info(last + " " + logFile);
317 if (host4.equals(last)) {
318 nbTest++;
319 LOG.info(logFile + " is on the new datanode and is ok");
320 if (bl.getHosts().length == 3) {
321
322
323 testFromDFS(dfs, logFile, repCount, host4);
324
325
326 testFromDFS(mdfs, logFile, repCount, host4);
327 }
328 }
329 }
330 }
331 }
332 }
333
334 private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
335 throws Exception {
336
337 for (int i = 0; i < 10; i++) {
338 LocatedBlocks l;
339
340 final long max = System.currentTimeMillis() + 10000;
341 boolean done;
342 do {
343 Assert.assertTrue("Can't get enouth replica.", System.currentTimeMillis() < max);
344 l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
345 Assert.assertNotNull("Can't get block locations for " + src, l);
346 Assert.assertNotNull(l.getLocatedBlocks());
347 Assert.assertTrue(l.getLocatedBlocks().size() > 0);
348
349 done = true;
350 for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
351 done = (l.get(y).getLocations().length == repCount);
352 }
353 } while (!done);
354
355 for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
356 Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
357 }
358 }
359 }
360
361 private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
362 Field nf = DFSClient.class.getDeclaredField("namenode");
363 nf.setAccessible(true);
364 return (ClientProtocol) nf.get(dfsc);
365 }
366
367
368
369
370 @Test
371 public void testBlockLocation() throws Exception {
372
373 htu.startMiniZKCluster();
374 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
375 conf = hbm.getConfiguration();
376
377
378
379 final String fileName = "/helloWorld";
380 Path p = new Path(fileName);
381
382 final int repCount = 3;
383 Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);
384
385
386 FSDataOutputStream fop = dfs.create(p, (short) repCount);
387 final double toWrite = 875.5613;
388 fop.writeDouble(toWrite);
389 fop.close();
390
391 for (int i=0; i<10; i++){
392
393 LocatedBlocks l;
394 final long max = System.currentTimeMillis() + 10000;
395 do {
396 l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
397 Assert.assertNotNull(l.getLocatedBlocks());
398 Assert.assertEquals(l.getLocatedBlocks().size(), 1);
399 Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
400 System.currentTimeMillis() < max);
401 } while (l.get(0).getLocations().length != repCount);
402
403
404 Object originalList[] = l.getLocatedBlocks().toArray();
405 HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
406 lrb.reorderBlocks(conf, l, fileName);
407 Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
408
409
410 Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
411 Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
412 String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
413 HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
414
415
416 Assert.assertNotNull("log= " + pseudoLogFile,
417 HLogUtil.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile));
418
419
420 lrb.reorderBlocks(conf, l, pseudoLogFile);
421 Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
422
423
424 lrb.reorderBlocks(conf, l, pseudoLogFile);
425 Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
426 }
427 }
428
429 }