/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.zookeeper.KeeperException;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;

/**
 * This class is responsible for splitting up a bunch of regionserver commit log files that are no
 * longer being written to, into new files, one per region, for the regions to replay on startup.
 * It deletes the old log files when it has finished.
 */
@InterfaceAudience.Private
public class HLogSplitter {
  static final Log LOG = LogFactory.getLog(HLogSplitter.class);

  // Parameters for the split process
  protected final Path rootDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are accessed indirectly by the tests.
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // Tables that were disabling or disabled when the split started; their edits go to
  // recovered.edits files instead of being replayed directly.
  private Set<TableName> disablingOrDisabledTables =
      new HashSet<TableName>();
  private ZooKeeperWatcher watcher;

  // If an exception is thrown by one of the writer threads, it will be stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();

  // Wait/notify for when data has been produced by the reader thread,
  // consumed by the writer threads, or an exception occurred.
  final Object dataAvailable = new Object();

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  protected boolean distributedLogReplay;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
      new ConcurrentHashMap<String, Map<byte[], Long>>();

  // Name of the failed region server whose WAL file is being split
  protected String failedServerName = "";

  // Number of writer threads
  private final int numWriterThreads;

  // Min batch size when replaying WAL edits
  private final int minBatchSize;
  HLogSplitter(Configuration conf, Path rootDir,
      FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, RecoveryMode mode) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName = conf
        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.rootDir = rootDir;
    this.fs = fs;
    this.sequenceIdChecker = idChecker;
    this.watcher = zkw;

    entryBuffers = new EntryBuffers(
        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
            128 * 1024 * 1024));

    // A larger minBatchSize can slow down recovery because the replay writer has to wait for
    // enough edits to accumulate before replaying them.
    this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);

    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
    if (zkw != null && this.distributedLogReplay) {
      outputSink = new LogReplayOutputSink(numWriterThreads);
    } else {
      if (this.distributedLogReplay) {
        LOG.info("ZooKeeperWatcher is passed in as NULL, so disabling distributedLogReplay.");
      }
      this.distributedLogReplay = false;
      outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
    }
  }
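
  // For reference, the tuning knobs read by this constructor, shown with their defaults (a
  // sketch; any other values would be hypothetical tuning choices):
  //
  //   Configuration c = HBaseConfiguration.create();
  //   c.setInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024);
  //   c.setInt("hbase.regionserver.wal.logreplay.batch.size", 64);
  //   c.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);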

  /**
   * Splits an HLog file into the regions' recovered-edits directories.
   * <p>
   * If the log file touches N regions then N recovered.edits files will be produced.
   * @param zkw ZooKeeperWatcher; if null, falls back to old-style log splitting, dumping out
   *          recovered.edits files for regions to replay on startup.
   * @return false if it was interrupted by the progressable reporter.
   * @throws IOException
   */
  public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      ZooKeeperWatcher zkw, RecoveryMode mode) throws IOException {
    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, mode);
    return s.splitLogFile(logfile, reporter);
  }
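
  // A minimal caller-side sketch for the entry point above (the WAL path is hypothetical;
  // passing null for reporter/idChecker/zkw falls back to plain log splitting):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   FileStatus wal = fs.getFileStatus(
  //       new Path(rootDir, "WALs/rs1,60020,1/rs1%2C60020%2C1.1400000000000"));
  //   boolean completed = HLogSplitter.splitLogFile(rootDir, wal, fs, conf, null, null, null,
  //       RecoveryMode.LOG_SPLITTING);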

  /**
   * Splits a bunch of regionserver commit log files that are no longer being written to into new
   * files, one per region, for the regions to replay on startup. Deletes the old log files when
   * finished.
   * @param rootDir qualified root directory of the HBase instance
   * @param logDir the directory with the logs to split
   * @param oldLogDir the directory where processed logs are archived to
   * @return the list of splits
   * @throws IOException
   */
  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
      FileSystem fs, Configuration conf) throws IOException {
    FileStatus[] logfiles = fs.listStatus(logDir);
    List<Path> splits = new ArrayList<Path>();
    if (logfiles != null && logfiles.length > 0) {
      for (FileStatus logfile: logfiles) {
        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null,
            RecoveryMode.LOG_SPLITTING);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!fs.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }
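
  // Sketch: splitting every WAL under a dead server's log directory, the way the tests drive
  // this method (directory names hypothetical):
  //
  //   Path logDir = new Path(rootDir, "WALs/rs1,60020,1-splitting");
  //   Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  //   List<Path> recoveredEdits = HLogSplitter.split(rootDir, logDir, oldLogDir, fs, conf);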

  /**
   * Log-splitting implementation: splits one log file.
   * @param logfile should be an actual log file.
   */
  boolean splitLogFile(FileStatus logfile,
      CancelableProgressable reporter) throws IOException {
    boolean isCorrupted = false;
    Preconditions.checkState(status == null);
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
        HLog.SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status =
        TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      Reader in = null;
      try {
        in = getReader(fs, logfile, conf, skipErrors, reporter);
      } catch (CorruptedLogFileException e) {
        LOG.warn("Could not get reader, corrupted log file " + logPath, e);
        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
        isCorrupted = true;
      }
      if (in == null) {
        LOG.warn("Nothing to split in log file " + logPath);
        return true;
      }
      if (watcher != null) {
        try {
          disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
        } catch (KeeperException e) {
          throw new IOException("Can't get disabling/disabled tables", e);
        }
      }
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
      failedServerName = (serverName == null) ? "" : serverName.getServerName();
      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String key = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
        if (lastFlushedSequenceId == null) {
          if (this.distributedLogReplay) {
            RegionStoreSequenceIds ids =
                SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
            if (ids != null) {
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            }
          } else if (sequenceIdChecker != null) {
            lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted log file " + logPath, e);
      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
      isCorrupted = true;
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down.");
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true first: if finishWritingAndClose() throws, this keeps
          // the right value; otherwise the next statement resets it from the actual result.
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        String msg =
            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
                + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
                + " progress failed = " + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }

  /**
   * Completes the work done by splitLogFile by archiving the source log.
   * <p>
   * It is invoked by the SplitLogManager once it knows that one of the SplitLogWorkers has
   * completed the splitLogFile() part. If the master crashes, this function may be called
   * multiple times for the same file.
   * @param logfile the log file, absolute or relative to the HBase root directory
   * @throws IOException
   */
  public static void finishSplitLogFile(String logfile,
      Configuration conf) throws IOException {
    Path rootdir = FSUtils.getRootDir(conf);
    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path logPath;
    if (FSUtils.isStartingWithPath(rootdir, logfile)) {
      logPath = new Path(logfile);
    } else {
      logPath = new Path(rootdir, logfile);
    }
    finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
  }

  static void finishSplitLogFile(Path rootdir, Path oldLogDir,
      Path logPath, Configuration conf) throws IOException {
    List<Path> processedLogs = new ArrayList<Path>();
    List<Path> corruptedLogs = new ArrayList<Path>();
    FileSystem fs;
    fs = rootdir.getFileSystem(conf);
    if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
      corruptedLogs.add(logPath);
    } else {
      processedLogs.add(logPath);
    }
    archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
    fs.delete(stagingDir, true);
  }
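
  // Sketch of the hand-off: once splitLogFile() has succeeded for a task, the coordinating side
  // calls the public overload above to archive or quarantine the source WAL (path hypothetical):
  //
  //   HLogSplitter.finishSplitLogFile(
  //       "WALs/rs1,60020,1-splitting/rs1%2C60020%2C1.1400000000000", conf);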

  /**
   * Moves processed logs to oldLogDir after successful processing. Moves corrupted logs (any log
   * that couldn't be successfully parsed) to the corrupt directory (.corrupt) for later
   * investigation.
   * @throws IOException
   */
  private static void archiveLogs(
      final List<Path> corruptedLogs,
      final List<Path> processedLogs, final Path oldLogDir,
      final FileSystem fs, final Configuration conf) throws IOException {
    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
      "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));

    if (!fs.mkdirs(corruptDir)) {
      LOG.info("Unable to mkdir " + corruptDir);
    }
    fs.mkdirs(oldLogDir);

    // This method can get restarted or called multiple times for archiving the same log files,
    // so the renames below tolerate an already-moved source.
    for (Path corrupted : corruptedLogs) {
      Path p = new Path(corruptDir, corrupted.getName());
      if (fs.exists(corrupted)) {
        if (!fs.rename(corrupted, p)) {
          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
        } else {
          LOG.warn("Moved corrupted log " + corrupted + " to " + p);
        }
      }
    }

    for (Path p : processedLogs) {
      Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
      if (fs.exists(p)) {
        if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
          LOG.warn("Unable to move " + p + " to " + newPath);
        } else {
          LOG.debug("Archived processed log " + p + " to " + newPath);
        }
      }
    }
  }

  /**
   * Returns the path to a file under the RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code>, named for the sequence id in the passed <code>logEntry</code>. Also
   * ensures RECOVERED_EDITS_DIR exists under the region directory, creating it if asked to.
   * @param fs the filesystem
   * @param logEntry the entry whose region and sequence id determine the path
   * @param rootDir HBase root dir
   * @param isCreate if true, create the recovered.edits directory if missing
   * @return Path to the file into which to dump split log edits, or null if the region directory
   *         no longer exists.
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
      throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);

    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    if (fs.exists(dir) && fs.isFile(dir)) {
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
          + "leftover of an old installation. It should be a folder instead. "
          + "So moving it to " + tmp);
      if (!fs.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }

    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    // Append a file name that ends with RECOVERED_LOG_TMPFILE_SUFFIX so that the region's
    // replayRecoveredEdits will not pick up the file while it is still being written.
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
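
  // For illustration, an edit with sequence id 42 for a region with encoded name "abc123" in
  // table "t1" maps to a temporary path of roughly this shape (namespace layout assumed):
  //
  //   <rootdir>/data/default/t1/abc123/recovered.edits/0000000000000000042.temp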

  static String getTmpRecoveredEditsFileName(String fileName) {
    return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
  }

  /**
   * Get the completed recovered-edits file path, i.e. the temporary file renamed after the last
   * edit in the file rather than its first. The name is then usable to skip recovered edits when
   * the region replays them.
   * @param srcPath the temporary file path
   * @param maximumEditLogSeqNum the last edit's sequence number, used as the final file name
   * @return dstPath named for the file's last edit log sequence number
   */
  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
      Long maximumEditLogSeqNum) {
    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
    return new Path(srcPath.getParent(), fileName);
  }

  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }
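
  // Worked example: formatRecoveredEditsFileName(123) returns "0000000000000000123". The
  // 19-digit zero padding makes lexicographic file-name order match numeric sequence-id order,
  // which is what replay relies on when scanning the recovered.edits directory.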

  /**
   * Creates a new {@link Reader} for reading logs to split, after recovering the file's lease.
   * @return a new Reader instance, or null if the file is empty or missing
   * @throws IOException
   * @throws CorruptedLogFileException if skipErrors is set and the file cannot be opened
   */
  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
      boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      try {
        in = getReader(fs, path, conf, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // EOFException being ignored
          return null;
        }
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A WAL file may not exist anymore. Nothing can be recovered so move on.
        LOG.warn("File " + path + " doesn't exist anymore.", e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if we were interrupted, or if not skipErrors
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Could not open hlog " +
              path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }

  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // Truncated files are expected if a RS crashes (see HBASE-2643).
      LOG.info("EOF from hlog " + path + ". continuing");
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format, this problem is idempotent
      // and retrying won't help.
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
              e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
            + path + ". continuing");
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Ignoring exception" +
              " while parsing hlog " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }

  private void writerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the writer threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw new IOException(thrown);
    } else {
      throw new RuntimeException(thrown);
    }
  }

  /**
   * Create a new {@link Writer} for writing log splits.
   */
  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
      throws IOException {
    return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  protected Reader getReader(FileSystem fs, Path curLogFile,
      Configuration conf, CancelableProgressable reporter) throws IOException {
    return HLogFactory.createReader(fs, curLogFile, conf, reporter);
  }

  /**
   * Get the current number of open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Class which accumulates edits and separates them into a buffer per region while
   * simultaneously accounting for the heap usage of all buffers. Each buffer is drained by a
   * single writer thread at a time; new inserts are still accepted for a region while its
   * buffer is being written off.
   */
  class EntryBuffers {
    Map<byte[], RegionEntryBuffer> buffers =
        new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    /*
     * Track which regions are currently in the middle of writing. We don't allow an IO thread
     * to pick up bytes from a region which is currently being written to.
     */
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    long totalBuffered = 0;
    long maxHeapUsage;

    EntryBuffers(long maxHeapUsage) {
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer. Blocks if the total heap usage
     * has crossed the specified threshold.
     * @throws InterruptedException
     * @throws IOException
     */
    void appendEntry(Entry entry) throws InterruptedException, IOException {
      HLogKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap = buffer.appendEntry(entry);
      }

      // If we crossed the heap threshold, wait for the IO threads to drain some buffers.
      synchronized (dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          dataAvailable.wait(2000);
        }
        dataAvailable.notifyAll();
      }
      checkForErrors();
    }

    /**
     * @return the largest region buffer not currently being written, or null if none is eligible
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (dataAvailable) {
        totalBuffered -= size;
        // We may unblock a reader waiting in appendEntry().
        dataAvailable.notifyAll();
      }
    }

    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }
  }
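
  // A sketch of the handshake above, assuming one reader thread (as in splitLogFile) and one of
  // the WriterThreads; the 1 MB cap is a hypothetical value:
  //
  //   EntryBuffers buffers = new EntryBuffers(1024 * 1024);
  //   buffers.appendEntry(entry);                           // reader: blocks past the heap cap
  //   RegionEntryBuffer chunk = buffers.getChunkToWrite();  // writer: largest idle region first
  //   if (chunk != null) {
  //     try {
  //       outputSink.append(chunk);
  //     } finally {
  //       buffers.doneWriting(chunk);                       // frees heap, wakes the reader
  //     }
  //   }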

  /**
   * A buffer of some number of edits for a given region. Accumulates edits, shares a single
   * byte-array instance for the table and region name across entries, and tracks the heap usage
   * of the accumulated edits.
   */
  static class RegionEntryBuffer implements HeapSize {
    long heapInBuffer = 0;
    List<Entry> entryBuffer;
    TableName tableName;
    byte[] encodedRegionName;

    RegionEntryBuffer(TableName tableName, byte[] region) {
      this.tableName = tableName;
      this.encodedRegionName = region;
      this.entryBuffer = new LinkedList<Entry>();
    }

    long appendEntry(Entry entry) {
      internify(entry);
      entryBuffer.add(entry);
      long incrHeap = entry.getEdit().heapSize() +
          ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
          0; // TODO linkedlist entry
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    private void internify(Entry entry) {
      HLogKey k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    public long heapSize() {
      return heapInBuffer;
    }
  }
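
  // Worked example of the accounting above: appending an entry whose WALEdit heapSize() is
  // 1000 bytes grows heapInBuffer by 1000 + ClassSize.align(2 * ClassSize.REFERENCE), i.e. the
  // payload plus the two interned key references; the LinkedList node itself is not counted.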

  class WriterThread extends Thread {
    private volatile boolean shouldStop = false;
    private OutputSink outputSink = null;

    WriterThread(OutputSink sink, int i) {
      super(Thread.currentThread().getName() + "-Writer-" + i);
      outputSink = sink;
    }

    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available; wait for some more to show up.
          synchronized (dataAvailable) {
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    void finish() {
      synchronized (dataAvailable) {
        shouldStop = true;
        dataAvailable.notifyAll();
      }
    }
  }

  /**
   * An abstraction over the writing and closing of output streams: the common interface behind
   * both the recovered-edits file sink and the region-server WAL edits replay sink.
   */
  abstract class OutputSink {

    protected Map<byte[], SinkWriter> writers = Collections
        .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));

    protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));

    protected final List<WriterThread> writerThreads = Lists.newArrayList();

    /* Set of regions which we've decided should not output edits */
    protected final Set<byte[]> blacklistedRegions = Collections
        .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));

    protected boolean closeAndCleanCompleted = false;

    protected boolean writersClosed = false;

    protected final int numThreads;

    protected CancelableProgressable reporter = null;

    protected AtomicLong skippedEdits = new AtomicLong();

    protected List<Path> splits = null;

    public OutputSink(int numWriters) {
      numThreads = numWriters;
    }

    void setReporter(CancelableProgressable reporter) {
      this.reporter = reporter;
    }

    /**
     * Start the threads that will pump data from the entryBuffers to the output files.
     */
    synchronized void startWriterThreads() {
      for (int i = 0; i < numThreads; i++) {
        WriterThread t = new WriterThread(this, i);
        t.start();
        writerThreads.add(t);
      }
    }

    /**
     * Update the region's maximum edit log sequence number seen so far.
     */
    void updateRegionMaximumEditLogSeqNum(Entry entry) {
      synchronized (regionMaximumEditLogSeqNum) {
        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
            .getEncodedRegionName());
        if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
              .getLogSeqNum());
        }
      }
    }

    Long getRegionMaximumEditLogSeqNum(byte[] region) {
      return regionMaximumEditLogSeqNum.get(region);
    }

    /**
     * @return the number of currently open writers
     */
    int getNumOpenWriters() {
      return this.writers.size();
    }

    long getSkippedEdits() {
      return this.skippedEdits.get();
    }

    /**
     * Wait for the writer threads to dump all remaining data to the sink.
     * @return true when there was no error
     * @throws IOException
     */
    protected boolean finishWriting() throws IOException {
      LOG.info("Waiting for split writer threads to finish");
      boolean progress_failed = false;
      for (WriterThread t : writerThreads) {
        t.finish();
      }
      for (WriterThread t : writerThreads) {
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
        try {
          t.join();
        } catch (InterruptedException ie) {
          IOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
      checkForErrors();
      LOG.info("Split writers finished");
      return (!progress_failed);
    }

    abstract List<Path> finishWritingAndClose() throws IOException;

    /**
     * @return a map from encoded region ID to the number of edits written out for that region
     */
    abstract Map<byte[], Long> getOutputCounts();

    /**
     * @return the number of regions we've recovered
     */
    abstract int getNumberOfRecoveredRegions();

    /**
     * @param buffer a buffer of WAL edit entries for a single region
     * @throws IOException
     */
    abstract void append(RegionEntryBuffer buffer) throws IOException;

    /**
     * Called by WriterThread to flush any edits still buffered inside the sink before closing.
     * @return true when the underlying sink had something left to flush
     */
    protected boolean flush() throws IOException {
      return false;
    }
  }
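
  // Sink lifecycle as driven by splitLogFile(), sketched:
  //
  //   outputSink.setReporter(reporter);
  //   outputSink.startWriterThreads();     // numThreads WriterThreads drain entryBuffers
  //   ...                                  // the reader thread feeds entryBuffers.appendEntry()
  //   List<Path> splits = outputSink.finishWritingAndClose();  // null means progress failed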

  /**
   * Class that manages the output streams from the log splitting process.
   */
  class LogRecoveredEditsOutputSink extends OutputSink {

    public LogRecoveredEditsOutputSink(int numWriters) {
      // More threads could potentially write out faster, but every extra output stream also
      // costs additional NameNode create/rename operations, so the default is kept small.
      super(numWriters);
    }

    /**
     * @return the recovered-edits paths written, or null if progress could not be reported
     * @throws IOException
     */
    @Override
    List<Path> finishWritingAndClose() throws IOException {
      boolean isSuccessful = false;
      List<Path> result = null;
      try {
        isSuccessful = finishWriting();
      } finally {
        result = close();
        List<IOException> thrown = closeLogWriters(null);
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
      if (isSuccessful) {
        splits = result;
      }
      return splits;
    }

    /**
     * Close all of the output streams.
     * @return the list of paths written
     */
    private List<Path> close() throws IOException {
      Preconditions.checkState(!closeAndCleanCompleted);

      final List<Path> paths = new ArrayList<Path>();
      final List<IOException> thrown = Lists.newArrayList();
      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
        TimeUnit.SECONDS, new ThreadFactory() {
          private int count = 1;

          public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "split-log-closeStream-" + count++);
            return t;
          }
        });
      CompletionService<Void> completionService =
          new ExecutorCompletionService<Void>(closeThreadPool);
      for (final Map.Entry<byte[], ? extends SinkWriter> writersEntry : writers.entrySet()) {
        LOG.debug("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
        completionService.submit(new Callable<Void>() {
          public Void call() throws Exception {
            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
            LOG.debug("Closing " + wap.p);
            try {
              wap.w.close();
            } catch (IOException ioe) {
              LOG.error("Couldn't close log at " + wap.p, ioe);
              thrown.add(ioe);
              return null;
            }
            LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
                + (wap.nanosSpent / 1000 / 1000) + "ms)");

            if (wap.editsWritten == 0) {
              // Just remove the empty recovered.edits file.
              if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
                LOG.warn("Failed deleting empty " + wap.p);
                throw new IOException("Failed deleting empty " + wap.p);
              }
              return null;
            }

            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
              regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
            try {
              if (!dst.equals(wap.p) && fs.exists(dst)) {
                LOG.warn("Found existing old edits file. It could be the "
                    + "result of a previous failed split attempt. Deleting " + dst + ", length="
                    + fs.getFileStatus(dst).getLen());
                if (!fs.delete(dst, false)) {
                  LOG.warn("Failed deleting of old " + dst);
                  throw new IOException("Failed deleting of old " + dst);
                }
              }
              // The temporary file may not exist for unit tests that create a splitter which
              // reads and writes data without touching disk.
              if (fs.exists(wap.p)) {
                if (!fs.rename(wap.p, dst)) {
                  throw new IOException("Failed renaming " + wap.p + " to " + dst);
                }
                LOG.debug("Rename " + wap.p + " to " + dst);
              }
            } catch (IOException ioe) {
              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
              thrown.add(ioe);
              return null;
            }
            paths.add(dst);
            return null;
          }
        });
      }

      boolean progress_failed = false;
      try {
        for (int i = 0, n = this.writers.size(); i < n; i++) {
          Future<Void> future = completionService.take();
          future.get();
          if (!progress_failed && reporter != null && !reporter.progress()) {
            progress_failed = true;
          }
        }
      } catch (InterruptedException e) {
        IOException iie = new InterruptedIOException();
        iie.initCause(e);
        throw iie;
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        closeThreadPool.shutdownNow();
      }

      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
      writersClosed = true;
      closeAndCleanCompleted = true;
      if (progress_failed) {
        return null;
      }
      return paths;
    }

    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
      if (writersClosed) {
        return thrown;
      }

      if (thrown == null) {
        thrown = Lists.newArrayList();
      }
      try {
        for (WriterThread t : writerThreads) {
          while (t.isAlive()) {
            t.shouldStop = true;
            t.interrupt();
            try {
              t.join(10);
            } catch (InterruptedException e) {
              IOException iie = new InterruptedIOException();
              iie.initCause(e);
              throw iie;
            }
          }
        }
      } finally {
        synchronized (writers) {
          WriterAndPath wap = null;
          for (SinkWriter tmpWAP : writers.values()) {
            try {
              wap = (WriterAndPath) tmpWAP;
              wap.w.close();
            } catch (IOException ioe) {
              LOG.error("Couldn't close log at " + wap.p, ioe);
              thrown.add(ioe);
              continue;
            }
            LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
                + (wap.nanosSpent / 1000 / 1000) + "ms)");
          }
        }
        writersClosed = true;
      }

      return thrown;
    }

    /**
     * Get a writer and path for a log starting at the given entry. This function is threadsafe
     * so long as multiple threads are always acting on different regions.
     * @return null if this region shouldn't output any logs
     */
    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
      WriterAndPath ret = (WriterAndPath) writers.get(region);
      if (ret != null) {
        return ret;
      }
      // If we already decided that this region doesn't get any output,
      // we don't need to check again.
      if (blacklistedRegions.contains(region)) {
        return null;
      }
      ret = createWAP(region, entry, rootDir, fs, conf);
      if (ret == null) {
        blacklistedRegions.add(region);
        return null;
      }
      writers.put(region, ret);
      return ret;
    }

    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
        Configuration conf) throws IOException {
      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
      if (regionedits == null) {
        return null;
      }
      if (fs.exists(regionedits)) {
        LOG.warn("Found old edits file. It could be the "
            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
            + fs.getFileStatus(regionedits).getLen());
        if (!fs.delete(regionedits, false)) {
          LOG.warn("Failed delete of old " + regionedits);
        }
      }
      Writer w = createWriter(fs, regionedits, conf);
      LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
      return (new WriterAndPath(regionedits, w));
    }

    @Override
    void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("got an empty buffer, skipping");
        return;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            wap = getWriterAndPath(logEntry);
            if (wap == null) {
              // getWriterAndPath decided that this region gets no output.
              return;
            }
          }
          wap.w.append(logEntry);
          this.updateRegionMaximumEditLogSeqNum(logEntry);
          editsCount++;
        }
        // Pass along summary statistics.
        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal("Got IOException while writing log entry to log", e);
        throw e;
      }
    }

    /**
     * @return a map from encoded region ID to the number of edits written out for that region
     */
    @Override
    Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      synchronized (writers) {
        for (Map.Entry<byte[], ? extends SinkWriter> entry : writers.entrySet()) {
          ret.put(entry.getKey(), entry.getValue().editsWritten);
        }
      }
      return ret;
    }

    @Override
    int getNumberOfRecoveredRegions() {
      return writers.size();
    }
  }

  /**
   * Wraps an actual writer which writes data out, plus related statistics.
   */
  private abstract static class SinkWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Number of nanos spent writing to this sink */
    long nanosSpent = 0;

    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }

  /**
   * Private data structure that wraps a Writer and its Path, also collecting statistics about
   * the data written to this output.
   */
  private final static class WriterAndPath extends SinkWriter {
    final Path p;
    final Writer w;

    WriterAndPath(final Path p, final Writer w) {
      this.p = p;
      this.w = w;
    }
  }

  /**
   * Class that manages replaying edits from WAL files directly to the assigned fail-over
   * region servers.
   */
  class LogReplayOutputSink extends OutputSink {
    private static final double BUFFER_THRESHOLD = 0.35;
    private static final String KEY_DELIMITER = "#";

    private long waitRegionOnlineTimeOut;
    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
    private final Map<String, RegionServerWriter> writers =
        new ConcurrentHashMap<String, RegionServerWriter>();
    // Cache of online regions: encoded region name -> location.
    private final Map<String, HRegionLocation> onlineRegions =
        new ConcurrentHashMap<String, HRegionLocation>();

    private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
        .synchronizedMap(new TreeMap<TableName, HConnection>());
    /**
     * Map key -> value layout:
     * &lt;servername&gt;#&lt;table name&gt; -> queue of (location, entry) pairs
     */
    private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
        new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
    private List<Throwable> thrown = new ArrayList<Throwable>();

    // The following sink is used in distributed log replay mode for entries of regions in
    // disabling or disabled tables; their edits are written to recovered.edits files rather
    // than replayed.
    private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
    private boolean hasEditsInDisablingOrDisabledTables = false;

    public LogReplayOutputSink(int numWriters) {
      super(numWriters);
      this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
        SplitLogManager.DEFAULT_TIMEOUT);
      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
      this.logRecoveredEditsOutputSink.setReporter(reporter);
    }

    @Override
    void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("got an empty buffer, skipping");
        return;
      }

      // Check if the current region is in a disabling or disabled table.
      if (disablingOrDisabledTables.contains(buffer.tableName)) {
        // Need to fall back to the old way: write recovered.edits files.
        logRecoveredEditsOutputSink.append(buffer);
        hasEditsInDisablingOrDisabledTables = true;
        // Store the (encoded) region names of the edits.
        addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
        return;
      }

      // Group entries by region server.
      groupEditsByServer(entries);

      // Process work items: pick the largest queued batch.
      String maxLocKey = null;
      int maxSize = 0;
      List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
      synchronized (this.serverToBufferQueueMap) {
        for (String key : this.serverToBufferQueueMap.keySet()) {
          List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
          if (curQueue.size() > maxSize) {
            maxSize = curQueue.size();
            maxQueue = curQueue;
            maxLocKey = key;
          }
        }
        if (maxSize < minBatchSize
            && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
          // Buffer more before processing.
          return;
        } else if (maxSize > 0) {
          this.serverToBufferQueueMap.remove(maxLocKey);
        }
      }

      if (maxSize > 0) {
        processWorkItems(maxLocKey, maxQueue);
      }
    }

    private void addToRecoveredRegions(String encodedRegionName) {
      if (!recoveredRegions.contains(encodedRegionName)) {
        recoveredRegions.add(encodedRegionName);
      }
    }

    /**
     * Helper function to group WAL entries by destination region server.
     * @throws IOException
     */
    private void groupEditsByServer(List<Entry> entries) throws IOException {
      Set<TableName> nonExistentTables = null;
      Long cachedLastFlushedSequenceId = -1L;
      for (HLog.Entry entry : entries) {
        WALEdit edit = entry.getEdit();
        TableName table = entry.getKey().getTablename();
        // Clear scopes, which are not needed for recovery.
        entry.getKey().setScopes(null);
        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
        // Skip edits of non-existent tables.
        if (nonExistentTables != null && nonExistentTables.contains(table)) {
          this.skippedEdits.incrementAndGet();
          continue;
        }

        Map<byte[], Long> maxStoreSequenceIds = null;
        boolean needSkip = false;
        HRegionLocation loc = null;
        String locKey = null;
        List<KeyValue> kvs = edit.getKeyValues();
        List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
        HConnection hconn = this.getConnectionByTableName(table);

        for (KeyValue kv : kvs) {
          byte[] row = kv.getRow();
          byte[] family = kv.getFamily();
          boolean isCompactionEntry = false;
          // A compaction marker in the METAFAMILY is routed using the region named in the marker.
          if (kv.matchingFamily(WALEdit.METAFAMILY)) {
            CompactionDescriptor compaction = WALEdit.getCompaction(kv);
            if (compaction != null && compaction.hasRegionName()) {
              try {
                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
                    .toByteArray());
                row = regionName[1]; // startKey of the region
                family = compaction.getFamilyName().toByteArray();
                isCompactionEntry = true;
              } catch (Exception ex) {
                LOG.warn("Unexpected exception received, ignoring " + ex);
                skippedKVs.add(kv);
                continue;
              }
            } else {
              skippedKVs.add(kv);
              continue;
            }
          }

          try {
            loc =
                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                  encodeRegionNameStr);
            // Skip replaying a compaction marker when its region no longer exists.
            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
              loc.getRegionInfo().getEncodedName())) {
              LOG.info("Not replaying a compaction marker for an older region: "
                  + encodeRegionNameStr);
              needSkip = true;
            }
          } catch (TableNotFoundException ex) {
            // The table has been deleted, so skip edits of the table.
            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
                + encodeRegionNameStr);
            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
            if (nonExistentTables == null) {
              nonExistentTables = new TreeSet<TableName>();
            }
            nonExistentTables.add(table);
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          }

          cachedLastFlushedSequenceId =
              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
          if (cachedLastFlushedSequenceId != null
              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
            // Skip the whole HLog entry: it was already flushed.
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          } else {
            if (maxStoreSequenceIds == null) {
              maxStoreSequenceIds =
                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
            }
            if (maxStoreSequenceIds != null) {
              Long maxStoreSeqId = maxStoreSequenceIds.get(family);
              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
                // Skip the current kv if its column family doesn't exist anymore or it was
                // already flushed.
                skippedKVs.add(kv);
                continue;
              }
            }
          }
        }

        // Skip the edit.
        if (loc == null || needSkip) continue;

        if (!skippedKVs.isEmpty()) {
          kvs.removeAll(skippedKVs);
        }

        synchronized (serverToBufferQueueMap) {
          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
          List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
          if (queue == null) {
            queue =
                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
            serverToBufferQueueMap.put(locKey, queue);
          }
          queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
        }
        // Store the (encoded) region names of the edits.
        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
      }
    }

    /**
     * Locate the destination region based on table name and row. This function also makes sure
     * the destination region is online for replay.
     * @throws IOException
     */
    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
        TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
      // Fetch the location from the cache first.
      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
      if (loc != null) return loc;
      // Look the region up, bypassing the client's location cache so a stale entry pointing at
      // the dead server is not returned.
      loc = hconn.getRegionLocation(table, row, true);
      if (loc == null) {
        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
            + " of table:" + table);
      }
      // Check if the current row moved to a different region due to a region merge/split.
      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
        // The original region must already have been flushed, so mark its edits as skippable.
        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
        if (tmpLoc != null) return tmpLoc;
      }

      Long lastFlushedSequenceId = -1L;
      AtomicBoolean isRecovering = new AtomicBoolean(true);
      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
      if (!isRecovering.get()) {
        // The region isn't in recovering at all: the WAL file may contain a region that was
        // moved elsewhere before the hosting RS failed.
        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
            + " because it's not in recovering.");
      } else {
        Long cachedLastFlushedSequenceId =
            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());

        // Retrieve the last flushed sequence id from ZK, because the region's
        // postOpenDeployTasks will update the value for the region.
        RegionStoreSequenceIds ids =
            SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
                .getRegionInfo().getEncodedName());
        if (ids != null) {
          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
          Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
          List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
          for (StoreSequenceId id : maxSeqIdInStores) {
            storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
          }
          regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
        }

        if (cachedLastFlushedSequenceId == null
            || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
          lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
        }
      }

      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
      return loc;
    }

    private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
        throws IOException {
      RegionServerWriter rsw = null;

      long startTime = System.nanoTime();
      try {
        rsw = getRegionServerWriter(key);
        rsw.sink.replayEntries(actions);

        // Pass along summary statistics.
        rsw.incrementEdits(actions.size());
        rsw.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal("Got IOException while writing log entry to log", e);
        throw e;
      }
    }

    /**
     * Wait until the region is online on the destination region server.
     * @param loc the location of the region
     * @param row the target row
     * @param timeout how long to wait, in milliseconds
     * @param isRecovering set to the region's current recovering state on return
     * @return the location of the region once it came online
     * @throws IOException on any error, or a timeout waiting for the region to come online
     */
    private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
        final long timeout, AtomicBoolean isRecovering)
        throws IOException {
      final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
      final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      boolean reloadLocation = false;
      TableName tableName = loc.getRegionInfo().getTable();
      int tries = 0;
      Throwable cause = null;
      while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
        try {
          // Try and get regioninfo from the hosting server.
          HConnection hconn = getConnectionByTableName(tableName);
          if (reloadLocation) {
            loc = hconn.getRegionLocation(tableName, row, true);
          }
          BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
          HRegionInfo region = loc.getRegionInfo();
          try {
            GetRegionInfoRequest request =
                RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
            GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
            if (HRegionInfo.convert(response.getRegionInfo()) != null) {
              isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
              return loc;
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
        } catch (IOException e) {
          cause = e.getCause();
          if (!(cause instanceof RegionOpeningException)) {
            reloadLocation = true;
          }
        }
        long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
        try {
          Thread.sleep(expectedSleep);
        } catch (InterruptedException e) {
          throw new IOException("Interrupted when waiting region " +
              loc.getRegionInfo().getEncodedName() + " online.", e);
        }
        tries++;
      }

      throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
          " online for " + timeout + " milliseconds.", cause);
    }

    @Override
    protected boolean flush() throws IOException {
      String curLoc = null;
      int curSize = 0;
      List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
      synchronized (this.serverToBufferQueueMap) {
        for (String locationKey : this.serverToBufferQueueMap.keySet()) {
          curQueue = this.serverToBufferQueueMap.get(locationKey);
          if (!curQueue.isEmpty()) {
            curSize = curQueue.size();
            curLoc = locationKey;
            break;
          }
        }
        if (curSize > 0) {
          this.serverToBufferQueueMap.remove(curLoc);
        }
      }

      if (curSize > 0) {
        this.processWorkItems(curLoc, curQueue);
        // Safe to notify without an explicit synchronized block here: flush() is only invoked
        // from WriterThread.doRun() while it already holds the dataAvailable monitor.
        dataAvailable.notifyAll();
        return true;
      }
      return false;
    }

    void addWriterError(Throwable t) {
      thrown.add(t);
    }

    @Override
    List<Path> finishWritingAndClose() throws IOException {
      try {
        if (!finishWriting()) {
          return null;
        }
        if (hasEditsInDisablingOrDisabledTables) {
          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
        } else {
          // Return an empty list to keep the interface the same as the old way.
          splits = new ArrayList<Path>();
        }
        return splits;
      } finally {
        List<IOException> thrown = closeRegionServerWriters();
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
    }

    @Override
    int getNumOpenWriters() {
      return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
    }

    private List<IOException> closeRegionServerWriters() throws IOException {
      List<IOException> result = null;
      if (!writersClosed) {
        result = Lists.newArrayList();
        try {
          for (WriterThread t : writerThreads) {
            while (t.isAlive()) {
              t.shouldStop = true;
              t.interrupt();
              try {
                t.join(10);
              } catch (InterruptedException e) {
                IOException iie = new InterruptedIOException();
                iie.initCause(e);
                throw iie;
              }
            }
          }
        } finally {
          synchronized (writers) {
            for (String locationKey : writers.keySet()) {
              RegionServerWriter tmpW = writers.get(locationKey);
              try {
                tmpW.close();
              } catch (IOException ioe) {
                LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
                result.add(ioe);
              }
            }
          }

          // Close the connections.
          synchronized (this.tableNameToHConnectionMap) {
            for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
              HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
              try {
                hconn.clearRegionCache();
                hconn.close();
              } catch (IOException ioe) {
                result.add(ioe);
              }
            }
          }
          writersClosed = true;
        }
      }
      return result;
    }

    @Override
    Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      synchronized (writers) {
        for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
          ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
        }
      }
      return ret;
    }

    @Override
    int getNumberOfRecoveredRegions() {
      return this.recoveredRegions.size();
    }

    /**
     * Get a writer for the given location key, creating one on first use. This function is
     * threadsafe so long as multiple threads are always acting on different location keys.
     * @return a RegionServerWriter for the location
     */
    private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
      RegionServerWriter ret = writers.get(loc);
      if (ret != null) {
        return ret;
      }

      TableName tableName = getTableFromLocationStr(loc);
      if (tableName == null) {
        throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
      }

      HConnection hconn = getConnectionByTableName(tableName);
      synchronized (writers) {
        ret = writers.get(loc);
        if (ret == null) {
          ret = new RegionServerWriter(conf, tableName, hconn);
          writers.put(loc, ret);
        }
      }
      return ret;
    }

    private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
      HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
      if (hconn == null) {
        synchronized (this.tableNameToHConnectionMap) {
          hconn = this.tableNameToHConnectionMap.get(tableName);
          if (hconn == null) {
            hconn = HConnectionManager.getConnection(conf);
            this.tableNameToHConnectionMap.put(tableName, hconn);
          }
        }
      }
      return hconn;
    }

    private TableName getTableFromLocationStr(String loc) {
      // locKey is created in the format: <hostname>:<port>#<table name>
      String[] splits = loc.split(KEY_DELIMITER);
      if (splits.length != 2) {
        return null;
      }
      return TableName.valueOf(splits[1]);
    }
  }

  /**
   * Private data structure that wraps the replay sink for a receiving RS, collecting statistics
   * about the data shipped to that newly assigned RS.
   */
  private final static class RegionServerWriter extends SinkWriter {
    final WALEditsReplaySink sink;

    RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
        throws IOException {
      this.sink = new WALEditsReplaySink(conf, tableName, conn);
    }

    void close() throws IOException {
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }

  /** A struct used by getMutationsFromWALEntry */
  public static class MutationReplay {
    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
      this.type = type;
      this.mutation = mutation;
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    public final MutationType type;
    public final Mutation mutation;
    public final long nonceGroup;
    public final long nonce;
  }
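
  // Replay-side sketch: rebuilding client mutations from a shipped WALEntry and its cells (the
  // entry and cells variables are hypothetical; the CellScanner comes from the RPC codec):
  //
  //   Pair<HLogKey, WALEdit> walEntry = new Pair<HLogKey, WALEdit>();
  //   List<MutationReplay> mutations =
  //       HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, false);
  //   for (MutationReplay mr : mutations) {
  //     // mr.type is PUT or DELETE; mr.mutation carries the cells, and the nonces ride along.
  //   }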

  /**
   * Tag each edit with its original sequence number to avoid duplicate replay.
   * @return the cell, tagged with its original log sequence number
   */
  private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
    // Add a LOG_REPLAY tag carrying the original sequence number, unless one is already present.
    boolean needAddRecoveryTag = true;
    if (cell.getTagsLengthUnsigned() > 0) {
      Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(),
        cell.getTagsLengthUnsigned(), TagType.LOG_REPLAY_TAG_TYPE);
      if (tmpTag != null) {
        // Found an existing log replay tag, so reuse it.
        needAddRecoveryTag = false;
      }
    }
    if (needAddRecoveryTag) {
      List<Tag> newTags = new ArrayList<Tag>();
      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
          .getLogSequenceNumber()));
      newTags.add(replayTag);
      return KeyValue.cloneAndAddTags(cell, newTags);
    }
    return cell;
  }
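
  // Example of the tag this produces: a cell from an entry whose original log sequence number
  // is 99 gains Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(99L)), which lets the receiving
  // region recognize, and skip, edits it has already persisted.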

  /**
   * Constructs mutations from a WALEntry. Also reconstructs the HLogKey and WALEdit from the
   * passed-in WALEntry if a non-null <code>logEntry</code> pair is supplied.
   * @param entry the WALEntry shipped over RPC
   * @param cells the cells associated with the entry
   * @param logEntry pair filled in with the HLogKey and WALEdit extracted from the WALEntry;
   *          may be null if the caller doesn't need them
   * @param addLogReplayTag whether to tag cells with their original log sequence number
   * @return the list of MutationReplay to be replayed
   * @throws IOException
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {

    if (entry == null) {
      // Return an empty list.
      return new ArrayList<MutationReplay>();
    }

    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<MutationReplay>();
    Cell previousCell = null;
    Mutation m = null;
    HLogKey key = null;
    WALEdit val = null;
    if (logEntry != null) val = new WALEdit();

    for (int i = 0; i < count; i++) {
      // Throw index out of bounds if our cell count is off.
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));

      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRow(previousCell, cell);
      if (isNewRowOrType) {
        // Create a new mutation.
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes don't have nonces.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts might come from increment or append; thus we need the nonces.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
      } else {
        Cell tmpNewCell = cell;
        if (addLogReplayTag) {
          tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
        }
        ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
      }
      previousCell = cell;
    }

    // Reconstruct the HLogKey if asked to.
    if (logEntry != null) {
      WALKey walKey = entry.getKey();
      List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
          .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
          clusterIds, walKey.getNonceGroup(), walKey.getNonce());
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
}