1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.lang.reflect.Method;
25 import java.net.BindException;
26 import java.util.Comparator;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FSDataInputStream;
36 import org.apache.hadoop.fs.FSDataOutputStream;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.*;
41 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
42 import org.apache.hadoop.hbase.testclassification.LargeTests;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.FSUtils;
45 import org.apache.hadoop.hbase.util.Threads;
46 import org.apache.hadoop.hbase.Coprocessor;
47 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
48 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
49 import org.apache.hadoop.hdfs.DistributedFileSystem;
50 import org.apache.hadoop.hdfs.MiniDFSCluster;
51 import org.apache.hadoop.hdfs.protocol.FSConstants;
52 import org.junit.After;
53 import org.junit.AfterClass;
54 import org.junit.Assert;
55 import org.junit.Before;
56 import org.junit.BeforeClass;
57 import org.junit.Test;
58 import org.junit.experimental.categories.Category;
59
60
61 @Category(LargeTests.class)
62 @SuppressWarnings("deprecation")
63 public class TestHLog {
64 private static final Log LOG = LogFactory.getLog(TestHLog.class);
65 {
66
67
68
69
70
71
72
73
74 }
75
76 private static Configuration conf;
77 private static FileSystem fs;
78 private static Path dir;
79 private static MiniDFSCluster cluster;
80 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
81 private static Path hbaseDir;
82 private static Path oldLogDir;
83
84 @Before
85 public void setUp() throws Exception {
86
87 FileStatus[] entries = fs.listStatus(new Path("/"));
88 for (FileStatus dir : entries) {
89 fs.delete(dir.getPath(), true);
90 }
91
92 }
93
94 @After
95 public void tearDown() throws Exception {
96 }
97
98 @BeforeClass
99 public static void setUpBeforeClass() throws Exception {
100
101 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
102
103 TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
104 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
105
106 TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
107 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
108 TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000);
109
110 TEST_UTIL.getConfiguration().setInt("ipc.client.connect.max.retries", 1);
111 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
112 TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
113 TEST_UTIL.getConfiguration().setInt("ipc.client.connection.maxidletime", 500);
114 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
115 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
116 SampleRegionWALObserver.class.getName());
117 TEST_UTIL.startMiniDFSCluster(3);
118
119 conf = TEST_UTIL.getConfiguration();
120 cluster = TEST_UTIL.getDFSCluster();
121 fs = cluster.getFileSystem();
122
123 hbaseDir = TEST_UTIL.createRootDir();
124 oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
125 dir = new Path(hbaseDir, getName());
126 }
127 @AfterClass
128 public static void tearDownAfterClass() throws Exception {
129 TEST_UTIL.shutdownMiniCluster();
130 }
131
132 private static String getName() {
133
134 return "TestHLog";
135 }
136
137
138
139
140
141 @Test
142 public void testConcurrentWrites() throws Exception {
143
144
145 int errCode = HLogPerformanceEvaluation.
146 innerMain(new Configuration(TEST_UTIL.getConfiguration()),
147 new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
148 assertEquals(0, errCode);
149 }
150
151
152
153
154
155
156 @Test
157 public void testSplit() throws IOException {
158
159 final TableName tableName =
160 TableName.valueOf(getName());
161 final byte [] rowName = tableName.getName();
162 Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
163 HLog log = HLogFactory.createHLog(fs, hbaseDir,
164 HConstants.HREGION_LOGDIR_NAME, conf);
165 final int howmany = 3;
166 HRegionInfo[] infos = new HRegionInfo[3];
167 Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
168 fs.mkdirs(tabledir);
169 for(int i = 0; i < howmany; i++) {
170 infos[i] = new HRegionInfo(tableName,
171 Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
172 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
173 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
174 }
175 HTableDescriptor htd = new HTableDescriptor(tableName);
176 htd.addFamily(new HColumnDescriptor("column"));
177
178
179 final AtomicLong sequenceId = new AtomicLong(1);
180 try {
181 for (int ii = 0; ii < howmany; ii++) {
182 for (int i = 0; i < howmany; i++) {
183
184 for (int j = 0; j < howmany; j++) {
185 WALEdit edit = new WALEdit();
186 byte [] family = Bytes.toBytes("column");
187 byte [] qualifier = Bytes.toBytes(Integer.toString(j));
188 byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
189 edit.add(new KeyValue(rowName, family, qualifier,
190 System.currentTimeMillis(), column));
191 LOG.info("Region " + i + ": " + edit);
192 log.append(infos[i], tableName, edit,
193 System.currentTimeMillis(), htd, sequenceId);
194 }
195 }
196 log.rollWriter();
197 }
198 log.close();
199 List<Path> splits = HLogSplitter.split(
200 hbaseDir, logdir, oldLogDir, fs, conf);
201 verifySplits(splits, howmany);
202 log = null;
203 } finally {
204 if (log != null) {
205 log.closeAndDelete();
206 }
207 }
208 }
209
210
211
212
213
214 @Test
215 public void Broken_testSync() throws Exception {
216 TableName tableName =
217 TableName.valueOf(getName());
218
219 Path p = new Path(dir, getName() + ".fsdos");
220 FSDataOutputStream out = fs.create(p);
221 out.write(tableName.getName());
222 Method syncMethod = null;
223 try {
224 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
225 } catch (NoSuchMethodException e) {
226 try {
227 syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
228 } catch (NoSuchMethodException ex) {
229 fail("This version of Hadoop supports neither Syncable.sync() " +
230 "nor Syncable.hflush().");
231 }
232 }
233 syncMethod.invoke(out, new Object[]{});
234 FSDataInputStream in = fs.open(p);
235 assertTrue(in.available() > 0);
236 byte [] buffer = new byte [1024];
237 int read = in.read(buffer);
238 assertEquals(tableName.getName().length, read);
239 out.close();
240 in.close();
241
242 HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf);
243 final AtomicLong sequenceId = new AtomicLong(1);
244 final int total = 20;
245 HLog.Reader reader = null;
246
247 try {
248 HRegionInfo info = new HRegionInfo(tableName,
249 null,null, false);
250 HTableDescriptor htd = new HTableDescriptor();
251 htd.addFamily(new HColumnDescriptor(tableName.getName()));
252
253 for (int i = 0; i < total; i++) {
254 WALEdit kvs = new WALEdit();
255 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
256 wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
257 }
258
259
260 wal.sync();
261
262 Path walPath = ((FSHLog) wal).computeFilename();
263 reader = HLogFactory.createReader(fs, walPath, conf);
264 int count = 0;
265 HLog.Entry entry = new HLog.Entry();
266 while ((entry = reader.next(entry)) != null) count++;
267 assertEquals(total, count);
268 reader.close();
269
270
271 for (int i = 0; i < total; i++) {
272 WALEdit kvs = new WALEdit();
273 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
274 wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
275 }
276 reader = HLogFactory.createReader(fs, walPath, conf);
277 count = 0;
278 while((entry = reader.next(entry)) != null) count++;
279 assertTrue(count >= total);
280 reader.close();
281
282 wal.sync();
283 reader = HLogFactory.createReader(fs, walPath, conf);
284 count = 0;
285 while((entry = reader.next(entry)) != null) count++;
286 assertEquals(total * 2, count);
287
288
289 final byte [] value = new byte[1025 * 1024];
290 for (int i = 0; i < total; i++) {
291 WALEdit kvs = new WALEdit();
292 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
293 wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
294 }
295
296 wal.sync();
297 reader = HLogFactory.createReader(fs, walPath, conf);
298 count = 0;
299 while((entry = reader.next(entry)) != null) count++;
300 assertEquals(total * 3, count);
301 reader.close();
302
303 wal.close();
304 reader = HLogFactory.createReader(fs, walPath, conf);
305 count = 0;
306 while((entry = reader.next(entry)) != null) count++;
307 assertEquals(total * 3, count);
308 reader.close();
309 } finally {
310 if (wal != null) wal.closeAndDelete();
311 if (reader != null) reader.close();
312 }
313 }
314
315 private void verifySplits(List<Path> splits, final int howmany)
316 throws IOException {
317 assertEquals(howmany * howmany, splits.size());
318 for (int i = 0; i < splits.size(); i++) {
319 LOG.info("Verifying=" + splits.get(i));
320 HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
321 try {
322 int count = 0;
323 String previousRegion = null;
324 long seqno = -1;
325 HLog.Entry entry = new HLog.Entry();
326 while((entry = reader.next(entry)) != null) {
327 HLogKey key = entry.getKey();
328 String region = Bytes.toString(key.getEncodedRegionName());
329
330 if (previousRegion != null) {
331 assertEquals(previousRegion, region);
332 }
333 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
334 assertTrue(seqno < key.getLogSeqNum());
335 seqno = key.getLogSeqNum();
336 previousRegion = region;
337 count++;
338 }
339 assertEquals(howmany, count);
340 } finally {
341 reader.close();
342 }
343 }
344 }
345
346
347
348
349
350
351
352
353
354
355 @Test (timeout=300000)
356 public void testAppendClose() throws Exception {
357 TableName tableName =
358 TableName.valueOf(getName());
359 HRegionInfo regioninfo = new HRegionInfo(tableName,
360 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
361
362 HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
363 "hlogdir_archive", conf);
364 final AtomicLong sequenceId = new AtomicLong(1);
365 final int total = 20;
366
367 HTableDescriptor htd = new HTableDescriptor();
368 htd.addFamily(new HColumnDescriptor(tableName.getName()));
369
370 for (int i = 0; i < total; i++) {
371 WALEdit kvs = new WALEdit();
372 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
373 wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
374 }
375
376 wal.sync();
377 int namenodePort = cluster.getNameNodePort();
378 final Path walPath = ((FSHLog) wal).computeFilename();
379
380
381
382 try {
383 DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
384 dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
385 TEST_UTIL.shutdownMiniDFSCluster();
386 try {
387
388
389 wal.close();
390 } catch (IOException e) {
391 LOG.info(e);
392 }
393 fs.close();
394 LOG.info("STOPPED first instance of the cluster");
395 } finally {
396
397 while (cluster.isClusterUp()){
398 LOG.error("Waiting for cluster to go down");
399 Thread.sleep(1000);
400 }
401 assertFalse(cluster.isClusterUp());
402 cluster = null;
403 for (int i = 0; i < 100; i++) {
404 try {
405 cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort);
406 break;
407 } catch (BindException e) {
408 LOG.info("Sleeping. BindException bringing up new cluster");
409 Threads.sleep(1000);
410 }
411 }
412 cluster.waitActive();
413 fs = cluster.getFileSystem();
414 LOG.info("STARTED second instance.");
415 }
416
417
418
419 Method setLeasePeriod = cluster.getClass()
420 .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
421 setLeasePeriod.setAccessible(true);
422 setLeasePeriod.invoke(cluster, 1000L, 1000L);
423 try {
424 Thread.sleep(1000);
425 } catch (InterruptedException e) {
426 LOG.info(e);
427 }
428
429
430 final FileSystem recoveredFs = fs;
431 final Configuration rlConf = conf;
432
433 class RecoverLogThread extends Thread {
434 public Exception exception = null;
435 public void run() {
436 try {
437 FSUtils.getInstance(fs, rlConf)
438 .recoverFileLease(recoveredFs, walPath, rlConf, null);
439 } catch (IOException e) {
440 exception = e;
441 }
442 }
443 }
444
445 RecoverLogThread t = new RecoverLogThread();
446 t.start();
447
448 t.join(60 * 1000);
449 if(t.isAlive()) {
450 t.interrupt();
451 throw new Exception("Timed out waiting for HLog.recoverLog()");
452 }
453
454 if (t.exception != null)
455 throw t.exception;
456
457
458 HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
459 int count = 0;
460 HLog.Entry entry = new HLog.Entry();
461 while (reader.next(entry) != null) {
462 count++;
463 assertTrue("Should be one KeyValue per WALEdit",
464 entry.getEdit().getKeyValues().size() == 1);
465 }
466 assertEquals(total, count);
467 reader.close();
468
469
470 setLeasePeriod.invoke(cluster, new Object[]{new Long(60000), new Long(3600000)});
471 }
472
473
474
475
476
477 @Test
478 public void testEditAdd() throws IOException {
479 final int COL_COUNT = 10;
480 final TableName tableName =
481 TableName.valueOf("tablename");
482 final byte [] row = Bytes.toBytes("row");
483 HLog.Reader reader = null;
484 HLog log = null;
485 try {
486 log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
487 final AtomicLong sequenceId = new AtomicLong(1);
488
489
490
491 long timestamp = System.currentTimeMillis();
492 WALEdit cols = new WALEdit();
493 for (int i = 0; i < COL_COUNT; i++) {
494 cols.add(new KeyValue(row, Bytes.toBytes("column"),
495 Bytes.toBytes(Integer.toString(i)),
496 timestamp, new byte[] { (byte)(i + '0') }));
497 }
498 HRegionInfo info = new HRegionInfo(tableName,
499 row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
500 HTableDescriptor htd = new HTableDescriptor();
501 htd.addFamily(new HColumnDescriptor("column"));
502
503 log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
504 log.startCacheFlush(info.getEncodedNameAsBytes());
505 log.completeCacheFlush(info.getEncodedNameAsBytes());
506 log.close();
507 Path filename = ((FSHLog) log).computeFilename();
508 log = null;
509
510 reader = HLogFactory.createReader(fs, filename, conf);
511
512
513 for (int i = 0; i < 1; i++) {
514 HLog.Entry entry = reader.next(null);
515 if (entry == null) break;
516 HLogKey key = entry.getKey();
517 WALEdit val = entry.getEdit();
518 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
519 assertTrue(tableName.equals(key.getTablename()));
520 KeyValue kv = val.getKeyValues().get(0);
521 assertTrue(Bytes.equals(row, kv.getRow()));
522 assertEquals((byte)(i + '0'), kv.getValue()[0]);
523 System.out.println(key + " " + val);
524 }
525 } finally {
526 if (log != null) {
527 log.closeAndDelete();
528 }
529 if (reader != null) {
530 reader.close();
531 }
532 }
533 }
534
535
536
537
538 @Test
539 public void testAppend() throws IOException {
540 final int COL_COUNT = 10;
541 final TableName tableName =
542 TableName.valueOf("tablename");
543 final byte [] row = Bytes.toBytes("row");
544 Reader reader = null;
545 HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
546 final AtomicLong sequenceId = new AtomicLong(1);
547 try {
548
549
550 long timestamp = System.currentTimeMillis();
551 WALEdit cols = new WALEdit();
552 for (int i = 0; i < COL_COUNT; i++) {
553 cols.add(new KeyValue(row, Bytes.toBytes("column"),
554 Bytes.toBytes(Integer.toString(i)),
555 timestamp, new byte[] { (byte)(i + '0') }));
556 }
557 HRegionInfo hri = new HRegionInfo(tableName,
558 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
559 HTableDescriptor htd = new HTableDescriptor();
560 htd.addFamily(new HColumnDescriptor("column"));
561 log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
562 log.startCacheFlush(hri.getEncodedNameAsBytes());
563 log.completeCacheFlush(hri.getEncodedNameAsBytes());
564 log.close();
565 Path filename = ((FSHLog) log).computeFilename();
566 log = null;
567
568 reader = HLogFactory.createReader(fs, filename, conf);
569 HLog.Entry entry = reader.next();
570 assertEquals(COL_COUNT, entry.getEdit().size());
571 int idx = 0;
572 for (KeyValue val : entry.getEdit().getKeyValues()) {
573 assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
574 entry.getKey().getEncodedRegionName()));
575 assertTrue(tableName.equals(entry.getKey().getTablename()));
576 assertTrue(Bytes.equals(row, val.getRow()));
577 assertEquals((byte)(idx + '0'), val.getValue()[0]);
578 System.out.println(entry.getKey() + " " + val);
579 idx++;
580 }
581 } finally {
582 if (log != null) {
583 log.closeAndDelete();
584 }
585 if (reader != null) {
586 reader.close();
587 }
588 }
589 }
590
591
592
593
594
595 @Test
596 public void testVisitors() throws Exception {
597 final int COL_COUNT = 10;
598 final TableName tableName =
599 TableName.valueOf("tablename");
600 final byte [] row = Bytes.toBytes("row");
601 HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
602 final AtomicLong sequenceId = new AtomicLong(1);
603 try {
604 DumbWALActionsListener visitor = new DumbWALActionsListener();
605 log.registerWALActionsListener(visitor);
606 long timestamp = System.currentTimeMillis();
607 HTableDescriptor htd = new HTableDescriptor();
608 htd.addFamily(new HColumnDescriptor("column"));
609
610 HRegionInfo hri = new HRegionInfo(tableName,
611 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
612 for (int i = 0; i < COL_COUNT; i++) {
613 WALEdit cols = new WALEdit();
614 cols.add(new KeyValue(row, Bytes.toBytes("column"),
615 Bytes.toBytes(Integer.toString(i)),
616 timestamp, new byte[]{(byte) (i + '0')}));
617 log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
618 }
619 assertEquals(COL_COUNT, visitor.increments);
620 log.unregisterWALActionsListener(visitor);
621 WALEdit cols = new WALEdit();
622 cols.add(new KeyValue(row, Bytes.toBytes("column"),
623 Bytes.toBytes(Integer.toString(11)),
624 timestamp, new byte[]{(byte) (11 + '0')}));
625 log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
626 assertEquals(COL_COUNT, visitor.increments);
627 } finally {
628 if (log != null) log.closeAndDelete();
629 }
630 }
631
632 @Test
633 public void testLogCleaning() throws Exception {
634 LOG.info("testLogCleaning");
635 final TableName tableName =
636 TableName.valueOf("testLogCleaning");
637 final TableName tableName2 =
638 TableName.valueOf("testLogCleaning2");
639
640 HLog log = HLogFactory.createHLog(fs, hbaseDir,
641 getName(), conf);
642 final AtomicLong sequenceId = new AtomicLong(1);
643 try {
644 HRegionInfo hri = new HRegionInfo(tableName,
645 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
646 HRegionInfo hri2 = new HRegionInfo(tableName2,
647 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
648
649
650
651 addEdits(log, hri, tableName, 1, sequenceId);
652 log.rollWriter();
653 assertEquals(1, ((FSHLog) log).getNumRolledLogFiles());
654
655
656 addEdits(log, hri, tableName, 2, sequenceId);
657 log.rollWriter();
658 assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());
659
660
661 addEdits(log, hri, tableName, 1, sequenceId);
662 addEdits(log, hri2, tableName2, 1, sequenceId);
663 addEdits(log, hri, tableName, 1, sequenceId);
664 addEdits(log, hri2, tableName2, 1, sequenceId);
665 log.rollWriter();
666 assertEquals(3, ((FSHLog) log).getNumRolledLogFiles());
667
668
669
670 addEdits(log, hri2, tableName2, 1, sequenceId);
671 log.startCacheFlush(hri.getEncodedNameAsBytes());
672 log.completeCacheFlush(hri.getEncodedNameAsBytes());
673 log.rollWriter();
674 assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());
675
676
677
678
679 addEdits(log, hri2, tableName2, 1, sequenceId);
680 log.startCacheFlush(hri2.getEncodedNameAsBytes());
681 log.completeCacheFlush(hri2.getEncodedNameAsBytes());
682 log.rollWriter();
683 assertEquals(0, ((FSHLog) log).getNumRolledLogFiles());
684 } finally {
685 if (log != null) log.closeAndDelete();
686 }
687 }
688
689 @Test
690 public void testFailedToCreateHLogIfParentRenamed() throws IOException {
691 FSHLog log = (FSHLog)HLogFactory.createHLog(
692 fs, hbaseDir, "testFailedToCreateHLogIfParentRenamed", conf);
693 long filenum = System.currentTimeMillis();
694 Path path = log.computeFilename(filenum);
695 HLogFactory.createWALWriter(fs, path, conf);
696 Path parent = path.getParent();
697 path = log.computeFilename(filenum + 1);
698 Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
699 fs.rename(parent, newPath);
700 try {
701 HLogFactory.createWALWriter(fs, path, conf);
702 fail("It should fail to create the new WAL");
703 } catch (IOException ioe) {
704
705 }
706 }
707
708 @Test
709 public void testGetServerNameFromHLogDirectoryName() throws IOException {
710 ServerName sn = ServerName.valueOf("hn", 450, 1398);
711 String hl = FSUtils.getRootDir(conf) + "/" + HLogUtil.getHLogDirectoryName(sn.toString());
712
713
714 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, null));
715 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf,
716 FSUtils.getRootDir(conf).toUri().toString()));
717 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, ""));
718 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, " "));
719 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl));
720 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "qdf"));
721 Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, "sfqf" + hl + "qdf"));
722
723 final String wals = "/WALs/";
724 ServerName parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
725 FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
726 "/localhost%2C32984%2C1343316388997.1343316390417");
727 Assert.assertEquals("standard", sn, parsed);
728
729 parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "/qdf");
730 Assert.assertEquals("subdir", sn, parsed);
731
732 parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
733 FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
734 "-splitting/localhost%3A57020.1340474893931");
735 Assert.assertEquals("split", sn, parsed);
736 }
737
738
739
740
741 @Test
742 public void testWALCoprocessorLoaded() throws Exception {
743
744 HLog log = HLogFactory.createHLog(fs, hbaseDir,
745 getName(), conf);
746 try {
747 WALCoprocessorHost host = log.getCoprocessorHost();
748 Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
749 assertNotNull(c);
750 } finally {
751 if (log != null) log.closeAndDelete();
752 }
753 }
754
755 private void addEdits(HLog log, HRegionInfo hri, TableName tableName,
756 int times, AtomicLong sequenceId) throws IOException {
757 HTableDescriptor htd = new HTableDescriptor();
758 htd.addFamily(new HColumnDescriptor("row"));
759
760 final byte [] row = Bytes.toBytes("row");
761 for (int i = 0; i < times; i++) {
762 long timestamp = System.currentTimeMillis();
763 WALEdit cols = new WALEdit();
764 cols.add(new KeyValue(row, row, row, timestamp, row));
765 log.append(hri, tableName, cols, timestamp, htd, sequenceId);
766 }
767 }
768
769
770
771
772
773 @Test
774 public void testReadLegacyLog() throws IOException {
775 final int columnCount = 5;
776 final int recordCount = 5;
777 final TableName tableName =
778 TableName.valueOf("tablename");
779 final byte[] row = Bytes.toBytes("row");
780 long timestamp = System.currentTimeMillis();
781 Path path = new Path(dir, "temphlog");
782 SequenceFileLogWriter sflw = null;
783 HLog.Reader reader = null;
784 try {
785 HRegionInfo hri = new HRegionInfo(tableName,
786 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
787 HTableDescriptor htd = new HTableDescriptor(tableName);
788 fs.mkdirs(dir);
789
790 sflw = new SequenceFileLogWriter();
791 sflw.init(fs, path, conf, false);
792 for (int i = 0; i < recordCount; ++i) {
793 HLogKey key = new HLogKey(
794 hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
795 WALEdit edit = new WALEdit();
796 for (int j = 0; j < columnCount; ++j) {
797 if (i == 0) {
798 htd.addFamily(new HColumnDescriptor("column" + j));
799 }
800 String value = i + "" + j;
801 edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
802 }
803 sflw.append(new HLog.Entry(key, edit));
804 }
805 sflw.sync();
806 sflw.close();
807
808
809 reader = HLogFactory.createReader(fs, path, conf);
810 assertTrue(reader instanceof SequenceFileLogReader);
811 for (int i = 0; i < recordCount; ++i) {
812 HLog.Entry entry = reader.next();
813 assertNotNull(entry);
814 assertEquals(columnCount, entry.getEdit().size());
815 assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
816 assertEquals(tableName, entry.getKey().getTablename());
817 int idx = 0;
818 for (KeyValue val : entry.getEdit().getKeyValues()) {
819 assertTrue(Bytes.equals(row, val.getRow()));
820 String value = i + "" + idx;
821 assertArrayEquals(Bytes.toBytes(value), val.getValue());
822 idx++;
823 }
824 }
825 HLog.Entry entry = reader.next();
826 assertNull(entry);
827 } finally {
828 if (sflw != null) {
829 sflw.close();
830 }
831 if (reader != null) {
832 reader.close();
833 }
834 }
835 }
836
837
838
839
840
841 @Test
842 public void testWALTrailer() throws IOException {
843
844 doRead(true);
845
846 doRead(false);
847 }
848
849
850
851
852
853
854
855
856
857
858 private void doRead(boolean withTrailer) throws IOException {
859 final int columnCount = 5;
860 final int recordCount = 5;
861 final TableName tableName =
862 TableName.valueOf("tablename");
863 final byte[] row = Bytes.toBytes("row");
864 long timestamp = System.currentTimeMillis();
865 Path path = new Path(dir, "temphlog");
866
867 fs.delete(path, true);
868 HLog.Writer writer = null;
869 HLog.Reader reader = null;
870 try {
871 HRegionInfo hri = new HRegionInfo(tableName,
872 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
873 HTableDescriptor htd = new HTableDescriptor(tableName);
874 fs.mkdirs(dir);
875
876 writer = HLogFactory.createWALWriter(fs, path, conf);
877 for (int i = 0; i < recordCount; ++i) {
878 HLogKey key = new HLogKey(
879 hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
880 WALEdit edit = new WALEdit();
881 for (int j = 0; j < columnCount; ++j) {
882 if (i == 0) {
883 htd.addFamily(new HColumnDescriptor("column" + j));
884 }
885 String value = i + "" + j;
886 edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
887 }
888 writer.append(new HLog.Entry(key, edit));
889 }
890 writer.sync();
891 if (withTrailer) writer.close();
892
893
894 reader = HLogFactory.createReader(fs, path, conf);
895 assertTrue(reader instanceof ProtobufLogReader);
896 if (withTrailer) {
897 assertNotNull(reader.getWALTrailer());
898 } else {
899 assertNull(reader.getWALTrailer());
900 }
901 for (int i = 0; i < recordCount; ++i) {
902 HLog.Entry entry = reader.next();
903 assertNotNull(entry);
904 assertEquals(columnCount, entry.getEdit().size());
905 assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
906 assertEquals(tableName, entry.getKey().getTablename());
907 int idx = 0;
908 for (KeyValue val : entry.getEdit().getKeyValues()) {
909 assertTrue(Bytes.equals(row, val.getRow()));
910 String value = i + "" + idx;
911 assertArrayEquals(Bytes.toBytes(value), val.getValue());
912 idx++;
913 }
914 }
915 HLog.Entry entry = reader.next();
916 assertNull(entry);
917 } finally {
918 if (writer != null) {
919 writer.close();
920 }
921 if (reader != null) {
922 reader.close();
923 }
924 }
925 }
926
927
928
929
930
931
932 @Test
933 public void testHLogComparator() throws Exception {
934 HLog hlog1 = null;
935 HLog hlogMeta = null;
936 try {
937 hlog1 = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
938 LOG.debug("Log obtained is: " + hlog1);
939 Comparator<Path> comp = ((FSHLog) hlog1).LOG_NAME_COMPARATOR;
940 Path p1 = ((FSHLog) hlog1).computeFilename(11);
941 Path p2 = ((FSHLog) hlog1).computeFilename(12);
942
943 assertTrue(comp.compare(p1, p1) == 0);
944
945 assertTrue(comp.compare(p1, p2) < 0);
946 hlogMeta = HLogFactory.createMetaHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf,
947 null, null);
948 Comparator<Path> compMeta = ((FSHLog) hlogMeta).LOG_NAME_COMPARATOR;
949
950 Path p1WithMeta = ((FSHLog) hlogMeta).computeFilename(11);
951 Path p2WithMeta = ((FSHLog) hlogMeta).computeFilename(12);
952 assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
953 assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
954
955 boolean ex = false;
956 try {
957 comp.compare(p1WithMeta, p2);
958 } catch (Exception e) {
959 ex = true;
960 }
961 assertTrue("Comparator doesn't complain while checking meta log files", ex);
962 boolean exMeta = false;
963 try {
964 compMeta.compare(p1WithMeta, p2);
965 } catch (Exception e) {
966 exMeta = true;
967 }
968 assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
969 } finally {
970 if (hlog1 != null) hlog1.close();
971 if (hlogMeta != null) hlogMeta.close();
972 }
973 }
974
975
976
977
978
979
980
981
982
983
984
985
986
987 @Test
988 public void testWALArchiving() throws IOException {
989 LOG.debug("testWALArchiving");
990 TableName table1 = TableName.valueOf("t1");
991 TableName table2 = TableName.valueOf("t2");
992 HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
993 try {
994 assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
995 HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
996 HConstants.EMPTY_END_ROW);
997 HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
998 HConstants.EMPTY_END_ROW);
999
1000 hri1.setSplit(false);
1001 hri2.setSplit(false);
1002
1003 final AtomicLong sequenceId1 = new AtomicLong(1);
1004 final AtomicLong sequenceId2 = new AtomicLong(1);
1005
1006 addEdits(hlog, hri1, table1, 1, sequenceId1);
1007 hlog.rollWriter();
1008
1009 assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
1010
1011 addEdits(hlog, hri1, table1, 1, sequenceId1);
1012 hlog.rollWriter();
1013
1014 assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
1015
1016 addEdits(hlog, hri1, table1, 3, sequenceId1);
1017 flushRegion(hlog, hri1.getEncodedNameAsBytes());
1018
1019 hlog.rollWriter();
1020 assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
1021
1022 addEdits(hlog, hri2, table2, 1, sequenceId2);
1023 hlog.rollWriter();
1024 assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
1025
1026 addEdits(hlog, hri1, table1, 2, sequenceId1);
1027 hlog.rollWriter();
1028 assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
1029
1030 addEdits(hlog, hri2, table2, 2, sequenceId2);
1031 flushRegion(hlog, hri1.getEncodedNameAsBytes());
1032
1033
1034
1035
1036
1037 hlog.rollWriter();
1038 assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
1039
1040 addEdits(hlog, hri2, table2, 2, sequenceId2);
1041 flushRegion(hlog, hri2.getEncodedNameAsBytes());
1042 hlog.rollWriter();
1043 assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
1044 } finally {
1045 if (hlog != null) hlog.close();
1046 }
1047 }
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058 @Test
1059 public void testFindMemStoresEligibleForFlush() throws Exception {
1060 LOG.debug("testFindMemStoresEligibleForFlush");
1061 Configuration conf1 = HBaseConfiguration.create(conf);
1062 conf1.setInt("hbase.regionserver.maxlogs", 1);
1063 HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1);
1064 TableName t1 = TableName.valueOf("t1");
1065 TableName t2 = TableName.valueOf("t2");
1066 HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
1067 HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
1068
1069 final AtomicLong sequenceId1 = new AtomicLong(1);
1070 final AtomicLong sequenceId2 = new AtomicLong(1);
1071
1072 try {
1073 addEdits(hlog, hri1, t1, 2, sequenceId1);
1074 hlog.rollWriter();
1075
1076 addEdits(hlog, hri1, t1, 2, sequenceId1);
1077 hlog.rollWriter();
1078
1079 assertTrue(((FSHLog) hlog).getNumRolledLogFiles() == 2);
1080
1081
1082
1083 byte[][] regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
1084 assertEquals(1, regionsToFlush.length);
1085 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
1086
1087 addEdits(hlog, hri2, t2, 2, sequenceId2);
1088
1089 regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
1090 assertEquals(regionsToFlush.length, 1);
1091 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
1092
1093
1094 flushRegion(hlog, hri1.getEncodedNameAsBytes());
1095 hlog.rollWriter();
1096
1097 assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
1098
1099 flushRegion(hlog, hri2.getEncodedNameAsBytes());
1100 hlog.rollWriter(true);
1101
1102 assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
1103
1104 addEdits(hlog, hri1, t1, 2, sequenceId1);
1105 addEdits(hlog, hri2, t2, 2, sequenceId2);
1106 hlog.rollWriter();
1107
1108 assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
1109 addEdits(hlog, hri1, t1, 2, sequenceId1);
1110 hlog.rollWriter();
1111
1112
1113 regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
1114 assertEquals(2, regionsToFlush.length);
1115
1116 flushRegion(hlog, hri1.getEncodedNameAsBytes());
1117 flushRegion(hlog, hri2.getEncodedNameAsBytes());
1118 hlog.rollWriter(true);
1119 assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
1120
1121 addEdits(hlog, hri1, t1, 2, sequenceId1);
1122
1123 hlog.startCacheFlush(hri1.getEncodedNameAsBytes());
1124 hlog.rollWriter();
1125 hlog.completeCacheFlush(hri1.getEncodedNameAsBytes());
1126 assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
1127 } finally {
1128 if (hlog != null) hlog.close();
1129 }
1130 }
1131
1132
1133
1134
1135
1136
1137
1138
1139 @Test
1140 public void testAllRegionsFlushed() {
1141 LOG.debug("testAllRegionsFlushed");
1142 Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
1143 Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
1144 Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
1145
1146 TableName t1 = TableName.valueOf("t1");
1147
1148 HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
1149
1150 final AtomicLong sequenceId1 = new AtomicLong(1);
1151
1152 assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
1153
1154 seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
1155 oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
1156
1157 assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
1158
1159 oldestUnFlushedSeqNo.clear();
1160 oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
1161 assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
1162
1163 oldestFlushingSeqNo.clear();
1164 oldestUnFlushedSeqNo.clear();
1165 assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
1166
1167 oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
1168 seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
1169 assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
1170
1171
1172
1173 oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
1174 oldestUnFlushedSeqNo.clear();
1175 seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
1176 assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
1177 }
1178
1179
1180
1181
1182
1183
1184 private void flushRegion(HLog hlog, byte[] regionEncodedName) {
1185 hlog.startCacheFlush(regionEncodedName);
1186 hlog.completeCacheFlush(regionEncodedName);
1187 }
1188
1189 static class DumbWALActionsListener implements WALActionsListener {
1190 int increments = 0;
1191
1192 @Override
1193 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
1194 WALEdit logEdit) {
1195 increments++;
1196 }
1197
1198 @Override
1199 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
1200
1201 increments++;
1202 }
1203
1204 @Override
1205 public void preLogRoll(Path oldFile, Path newFile) {
1206
1207 }
1208
1209 @Override
1210 public void postLogRoll(Path oldFile, Path newFile) {
1211
1212 }
1213
1214 @Override
1215 public void preLogArchive(Path oldFile, Path newFile) {
1216
1217 }
1218
1219 @Override
1220 public void postLogArchive(Path oldFile, Path newFile) {
1221
1222 }
1223
1224 @Override
1225 public void logRollRequested(boolean tooFewReplicas) {
1226
1227
1228 }
1229
1230 @Override
1231 public void logCloseRequested() {
1232
1233 }
1234 }
1235
1236 }
1237