1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.HBaseConfiguration;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HRegionLocation;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.KeyValueUtil;
65  import org.apache.hadoop.hbase.RemoteExceptionHandler;
66  import org.apache.hadoop.hbase.ServerName;
67  import org.apache.hadoop.hbase.TableName;
68  import org.apache.hadoop.hbase.TableNotFoundException;
69  import org.apache.hadoop.hbase.Tag;
70  import org.apache.hadoop.hbase.TagType;
71  import org.apache.hadoop.hbase.client.ConnectionUtils;
72  import org.apache.hadoop.hbase.client.Delete;
73  import org.apache.hadoop.hbase.client.HConnection;
74  import org.apache.hadoop.hbase.client.HConnectionManager;
75  import org.apache.hadoop.hbase.client.Mutation;
76  import org.apache.hadoop.hbase.client.Put;
77  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
78  import org.apache.hadoop.hbase.io.HeapSize;
79  import org.apache.hadoop.hbase.master.SplitLogManager;
80  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
81  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.RequestConverter;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
85  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
86  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
87  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
88  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
89  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
90  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
91  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
92  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
93  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
94  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
95  import org.apache.hadoop.hbase.regionserver.HRegion;
96  import org.apache.hadoop.hbase.regionserver.LastSequenceId;
97  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
98  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
99  import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.CancelableProgressable;
102 import org.apache.hadoop.hbase.util.ClassSize;
103 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
104 import org.apache.hadoop.hbase.util.FSUtils;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.Threads;
107 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
108 import org.apache.hadoop.hbase.zookeeper.ZKTable;
109 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
110 import org.apache.hadoop.io.MultipleIOException;
111 import org.apache.zookeeper.KeeperException;
112 
113 import com.google.common.base.Preconditions;
114 import com.google.common.collect.Lists;
115 import com.google.protobuf.ServiceException;
116 
117 /**
118  * This class is responsible for splitting up a bunch of regionserver commit log
119  * files that are no longer being written to, into new files, one per region, for
120  * the region to replay on startup. Deletes the old log files when finished.
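 *
 * <p>A minimal usage sketch of the static entry point (the argument objects here are
 * assumed to come from the caller's environment; see the method javadoc below):
 * <pre>
 *   // split one WAL file into per-region recovered.edits files
 *   boolean finished = HLogSplitter.splitLogFile(rootDir, logfileStatus, fs, conf,
 *       reporter, idChecker, zkw, RecoveryMode.LOG_SPLITTING);
 * </pre>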
121  */
122 @InterfaceAudience.Private
123 public class HLogSplitter {
124   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
125 
126   // Parameters for split process
127   protected final Path rootDir;
128   protected final FileSystem fs;
129   protected final Configuration conf;
130 
131   // Major subcomponents of the split process.
132   // These are separated into inner classes to make testing easier.
133   OutputSink outputSink;
134   EntryBuffers entryBuffers;
135 
136   private Set<TableName> disablingOrDisabledTables =
137       new HashSet<TableName>();
138   private ZooKeeperWatcher watcher;
139 
140   // If an exception is thrown by one of the other threads, it will be
141   // stored here.
142   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
143 
144   // Wait/notify for when data has been produced by the reader thread,
145   // consumed by the writer threads, or an exception occurred
146   final Object dataAvailable = new Object();
147 
148   private MonitoredTask status;
149 
150   // For checking the latest flushed sequence id
151   protected final LastSequenceId sequenceIdChecker;
152 
153   protected boolean distributedLogReplay;
154 
155   // Map encodedRegionName -> lastFlushedSequenceId
156   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
157 
158   // Map encodedRegionName -> maxSeqIdInStores
159   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
160       new ConcurrentHashMap<String, Map<byte[], Long>>();
161 
162   // Failed region server that the wal file being split belongs to
163   protected String failedServerName = "";
164 
165   // Number of writer threads
166   private final int numWriterThreads;
167 
168   // Min batch size when replay WAL edits
169   private final int minBatchSize;
170 
171   HLogSplitter(Configuration conf, Path rootDir,
172       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, RecoveryMode mode) {
173     this.conf = HBaseConfiguration.create(conf);
174     String codecClassName = conf
175         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
176     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
177     this.rootDir = rootDir;
178     this.fs = fs;
179     this.sequenceIdChecker = idChecker;
180     this.watcher = zkw;
181 
182     entryBuffers = new EntryBuffers(
183         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
184             128*1024*1024));
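    // Note: this buffer size (default 128MB) is the backpressure limit; EntryBuffers.appendEntry
    // blocks the reader once more than this many bytes of edits are waiting for writer threads.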
185 
186     // a larger minBatchSize may slow down recovery because replay writer has to wait for
187     // enough edits before replaying them
188     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
189     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
190 
191     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
192     if (zkw != null && this.distributedLogReplay) {
193       outputSink = new LogReplayOutputSink(numWriterThreads);
194     } else {
195       if (this.distributedLogReplay) {
196         LOG.info("ZooKeeperWatcher is passed in as NULL so disabling distributedLogReplay.");
197       }
198       this.distributedLogReplay = false;
199       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
200     }
201 
202   }
203 
204   /**
205    * Splits an HLog file into the region's recovered-edits directory.
206    * This is the main entry point for distributed log splitting from SplitLogWorker.
207    * <p>
208    * If the log file has N regions then N recovered.edits files will be produced.
209    * <p>
210    * @param rootDir
211    * @param logfile
212    * @param fs
213    * @param conf
214    * @param reporter
215    * @param idChecker
216    * @param zkw ZooKeeperWatcher; if it is null, we fall back to the old-style log splitting where we
217    *          dump out recovered.edits files for regions to replay on.
218    * @return false if it is interrupted by the progressable.
219    * @throws IOException
220    */
221   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
222       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
223       ZooKeeperWatcher zkw, RecoveryMode mode) throws IOException {
224     HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, mode);
225     return s.splitLogFile(logfile, reporter);
226   }
227 
228   // A wrapper to split one log folder using the method used by distributed
229   // log splitting. Used by tools and unit tests. It should be package private.
230   // It is public only because TestWALObserver is in a different package,
231   // which uses this method to do log splitting.
232   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
233       FileSystem fs, Configuration conf) throws IOException {
234     FileStatus[] logfiles = fs.listStatus(logDir);
235     List<Path> splits = new ArrayList<Path>();
236     if (logfiles != null && logfiles.length > 0) {
237       for (FileStatus logfile: logfiles) {
238         HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, 
239           RecoveryMode.LOG_SPLITTING);
240         if (s.splitLogFile(logfile, null)) {
241           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
242           if (s.outputSink.splits != null) {
243             splits.addAll(s.outputSink.splits);
244           }
245         }
246       }
247     }
248     if (!fs.delete(logDir, true)) {
249       throw new IOException("Unable to delete src dir: " + logDir);
250     }
251     return splits;
252   }
253 
254   // The real log splitter. It just splits one log file.
255   boolean splitLogFile(FileStatus logfile,
256       CancelableProgressable reporter) throws IOException {
257     boolean isCorrupted = false;
258     Preconditions.checkState(status == null);
259     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
260       HLog.SPLIT_SKIP_ERRORS_DEFAULT);
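    // With skip.errors set, files that cannot be opened or parsed are marked corrupted and
    // sidelined (see getReader/getNextLogLine) rather than failing the whole split.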
261     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
262     Path logPath = logfile.getPath();
263     boolean outputSinkStarted = false;
264     boolean progress_failed = false;
265     int editsCount = 0;
266     int editsSkipped = 0;
267 
268     status =
269         TaskMonitor.get().createStatus(
270           "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
271     try {
272       long logLength = logfile.getLen();
273       LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
274       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
275       status.setStatus("Opening log file");
276       if (reporter != null && !reporter.progress()) {
277         progress_failed = true;
278         return false;
279       }
280       Reader in = null;
281       try {
282         in = getReader(fs, logfile, conf, skipErrors, reporter);
283       } catch (CorruptedLogFileException e) {
284         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
285         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
286         isCorrupted = true;
287       }
288       if (in == null) {
289         LOG.warn("Nothing to split in log file " + logPath);
290         return true;
291       }
292       if(watcher != null) {
293         try {
294           disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
295         } catch (KeeperException e) {
296           throw new IOException("Can't get disabling/disabled tables", e);
297         }
298       }
299       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
300       int numOpenedFilesLastCheck = 0;
301       outputSink.setReporter(reporter);
302       outputSink.startWriterThreads();
303       outputSinkStarted = true;
304       Entry entry;
305       Long lastFlushedSequenceId = -1L;
306       ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
307       failedServerName = (serverName == null) ? "" : serverName.getServerName();
308       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
309         byte[] region = entry.getKey().getEncodedRegionName();
310         String key = Bytes.toString(region);
311         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
312         if (lastFlushedSequenceId == null) {
313           if (this.distributedLogReplay) {
314             RegionStoreSequenceIds ids =
315                 SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
316             if (ids != null) {
317               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
318             }
319           } else if (sequenceIdChecker != null) {
320             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
321           }
322           if (lastFlushedSequenceId == null) {
323             lastFlushedSequenceId = -1L;
324           }
325           lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
326         }
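        // Edits at or below the region's last flushed sequence id are already persisted
        // in HFiles, so they can be safely skipped here.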
327         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
328           editsSkipped++;
329           continue;
330         }
331         entryBuffers.appendEntry(entry);
332         editsCount++;
333         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
334         // If sufficient edits have passed, check if we should report progress.
335         if (editsCount % interval == 0
336             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
337           numOpenedFilesLastCheck = this.getNumOpenWriters();
338           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
339               + " edits, skipped " + editsSkipped + " edits.";
340           status.setStatus("Split " + countsStr);
341           if (reporter != null && !reporter.progress()) {
342             progress_failed = true;
343             return false;
344           }
345         }
346       }
347     } catch (InterruptedException ie) {
348       IOException iie = new InterruptedIOException();
349       iie.initCause(ie);
350       throw iie;
351     } catch (CorruptedLogFileException e) {
352       LOG.warn("Could not parse, corrupted log file " + logPath, e);
353       ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
354       isCorrupted = true;
355     } catch (IOException e) {
356       e = RemoteExceptionHandler.checkIOException(e);
357       throw e;
358     } finally {
359       LOG.debug("Finishing writing output logs and closing down.");
360       try {
361         if (outputSinkStarted) {
362           // Set progress_failed to true in advance: the statement below resets it, but if
363           // finishWritingAndClose() throws an exception, progress_failed keeps the right value.
364           progress_failed = true;
365           progress_failed = outputSink.finishWritingAndClose() == null;
366         }
367       } finally {
368         String msg =
369             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
370                 + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
371                 + " progress failed = " + progress_failed;
372         LOG.info(msg);
373         status.markComplete(msg);
374       }
375     }
376     return !progress_failed;
377   }
378 
379   /**
380    * Completes the work done by splitLogFile by archiving logs
381    * <p>
382    * It is invoked by SplitLogManager once it knows that one of the
383    * SplitLogWorkers has completed the splitLogFile() part. If the master
384    * crashes then this function might get called multiple times.
385    * <p>
386    * @param logfile
387    * @param conf
388    * @throws IOException
389    */
390   public static void finishSplitLogFile(String logfile,
391       Configuration conf)  throws IOException {
392     Path rootdir = FSUtils.getRootDir(conf);
393     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
394     Path logPath;
395     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
396       logPath = new Path(logfile);
397     } else {
398       logPath = new Path(rootdir, logfile);
399     }
400     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
401   }
402 
403   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
404       Path logPath, Configuration conf) throws IOException {
405     List<Path> processedLogs = new ArrayList<Path>();
406     List<Path> corruptedLogs = new ArrayList<Path>();
407     FileSystem fs;
408     fs = rootdir.getFileSystem(conf);
409     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
410       corruptedLogs.add(logPath);
411     } else {
412       processedLogs.add(logPath);
413     }
414     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
415     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
416     fs.delete(stagingDir, true);
417   }
418 
419   /**
420    * Moves processed logs to the oldLogDir after successful processing. Moves
421    * corrupted logs (any log that couldn't be successfully parsed) to the corruptDir
422    * (.corrupt) for later investigation.
423    *
424    * @param corruptedLogs
425    * @param processedLogs
426    * @param oldLogDir
427    * @param fs
428    * @param conf
429    * @throws IOException
430    */
431   private static void archiveLogs(
432       final List<Path> corruptedLogs,
433       final List<Path> processedLogs, final Path oldLogDir,
434       final FileSystem fs, final Configuration conf) throws IOException {
435     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
436         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
437 
438     if (!fs.mkdirs(corruptDir)) {
439       LOG.info("Unable to mkdir " + corruptDir);
440     }
441     fs.mkdirs(oldLogDir);
442 
443     // this method can get restarted or called multiple times for archiving
444     // the same log files.
445     for (Path corrupted : corruptedLogs) {
446       Path p = new Path(corruptDir, corrupted.getName());
447       if (fs.exists(corrupted)) {
448         if (!fs.rename(corrupted, p)) {
449           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
450         } else {
451           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
452         }
453       }
454     }
455 
456     for (Path p : processedLogs) {
457       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
458       if (fs.exists(p)) {
459         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
460           LOG.warn("Unable to move " + p + " to " + newPath);
461         } else {
462           LOG.debug("Archived processed log " + p + " to " + newPath);
463         }
464       }
465     }
466   }
467 
468   /**
469    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
470    * <code>logEntry</code> named for the sequenceid in the passed
471    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
472    * This method also ensures existence of RECOVERED_EDITS_DIR under the region directory,
473    * creating it if necessary.
474    * @param fs
475    * @param logEntry
476    * @param rootDir HBase root dir.
477    * @return Path to file into which to dump split log edits.
478    * @throws IOException
479    */
480   @SuppressWarnings("deprecation")
481   static Path getRegionSplitEditsPath(final FileSystem fs,
482       final Entry logEntry, final Path rootDir, boolean isCreate)
483   throws IOException {
484     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
485     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
486     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
487     Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
488 
489     if (!fs.exists(regiondir)) {
490       LOG.info("This region's directory doesn't exist: "
491           + regiondir.toString() + ". It is very likely that it was" +
492           " already split so it's safe to discard those edits.");
493       return null;
494     }
495     if (fs.exists(dir) && fs.isFile(dir)) {
496       Path tmp = new Path("/tmp");
497       if (!fs.exists(tmp)) {
498         fs.mkdirs(tmp);
499       }
500       tmp = new Path(tmp,
501         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
502       LOG.warn("Found existing old file: " + dir + ". It could be some "
503         + "leftover of an old installation. It should be a folder instead. "
504         + "So moving it to " + tmp);
505       if (!fs.rename(dir, tmp)) {
506         LOG.warn("Failed to sideline old file " + dir);
507       }
508     }
509 
510     if (isCreate && !fs.exists(dir)) {
511       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
512     }
513     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
514     // region's replayRecoveredEdits will not delete it
515     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
516     fileName = getTmpRecoveredEditsFileName(fileName);
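    // The temporary file is later renamed to its final name (the last edit's sequence id)
    // by the output sink via getCompletedRecoveredEditsFilePath().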
517     return new Path(dir, fileName);
518   }
519 
520   static String getTmpRecoveredEditsFileName(String fileName) {
521     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
522   }
523 
524   /**
525    * Get the completed recovered edits file path, renaming it from the file's first edit
526    * sequence id to its last. The resulting name can then be used to skip
527    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
528    * @param srcPath
529    * @param maximumEditLogSeqNum
530    * @return dstPath named after the file's last edit log sequence number
531    */
532   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
533       Long maximumEditLogSeqNum) {
534     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
535     return new Path(srcPath.getParent(), fileName);
536   }
537 
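  // For illustration: formatRecoveredEditsFileName(2332) returns "0000000000000002332",
  // the sequence id zero-padded to 19 digits so that file names sort numerically.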
538   static String formatRecoveredEditsFileName(final long seqid) {
539     return String.format("%019d", seqid);
540   }
541 
542   /**
543    * Create a new {@link Reader} for reading logs to split.
544    *
545    * @param fs
546    * @param file
547    * @param conf
548    * @return A new Reader instance
549    * @throws IOException
550    * @throws CorruptedLogFileException
551    */
552   protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
553       boolean skipErrors, CancelableProgressable reporter)
554       throws IOException, CorruptedLogFileException {
555     Path path = file.getPath();
556     long length = file.getLen();
557     Reader in;
558 
559     // Check for possibly empty file. With appends, currently Hadoop reports a
560     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
561     // HDFS-878 is committed.
562     if (length <= 0) {
563       LOG.warn("File " + path + " might be still open, length is 0");
564     }
565 
566     try {
567       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
568       try {
569         in = getReader(fs, path, conf, reporter);
570       } catch (EOFException e) {
571         if (length <= 0) {
572           // TODO should we ignore an empty, not-last log file if skip.errors
573           // is false? Either way, the caller should decide what to do. E.g.
574           // ignore if this is the last log in sequence.
575           // TODO is this scenario still possible if the log has been
576           // recovered (i.e. closed)
577           LOG.warn("Could not open " + path + " for reading. File is empty", e);
578           return null;
579         } else {
580           // EOFException being ignored
581           return null;
582         }
583       }
584     } catch (IOException e) {
585       if (e instanceof FileNotFoundException) {
586         // A wal file may not exist anymore. Nothing can be recovered so move on
587         LOG.warn("File " + path + " doesn't exist anymore.", e);
588         return null;
589       }
590       if (!skipErrors || e instanceof InterruptedIOException) {
591         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
592       }
593       CorruptedLogFileException t =
594         new CorruptedLogFileException("skipErrors=true Could not open hlog " +
595             path + " ignoring");
596       t.initCause(e);
597       throw t;
598     }
599     return in;
600   }
601 
602   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
603   throws CorruptedLogFileException, IOException {
604     try {
605       return in.next();
606     } catch (EOFException eof) {
607       // truncated files are expected if a RS crashes (see HBASE-2643)
608       LOG.info("EOF from hlog " + path + ".  continuing");
609       return null;
610     } catch (IOException e) {
611       // If the IOE resulted from bad file format,
612       // then this problem is idempotent and retrying won't help
613       if (e.getCause() != null &&
614           (e.getCause() instanceof ParseException ||
615            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
616         LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
617            + path + ".  continuing");
618         return null;
619       }
620       if (!skipErrors) {
621         throw e;
622       }
623       CorruptedLogFileException t =
624         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
625             " while parsing hlog " + path + ". Marking as corrupted");
626       t.initCause(e);
627       throw t;
628     }
629   }
630 
631   private void writerThreadError(Throwable t) {
632     thrown.compareAndSet(null, t);
633   }
634 
635   /**
636    * Check for errors in the writer threads. If any is found, rethrow it.
637    */
638   private void checkForErrors() throws IOException {
639     Throwable thrown = this.thrown.get();
640     if (thrown == null) return;
641     if (thrown instanceof IOException) {
642       throw new IOException(thrown);
643     } else {
644       throw new RuntimeException(thrown);
645     }
646   }
647   /**
648    * Create a new {@link Writer} for writing log splits.
649    */
650   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
651       throws IOException {
652     return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
653   }
654 
655   /**
656    * Create a new {@link Reader} for reading logs to split.
657    */
658   protected Reader getReader(FileSystem fs, Path curLogFile,
659       Configuration conf, CancelableProgressable reporter) throws IOException {
660     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
661   }
662 
663   /**
664    * Get the number of currently open writers
665    */
666   private int getNumOpenWriters() {
667     int result = 0;
668     if (this.outputSink != null) {
669       result += this.outputSink.getNumOpenWriters();
670     }
671     return result;
672   }
673 
674   /**
675    * Class which accumulates edits and separates them into a buffer per region
676    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
677    * a predefined threshold.
678    *
679    * Writer threads then pull region-specific buffers from this class.
680    */
681   class EntryBuffers {
682     Map<byte[], RegionEntryBuffer> buffers =
683       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
684 
685     /* Track which regions are currently in the middle of writing. We don't allow
686        an IO thread to pick up bytes from a region if we're already writing
687        data for that region in a different IO thread. */
688     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
689 
690     long totalBuffered = 0;
691     long maxHeapUsage;
692 
693     EntryBuffers(long maxHeapUsage) {
694       this.maxHeapUsage = maxHeapUsage;
695     }
696 
697     /**
698      * Append a log entry into the corresponding region buffer.
699      * Blocks if the total heap usage has crossed the specified threshold.
700      *
701      * @throws InterruptedException
702      * @throws IOException
703      */
704     void appendEntry(Entry entry) throws InterruptedException, IOException {
705       HLogKey key = entry.getKey();
706 
707       RegionEntryBuffer buffer;
708       long incrHeap;
709       synchronized (this) {
710         buffer = buffers.get(key.getEncodedRegionName());
711         if (buffer == null) {
712           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
713           buffers.put(key.getEncodedRegionName(), buffer);
714         }
715         incrHeap = buffer.appendEntry(entry);
716       }
717 
718       // If we crossed the chunk threshold, wait for more space to be available
719       synchronized (dataAvailable) {
720         totalBuffered += incrHeap;
721         while (totalBuffered > maxHeapUsage && thrown.get() == null) {
722           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
723           dataAvailable.wait(2000);
724         }
725         dataAvailable.notifyAll();
726       }
727       checkForErrors();
728     }
729 
730     /**
731      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
732      */
733     synchronized RegionEntryBuffer getChunkToWrite() {
734       long biggestSize = 0;
735       byte[] biggestBufferKey = null;
736 
737       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
738         long size = entry.getValue().heapSize();
739         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
740           biggestSize = size;
741           biggestBufferKey = entry.getKey();
742         }
743       }
744       if (biggestBufferKey == null) {
745         return null;
746       }
747 
748       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
749       currentlyWriting.add(biggestBufferKey);
750       return buffer;
751     }
752 
753     void doneWriting(RegionEntryBuffer buffer) {
754       synchronized (this) {
755         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
756         assert removed;
757       }
758       long size = buffer.heapSize();
759 
760       synchronized (dataAvailable) {
761         totalBuffered -= size;
762         // We may unblock writers
763         dataAvailable.notifyAll();
764       }
765     }
766 
767     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
768       return currentlyWriting.contains(region);
769     }
770   }
771 
772   /**
773    * A buffer of some number of edits for a given region.
774    * This accumulates edits and also provides a memory optimization in order to
775    * share a single byte array instance for the table and region name.
776    * Also tracks memory usage of the accumulated edits.
777    */
778   static class RegionEntryBuffer implements HeapSize {
779     long heapInBuffer = 0;
780     List<Entry> entryBuffer;
781     TableName tableName;
782     byte[] encodedRegionName;
783 
784     RegionEntryBuffer(TableName tableName, byte[] region) {
785       this.tableName = tableName;
786       this.encodedRegionName = region;
787       this.entryBuffer = new LinkedList<Entry>();
788     }
789 
790     long appendEntry(Entry entry) {
791       internify(entry);
792       entryBuffer.add(entry);
793       long incrHeap = entry.getEdit().heapSize() +
794         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
795         0; // TODO linkedlist entry
796       heapInBuffer += incrHeap;
797       return incrHeap;
798     }
799 
800     private void internify(Entry entry) {
801       HLogKey k = entry.getKey();
802       k.internTableName(this.tableName);
803       k.internEncodedRegionName(this.encodedRegionName);
804     }
805 
806     public long heapSize() {
807       return heapInBuffer;
808     }
809   }
810 
811   class WriterThread extends Thread {
812     private volatile boolean shouldStop = false;
813     private OutputSink outputSink = null;
814 
815     WriterThread(OutputSink sink, int i) {
816       super(Thread.currentThread().getName() + "-Writer-" + i);
817       outputSink = sink;
818     }
819 
820     public void run()  {
821       try {
822         doRun();
823       } catch (Throwable t) {
824         LOG.error("Exiting thread", t);
825         writerThreadError(t);
826       }
827     }
828 
829     private void doRun() throws IOException {
830       LOG.debug("Writer thread " + this + ": starting");
831       while (true) {
832         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
833         if (buffer == null) {
834           // No data currently available, wait on some more to show up
835           synchronized (dataAvailable) {
836             if (shouldStop && !this.outputSink.flush()) {
837               return;
838             }
839             try {
840               dataAvailable.wait(500);
841             } catch (InterruptedException ie) {
842               if (!shouldStop) {
843                 throw new RuntimeException(ie);
844               }
845             }
846           }
847           continue;
848         }
849 
850         assert buffer != null;
851         try {
852           writeBuffer(buffer);
853         } finally {
854           entryBuffers.doneWriting(buffer);
855         }
856       }
857     }
858 
859     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
860       outputSink.append(buffer);
861     }
862 
863     void finish() {
864       synchronized (dataAvailable) {
865         shouldStop = true;
866         dataAvailable.notifyAll();
867       }
868     }
869   }
870 
871   /**
872    * The following class is an abstraction providing a common interface to support both the
873    * existing recovered edits file sink and the region server WAL edits replay sink.
874    */
875   abstract class OutputSink {
876 
877     protected Map<byte[], SinkWriter> writers = Collections
878         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));
879 
880     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
881         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
882 
883     protected final List<WriterThread> writerThreads = Lists.newArrayList();
884 
885     /* Set of regions which we've decided should not output edits */
886     protected final Set<byte[]> blacklistedRegions = Collections
887         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
888 
889     protected boolean closeAndCleanCompleted = false;
890 
891     protected boolean writersClosed = false;
892 
893     protected final int numThreads;
894 
895     protected CancelableProgressable reporter = null;
896 
897     protected AtomicLong skippedEdits = new AtomicLong();
898 
899     protected List<Path> splits = null;
900 
901     public OutputSink(int numWriters) {
902       numThreads = numWriters;
903     }
904 
905     void setReporter(CancelableProgressable reporter) {
906       this.reporter = reporter;
907     }
908 
909     /**
910      * Start the threads that will pump data from the entryBuffers to the output files.
911      */
912     synchronized void startWriterThreads() {
913       for (int i = 0; i < numThreads; i++) {
914         WriterThread t = new WriterThread(this, i);
915         t.start();
916         writerThreads.add(t);
917       }
918     }
919 
920     /**
921      *
922      * Update region's maximum edit log SeqNum.
923      */
924     void updateRegionMaximumEditLogSeqNum(Entry entry) {
925       synchronized (regionMaximumEditLogSeqNum) {
926         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
927             .getEncodedRegionName());
928         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
929           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
930               .getLogSeqNum());
931         }
932       }
933     }
934 
935     Long getRegionMaximumEditLogSeqNum(byte[] region) {
936       return regionMaximumEditLogSeqNum.get(region);
937     }
938 
939     /**
940      * @return the number of currently opened writers
941      */
942     int getNumOpenWriters() {
943       return this.writers.size();
944     }
945 
946     long getSkippedEdits() {
947       return this.skippedEdits.get();
948     }
949 
950     /**
951      * Wait for writer threads to dump all info to the sink
952      * @return true when there is no error
953      * @throws IOException
954      */
955     protected boolean finishWriting() throws IOException {
956       LOG.info("Waiting for split writer threads to finish");
957       boolean progress_failed = false;
958       for (WriterThread t : writerThreads) {
959         t.finish();
960       }
961       for (WriterThread t : writerThreads) {
962         if (!progress_failed && reporter != null && !reporter.progress()) {
963           progress_failed = true;
964         }
965         try {
966           t.join();
967         } catch (InterruptedException ie) {
968           IOException iie = new InterruptedIOException();
969           iie.initCause(ie);
970           throw iie;
971         }
972       }
973       checkForErrors();
974       LOG.info("Split writers finished");
975       return (!progress_failed);
976     }
977 
978     abstract List<Path> finishWritingAndClose() throws IOException;
979 
980     /**
981      * @return a map from encoded region ID to the number of edits written out for that region.
982      */
983     abstract Map<byte[], Long> getOutputCounts();
984 
985     /**
986      * @return number of regions we've recovered
987      */
988     abstract int getNumberOfRecoveredRegions();
989 
990     /**
991      * @param buffer A buffer of WAL edit entries for a single region
992      * @throws IOException
993      */
994     abstract void append(RegionEntryBuffer buffer) throws IOException;
995 
996     /**
997      * WriterThread calls this function to flush any remaining internal edits in the buffer before closing.
998      * @return true when underlying sink has something to flush
999      */
1000     protected boolean flush() throws IOException {
1001       return false;
1002     }
1003   }
1004 
1005   /**
1006    * Class that manages the output streams from the log splitting process.
1007    */
1008   class LogRecoveredEditsOutputSink extends OutputSink {
1009 
1010     public LogRecoveredEditsOutputSink(int numWriters) {
1011       // More threads could potentially write faster at the expense
1012       // of causing more disk seeks as the logs are split.
1013       // After a certain setting (probably around 3) the
1014       // process will be bound on the reader in the current
1015       // implementation anyway.
1016       super(numWriters);
1017     }
1018 
1019     /**
1020      * @return null if failed to report progress
1021      * @throws IOException
1022      */
1023     @Override
1024     List<Path> finishWritingAndClose() throws IOException {
1025       boolean isSuccessful = false;
1026       List<Path> result = null;
1027       try {
1028         isSuccessful = finishWriting();
1029       } finally {
1030         result = close();
1031         List<IOException> thrown = closeLogWriters(null);
1032         if (thrown != null && !thrown.isEmpty()) {
1033           throw MultipleIOException.createIOException(thrown);
1034         }
1035       }
1036       if (isSuccessful) {
1037         splits = result;
1038       }
1039       return splits;
1040     }
1041 
1042     /**
1043      * Close all of the output streams.
1044      * @return the list of paths written.
1045      */
1046     private List<Path> close() throws IOException {
1047       Preconditions.checkState(!closeAndCleanCompleted);
1048 
1049       final List<Path> paths = new ArrayList<Path>();
1050       final List<IOException> thrown = Lists.newArrayList();
1051       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1052         TimeUnit.SECONDS, new ThreadFactory() {
1053           private int count = 1;
1054 
1055           public Thread newThread(Runnable r) {
1056             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1057             return t;
1058           }
1059         });
1060       CompletionService<Void> completionService =
1061         new ExecutorCompletionService<Void>(closeThreadPool);
1062       for (final Map.Entry<byte[], ? extends SinkWriter> writersEntry : writers.entrySet()) {
1063         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1064         completionService.submit(new Callable<Void>() {
1065           public Void call() throws Exception {
1066             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1067             LOG.debug("Closing " + wap.p);
1068             try {
1069               wap.w.close();
1070             } catch (IOException ioe) {
1071               LOG.error("Couldn't close log at " + wap.p, ioe);
1072               thrown.add(ioe);
1073               return null;
1074             }
1075             LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1076                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1077 
1078             if (wap.editsWritten == 0) {
1079               // just remove the empty recovered.edits file
1080               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1081                 LOG.warn("Failed deleting empty " + wap.p);
1082                 throw new IOException("Failed deleting empty " + wap.p);
1083               }
1084               return null;
1085             }
1086 
1087             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1088               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1089             try {
1090               if (!dst.equals(wap.p) && fs.exists(dst)) {
1091                 LOG.warn("Found existing old edits file. It could be the "
1092                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1093                     + fs.getFileStatus(dst).getLen());
1094                 if (!fs.delete(dst, false)) {
1095                   LOG.warn("Failed deleting of old " + dst);
1096                   throw new IOException("Failed deleting of old " + dst);
1097                 }
1098               }
1099               // Skip the unit tests which create a splitter that reads and
1100               // writes the data without touching disk.
1101               // TestHLogSplit#testThreading is an example.
1102               if (fs.exists(wap.p)) {
1103                 if (!fs.rename(wap.p, dst)) {
1104                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1105                 }
1106                 LOG.debug("Rename " + wap.p + " to " + dst);
1107               }
1108             } catch (IOException ioe) {
1109               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1110               thrown.add(ioe);
1111               return null;
1112             }
1113             paths.add(dst);
1114             return null;
1115           }
1116         });
1117       }
1118 
1119       boolean progress_failed = false;
1120       try {
1121         for (int i = 0, n = this.writers.size(); i < n; i++) {
1122           Future<Void> future = completionService.take();
1123           future.get();
1124           if (!progress_failed && reporter != null && !reporter.progress()) {
1125             progress_failed = true;
1126           }
1127         }
1128       } catch (InterruptedException e) {
1129         IOException iie = new InterruptedIOException();
1130         iie.initCause(e);
1131         throw iie;
1132       } catch (ExecutionException e) {
1133         throw new IOException(e.getCause());
1134       } finally {
1135         closeThreadPool.shutdownNow();
1136       }
1137 
1138       if (!thrown.isEmpty()) {
1139         throw MultipleIOException.createIOException(thrown);
1140       }
1141       writersClosed = true;
1142       closeAndCleanCompleted = true;
1143       if (progress_failed) {
1144         return null;
1145       }
1146       return paths;
1147     }
1148 
1149     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1150       if (writersClosed) {
1151         return thrown;
1152       }
1153 
1154       if (thrown == null) {
1155         thrown = Lists.newArrayList();
1156       }
1157       try {
1158         for (WriterThread t : writerThreads) {
1159           while (t.isAlive()) {
1160             t.shouldStop = true;
1161             t.interrupt();
1162             try {
1163               t.join(10);
1164             } catch (InterruptedException e) {
1165               IOException iie = new InterruptedIOException();
1166               iie.initCause(e);
1167               throw iie;
1168             }
1169           }
1170         }
1171       } finally {
1172         synchronized (writers) {
1173           WriterAndPath wap = null;
1174           for (SinkWriter tmpWAP : writers.values()) {
1175             try {
1176               wap = (WriterAndPath) tmpWAP;
1177               wap.w.close();
1178             } catch (IOException ioe) {
1179               LOG.error("Couldn't close log at " + wap.p, ioe);
1180               thrown.add(ioe);
1181               continue;
1182             }
1183             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1184                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1185           }
1186         }
1187         writersClosed = true;
1188       }
1189 
1190       return thrown;
1191     }
1192 
1193     /**
1194      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1195      * long as multiple threads are always acting on different regions.
1196      * @return null if this region shouldn't output any logs
1197      */
1198     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1199       byte[] region = entry.getKey().getEncodedRegionName();
1200       WriterAndPath ret = (WriterAndPath) writers.get(region);
1201       if (ret != null) {
1202         return ret;
1203       }
1204       // If we already decided that this region doesn't get any output
1205       // we don't need to check again.
1206       if (blacklistedRegions.contains(region)) {
1207         return null;
1208       }
1209       ret = createWAP(region, entry, rootDir, fs, conf);
1210       if (ret == null) {
1211         blacklistedRegions.add(region);
1212         return null;
1213       }
1214       writers.put(region, ret);
1215       return ret;
1216     }
1217 
1218     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1219         Configuration conf) throws IOException {
1220       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1221       if (regionedits == null) {
1222         return null;
1223       }
1224       if (fs.exists(regionedits)) {
1225         LOG.warn("Found old edits file. It could be the "
1226             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1227             + fs.getFileStatus(regionedits).getLen());
1228         if (!fs.delete(regionedits, false)) {
1229           LOG.warn("Failed delete of old " + regionedits);
1230         }
1231       }
1232       Writer w = createWriter(fs, regionedits, conf);
1233       LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1234       return (new WriterAndPath(regionedits, w));
1235     }
1236 
1237     void append(RegionEntryBuffer buffer) throws IOException {
1238       List<Entry> entries = buffer.entryBuffer;
1239       if (entries.isEmpty()) {
1240         LOG.warn("got an empty buffer, skipping");
1241         return;
1242       }
1243 
1244       WriterAndPath wap = null;
1245 
1246       long startTime = System.nanoTime();
1247       try {
1248         int editsCount = 0;
1249 
1250         for (Entry logEntry : entries) {
1251           if (wap == null) {
1252             wap = getWriterAndPath(logEntry);
1253             if (wap == null) {
1254               // getWriterAndPath decided we don't need to write these edits
1255               return;
1256             }
1257           }
1258           wap.w.append(logEntry);
1259           this.updateRegionMaximumEditLogSeqNum(logEntry);
1260           editsCount++;
1261         }
1262         // Pass along summary statistics
1263         wap.incrementEdits(editsCount);
1264         wap.incrementNanoTime(System.nanoTime() - startTime);
1265       } catch (IOException e) {
1266         e = RemoteExceptionHandler.checkIOException(e);
1267         LOG.fatal("Got an exception while writing log entry to log", e);
1268         throw e;
1269       }
1270     }
1271 
1272     /**
1273      * @return a map from encoded region ID to the number of edits written out for that region.
1274      */
1275     Map<byte[], Long> getOutputCounts() {
1276       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1277       synchronized (writers) {
1278         for (Map.Entry<byte[], ? extends SinkWriter> entry : writers.entrySet()) {
1279           ret.put(entry.getKey(), entry.getValue().editsWritten);
1280         }
1281       }
1282       return ret;
1283     }
1284 
1285     @Override
1286     int getNumberOfRecoveredRegions() {
1287       return writers.size();
1288     }
1289   }
1290 
1291   /**
1292    * Wraps the actual writer that writes data out, together with related statistics.
1293    */
1294   private abstract static class SinkWriter {
1295     /* Count of edits written to this path */
1296     long editsWritten = 0;
1297     /* Number of nanos spent writing to this log */
1298     long nanosSpent = 0;
1299 
1300     void incrementEdits(int edits) {
1301       editsWritten += edits;
1302     }
1303 
1304     void incrementNanoTime(long nanos) {
1305       nanosSpent += nanos;
1306     }
1307   }
1308 
1309   /**
1310    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1311    * data written to this output.
1312    */
1313   private final static class WriterAndPath extends SinkWriter {
1314     final Path p;
1315     final Writer w;
1316 
1317     WriterAndPath(final Path p, final Writer w) {
1318       this.p = p;
1319       this.w = w;
1320     }
1321   }
1322 
1323   /**
1324    * Class that manages replaying edits from WAL files directly to the assigned failover region servers
1325    */
1326   class LogReplayOutputSink extends OutputSink {
1327     private static final double BUFFER_THRESHOLD = 0.35;
1328     private static final String KEY_DELIMITER = "#";
1329 
1330     private long waitRegionOnlineTimeOut;
1331     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1332     private final Map<String, RegionServerWriter> writers =
1333         new ConcurrentHashMap<String, RegionServerWriter>();
1334     // online encoded region name -> region location map
1335     private final Map<String, HRegionLocation> onlineRegions =
1336         new ConcurrentHashMap<String, HRegionLocation>();
1337 
1338     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1339         .synchronizedMap(new TreeMap<TableName, HConnection>());
1340     /**
1341      * Map key -> value layout
1342      * <servername>:<table name> -> Queue<Row>
1343      */
1344     private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
1345         new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
1346     private List<Throwable> thrown = new ArrayList<Throwable>();
1347 
1348     // The following sink is used in distributedLogReplay mode for entries of regions in a disabling
1349     // table. It's a limitation of distributedLogReplay: log replay needs a region to be
1350     // assigned and online before it can replay wal edits, while regions of a disabling/disabled table
1351     // won't be assigned by the AM. We can retire this code after HBASE-8234.
1352     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1353     private boolean hasEditsInDisablingOrDisabledTables = false;
1354 
1355     public LogReplayOutputSink(int numWriters) {
1356       super(numWriters);
1357       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
1358         SplitLogManager.DEFAULT_TIMEOUT);
1359       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
1360       this.logRecoveredEditsOutputSink.setReporter(reporter);
1361     }
1362 
1363     void append(RegionEntryBuffer buffer) throws IOException {
1364       List<Entry> entries = buffer.entryBuffer;
1365       if (entries.isEmpty()) {
1366         LOG.warn("got an empty buffer, skipping");
1367         return;
1368       }
1369 
1370       // check if the current region is in a disabling or disabled table
1371       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1372         // need to fall back to the old way
1373         logRecoveredEditsOutputSink.append(buffer);
1374         hasEditsInDisablingOrDisabledTables = true;
1375         // store regions we have recovered so far
1376         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1377         return;
1378       }
1379 
1380       // group entries by region servers
1381       groupEditsByServer(entries);
1382 
1383       // process workitems
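      // Pick the per-server queue with the most buffered entries and ship it, but only once it
      // holds at least minBatchSize edits or overall buffering crosses BUFFER_THRESHOLD of the
      // entry buffer limit, so replay RPCs carry reasonably sized batches.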
1384       String maxLocKey = null;
1385       int maxSize = 0;
1386       List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
1387       synchronized (this.serverToBufferQueueMap) {
1388         for (String key : this.serverToBufferQueueMap.keySet()) {
1389           List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
1390           if (curQueue.size() > maxSize) {
1391             maxSize = curQueue.size();
1392             maxQueue = curQueue;
1393             maxLocKey = key;
1394           }
1395         }
1396         if (maxSize < minBatchSize
1397             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1398           // buffer more to process
1399           return;
1400         } else if (maxSize > 0) {
1401           this.serverToBufferQueueMap.remove(maxLocKey);
1402         }
1403       }
1404 
1405       if (maxSize > 0) {
1406         processWorkItems(maxLocKey, maxQueue);
1407       }
1408     }
1409 
1410     private void addToRecoveredRegions(String encodedRegionName) {
1411       if (!recoveredRegions.contains(encodedRegionName)) {
1412         recoveredRegions.add(encodedRegionName);
1413       }
1414     }
1415 
1416     /**
1417      * Helper function to group WAL entries by the region server that should replay them
1418      * @throws IOException
1419      */
1420     private void groupEditsByServer(List<Entry> entries) throws IOException {
1421       Set<TableName> nonExistentTables = null;
1422       Long cachedLastFlushedSequenceId = -1L;
1423       for (HLog.Entry entry : entries) {
1424         WALEdit edit = entry.getEdit();
1425         TableName table = entry.getKey().getTablename();
1426         // clear scopes, which aren't needed for recovery
1427         entry.getKey().setScopes(null);
1428         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1429         // skip edits of non-existent tables
1430         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1431           this.skippedEdits.incrementAndGet();
1432           continue;
1433         }
1434 
1435         Map<byte[], Long> maxStoreSequenceIds = null;
1436         boolean needSkip = false;
1437         HRegionLocation loc = null;
1438         String locKey = null;
1439         List<KeyValue> kvs = edit.getKeyValues();
1440         List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
1441         HConnection hconn = this.getConnectionByTableName(table);
1442 
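             // Walk the KVs of this edit: compaction markers stored under METAFAMILY are redirected
             // to the region's start key and the family named in the CompactionDescriptor; other
             // metadata KVs are skipped, and data KVs are dropped when their column family no
             // longer exists or has already flushed past this entry's sequence id.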
1443         for (KeyValue kv : kvs) {
1444           byte[] row = kv.getRow();
1445           byte[] family = kv.getFamily();
1446           boolean isCompactionEntry = false;
1447 
1448           if (kv.matchingFamily(WALEdit.METAFAMILY)) {
1449             CompactionDescriptor compaction = WALEdit.getCompaction(kv);
1450             if (compaction != null && compaction.hasRegionName()) {
1451               try {
1452                 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1453                   .toByteArray());
1454                 row = regionName[1]; // startKey of the region
1455                 family = compaction.getFamilyName().toByteArray();
1456                 isCompactionEntry = true;
1457               } catch (Exception ex) {
1458                 LOG.warn("Unexpected exception while parsing compaction descriptor, skipping this KV", ex);
1459                 skippedKVs.add(kv);
1460                 continue;
1461               }
1462             } else {
1463               skippedKVs.add(kv);
1464               continue;
1465             }
1466           }
1467 
1468           try {
1469             loc =
1470                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1471                   encodeRegionNameStr);
1472             // skip replaying the compaction if the region is gone
1473             if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1474               loc.getRegionInfo().getEncodedName())) {
1475               LOG.info("Not replaying a compaction marker for an older region: "
1476                   + encodeRegionNameStr);
1477               needSkip = true;
1478             }
1479           } catch (TableNotFoundException ex) {
1480             // table has been deleted so skip edits of the table
1481             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1482                 + encodeRegionNameStr);
1483             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1484             if (nonExistentTables == null) {
1485               nonExistentTables = new TreeSet<TableName>();
1486             }
1487             nonExistentTables.add(table);
1488             this.skippedEdits.incrementAndGet();
1489             needSkip = true;
1490             break;
1491           }
1492 
1493           cachedLastFlushedSequenceId =
1494               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1495           if (cachedLastFlushedSequenceId != null
1496               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1497             // skip the whole HLog entry
1498             this.skippedEdits.incrementAndGet();
1499             needSkip = true;
1500             break;
1501           } else {
1502             if (maxStoreSequenceIds == null) {
1503               maxStoreSequenceIds =
1504                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1505             }
1506             if (maxStoreSequenceIds != null) {
1507               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1508               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1509                 // skip current kv if column family doesn't exist anymore or already flushed
1510                 skippedKVs.add(kv);
1511                 continue;
1512               }
1513             }
1514           }
1515         }
1516 
1517         // skip the edit
1518         if (loc == null || needSkip) continue;
1519 
1520         if (!skippedKVs.isEmpty()) {
1521           kvs.removeAll(skippedKVs);
1522         }
1523 
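             // Enqueue the surviving entry under a per-server key of the form
             // "<server name:port>#<table name>" (e.g. a hypothetical "rs1.example.com:60020#usertable"),
             // matching the layout documented on serverToBufferQueueMap.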
1524         synchronized (serverToBufferQueueMap) {
1525           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1526           List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
1527           if (queue == null) {
1528             queue =
1529                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
1530             serverToBufferQueueMap.put(locKey, queue);
1531           }
1532           queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
1533         }
1534         // store regions we have recovered so far
1535         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1536       }
1537     }
1538 
1539     /**
1540      * Locate destination region based on table name & row. This function also makes sure the
1541      * destination region is online for replay.
1542      * @throws IOException
1543      */
1544     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1545         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1546       // fetch location from cache
1547       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1548       if(loc != null) return loc;
1549       // fetch location from hbase:meta directly, bypassing the cache, to avoid hitting the old dead server
1550       loc = hconn.getRegionLocation(table, row, true);
1551       if (loc == null) {
1552         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1553             + " of table:" + table);
1554       }
1555       // check if the current row has moved to a different region due to a region merge/split
1556       if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1557         // the region originalEncodedRegionName must already have been flushed
1558         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1559         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1560         if (tmpLoc != null) return tmpLoc;
1561       }
1562 
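           // Region wasn't found in the online cache: wait for it to come online on its (possibly
           // new) host, then either mark it fully flushed (if it is not actually recovering) or
           // refresh its region-level and per-store last flushed sequence ids from ZK so that
           // already-flushed edits can be skipped during replay.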
1563       Long lastFlushedSequenceId = -1L;
1564       AtomicBoolean isRecovering = new AtomicBoolean(true);
1565       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1566       if (!isRecovering.get()) {
1567         // the region isn't in recovering state at all, because the WAL file may contain edits for a
1568         // region that was moved elsewhere before the hosting RS failed
1569         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1570         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1571             + " because it's not in recovering.");
1572       } else {
1573         Long cachedLastFlushedSequenceId =
1574             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1575 
1576         // retrieve the last flushed sequence id from ZK, because the region's postOpenDeployTasks
1577         // will have updated the value for the region
1578         RegionStoreSequenceIds ids =
1579             SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
1580                 .getRegionInfo().getEncodedName());
1581         if (ids != null) {
1582           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1583           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1584           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1585           for (StoreSequenceId id : maxSeqIdInStores) {
1586             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1587           }
1588           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1589         }
1590 
1591         if (cachedLastFlushedSequenceId == null
1592             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1593           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1594         }
1595       }
1596 
1597       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1598       return loc;
1599     }
1600 
1601     private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
1602         throws IOException {
1603       RegionServerWriter rsw = null;
1604 
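           // Replay the batched entries through this server's WALEditsReplaySink (wrapped by
           // RegionServerWriter) and accumulate edit-count and nano-time statistics.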
1605       long startTime = System.nanoTime();
1606       try {
1607         rsw = getRegionServerWriter(key);
1608         rsw.sink.replayEntries(actions);
1609 
1610         // Pass along summary statistics
1611         rsw.incrementEdits(actions.size());
1612         rsw.incrementNanoTime(System.nanoTime() - startTime);
1613       } catch (IOException e) {
1614         e = RemoteExceptionHandler.checkIOException(e);
1615         LOG.fatal("Got IOException while writing log entries to the replay sink", e);
1616         throw e;
1617       }
1618     }
1619 
1620     /**
1621      * Wait until the region is online on the destination region server
1622      * @param loc current (possibly stale) location of the region
1623      * @param row a row of the region, used to re-locate it if it has moved
1624      * @param timeout How long to wait, in milliseconds
1625      * @param isRecovering set to the recovering state of the region on the destination region server
1626      * @return the (possibly refreshed) location once the region is online
1627      * @throws IOException if the wait times out or is interrupted
1628      */
1629     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1630         final long timeout, AtomicBoolean isRecovering)
1631         throws IOException {
1632       final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1633       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1634         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1635       boolean reloadLocation = false;
1636       TableName tableName = loc.getRegionInfo().getTable();
1637       int tries = 0;
1638       Throwable cause = null;
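           // Poll the hosting server's admin interface for the region, backing off between attempts
           // via ConnectionUtils.getPauseTime(); the location is only re-fetched when the failure is
           // not a RegionOpeningException (i.e. when the region may have moved rather than simply
           // still be opening on this server).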
1639       while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1640         try {
1641           // Try and get regioninfo from the hosting server.
1642           HConnection hconn = getConnectionByTableName(tableName);
1643           if(reloadLocation) {
1644             loc = hconn.getRegionLocation(tableName, row, true);
1645           }
1646           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1647           HRegionInfo region = loc.getRegionInfo();
1648           try {
1649             GetRegionInfoRequest request =
1650                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1651             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1652             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1653               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1654               return loc;
1655             }
1656           } catch (ServiceException se) {
1657             throw ProtobufUtil.getRemoteException(se);
1658           }
1659         } catch (IOException e) {
1660           cause = e.getCause();
1661           if(!(cause instanceof RegionOpeningException)) {
1662             reloadLocation = true;
1663           }
1664         }
1665         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1666         try {
1667           Thread.sleep(expectedSleep);
1668         } catch (InterruptedException e) {
1669           throw new IOException("Interrupted when waiting region " +
1670               loc.getRegionInfo().getEncodedName() + " online.", e);
1671         }
1672         tries++;
1673       }
1674 
1675       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1676         " online for " + timeout + " milliseconds.", cause);
1677     }
1678 
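         // flush() drains at most one non-empty per-server queue per call, ignoring minBatchSize,
         // so edits buffered below the batch threshold still get replayed.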
1679     @Override
1680     protected boolean flush() throws IOException {
1681       String curLoc = null;
1682       int curSize = 0;
1683       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
1684       synchronized (this.serverToBufferQueueMap) {
1685         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1686           curQueue = this.serverToBufferQueueMap.get(locationKey);
1687           if (!curQueue.isEmpty()) {
1688             curSize = curQueue.size();
1689             curLoc = locationKey;
1690             break;
1691           }
1692         }
1693         if (curSize > 0) {
1694           this.serverToBufferQueueMap.remove(curLoc);
1695         }
1696       }
1697 
1698       if (curSize > 0) {
1699         this.processWorkItems(curLoc, curQueue);
1700         dataAvailable.notifyAll();
1701         return true;
1702       }
1703       return false;
1704     }
1705 
1706     void addWriterError(Throwable t) {
1707       thrown.add(t);
1708     }
1709 
1710     @Override
1711     List<Path> finishWritingAndClose() throws IOException {
1712       try {
1713         if (!finishWriting()) {
1714           return null;
1715         }
1716         if (hasEditsInDisablingOrDisabledTables) {
1717           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
1718         } else {
1719           splits = new ArrayList<Path>();
1720         }
1721       // return an empty list in order to keep the interface the same as the old code path
1722         return splits;
1723       } finally {
1724         List<IOException> thrown = closeRegionServerWriters();
1725         if (thrown != null && !thrown.isEmpty()) {
1726           throw MultipleIOException.createIOException(thrown);
1727         }
1728       }
1729     }
1730 
1731     @Override
1732     int getNumOpenWriters() {
1733       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1734     }
1735 
1736     private List<IOException> closeRegionServerWriters() throws IOException {
1737       List<IOException> result = null;
1738       if (!writersClosed) {
1739         result = Lists.newArrayList();
1740         try {
1741           for (WriterThread t : writerThreads) {
1742             while (t.isAlive()) {
1743               t.shouldStop = true;
1744               t.interrupt();
1745               try {
1746                 t.join(10);
1747               } catch (InterruptedException e) {
1748                 IOException iie = new InterruptedIOException();
1749                 iie.initCause(e);
1750                 throw iie;
1751               }
1752             }
1753           }
1754         } finally {
1755           synchronized (writers) {
1756             for (String locationKey : writers.keySet()) {
1757               RegionServerWriter tmpW = writers.get(locationKey);
1758               try {
1759                 tmpW.close();
1760               } catch (IOException ioe) {
1761                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
1762                 result.add(ioe);
1763               }
1764             }
1765           }
1766 
1767           // close connections
1768           synchronized (this.tableNameToHConnectionMap) {
1769             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
1770               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1771               try {
1772                 hconn.clearRegionCache();
1773                 hconn.close();
1774               } catch (IOException ioe) {
1775                 result.add(ioe);
1776               }
1777             }
1778           }
1779           writersClosed = true;
1780         }
1781       }
1782       return result;
1783     }
1784 
1785     Map<byte[], Long> getOutputCounts() {
1786       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1787       synchronized (writers) {
1788         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
1789           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1790         }
1791       }
1792       return ret;
1793     }
1794 
1795     @Override
1796     int getNumberOfRecoveredRegions() {
1797       return this.recoveredRegions.size();
1798     }
1799 
1800     /**
1801      * Get, or lazily create, the writer for the given "<server name:port>#<table name>" location
1802      * key. Thread-safe: writer creation is guarded by the writers map.
1803      * @throws IOException if the location string is invalid
1804      */
1805     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
1806       RegionServerWriter ret = writers.get(loc);
1807       if (ret != null) {
1808         return ret;
1809       }
1810 
1811       TableName tableName = getTableFromLocationStr(loc);
1812       if(tableName == null){
1813         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
1814       }
1815 
1816       HConnection hconn = getConnectionByTableName(tableName);
1817       synchronized (writers) {
1818         ret = writers.get(loc);
1819         if (ret == null) {
1820           ret = new RegionServerWriter(conf, tableName, hconn);
1821           writers.put(loc, ret);
1822         }
1823       }
1824       return ret;
1825     }
1826 
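         // Lazily creates one shared HConnection per table using a double-checked lookup on the
         // synchronized map; the connections are closed later in closeRegionServerWriters().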
1827     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
1828       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1829       if (hconn == null) {
1830         synchronized (this.tableNameToHConnectionMap) {
1831           hconn = this.tableNameToHConnectionMap.get(tableName);
1832           if (hconn == null) {
1833             hconn = HConnectionManager.getConnection(conf);
1834             this.tableNameToHConnectionMap.put(tableName, hconn);
1835           }
1836         }
1837       }
1838       return hconn;
1839     }
1840     private TableName getTableFromLocationStr(String loc) {
1841       /**
1842        * location key is in format <server name:port>#<table name>
1843        */
1844       String[] splits = loc.split(KEY_DELIMITER);
1845       if (splits.length != 2) {
1846         return null;
1847       }
1848       return TableName.valueOf(splits[1]);
1849     }
1850   }
1851 
1852   /**
1853    * Private data structure that wraps a receiving RS and collects statistics about the data
1854    * written to this newly assigned RS.
1855    */
1856   private final static class RegionServerWriter extends SinkWriter {
1857     final WALEditsReplaySink sink;
1858 
1859     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
1860         throws IOException {
1861       this.sink = new WALEditsReplaySink(conf, tableName, conn);
1862     }
1863 
1864     void close() throws IOException {
1865     }
1866   }
1867 
1868   static class CorruptedLogFileException extends Exception {
1869     private static final long serialVersionUID = 1L;
1870 
1871     CorruptedLogFileException(String s) {
1872       super(s);
1873     }
1874   }
1875 
1876   /** A struct used by getMutationsFromWALEntry */
1877   public static class MutationReplay {
1878     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1879       this.type = type;
1880       this.mutation = mutation;
1881       this.nonceGroup = nonceGroup;
1882       this.nonce = nonce;
1883     }
1884 
1885     public final MutationType type;
1886     public final Mutation mutation;
1887     public final long nonceGroup;
1888     public final long nonce;
1889   }
1890 
1891   /**
1892    * Tag the cell with the original sequence number of the edit to be replayed
1893    * @param entry WAL entry the cell belongs to
1894    * @param cell cell to tag
1895    * @return the cell with a LOG_REPLAY tag added, or the original cell if it already carries one
1896    */
1897   private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
1898     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
1899     boolean needAddRecoveryTag = true;
1900     if (cell.getTagsLengthUnsigned() > 0) {
1901       Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(),
1902           cell.getTagsLengthUnsigned(), TagType.LOG_REPLAY_TAG_TYPE);
1903       if (tmpTag != null) {
1904         // found an existing log replay tag so reuse it
1905         needAddRecoveryTag = false;
1906       }
1907     }
1908     if (needAddRecoveryTag) {
1909       List<Tag> newTags = new ArrayList<Tag>();
1910       Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
1911           .getLogSequenceNumber()));
1912       newTags.add(replayTag);
1913       return KeyValue.cloneAndAddTags(cell, newTags);
1914     }
1915     return cell;
1916   }
1917 
1918   /**
1919    * Constructs mutations from a WALEntry and, optionally, also reconstructs the HLogKey and
1920    * WALEdit from the passed in WALEntry.
1921    * @param entry the WALEntry to convert
1922    * @param cells scanner over the cells associated with the entry
1923    * @param logEntry pair that, when non-null, receives the HLogKey and WALEdit instances
1924    *          extracted from the passed in WALEntry.
1925    * @param addLogReplayTag whether to tag each replayed cell with its original log sequence number
1926    * @return list of MutationReplay instances to be replayed
1927    * @throws IOException
1928    */
1929   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1930       Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
1931 
1932     if (entry == null) {
1933       // return an empty list
1934       return new ArrayList<MutationReplay>();
1935     }
1936 
1937     int count = entry.getAssociatedCellCount();
1938     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
1939     Cell previousCell = null;
1940     Mutation m = null;
1941     HLogKey key = null;
1942     WALEdit val = null;
1943     if (logEntry != null) val = new WALEdit();
1944 
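     // Walk the cells associated with this entry: start a new Put or Delete whenever the row or
     // cell type changes, carrying the nonce group/nonce from the WAL key for Puts, and optionally
     // tag each replayed cell with its original log sequence number.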
1945     for (int i = 0; i < count; i++) {
1946       // Throw index out of bounds if our cell count is off
1947       if (!cells.advance()) {
1948         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
1949       }
1950       Cell cell = cells.current();
1951       if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
1952 
1953       boolean isNewRowOrType =
1954           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
1955               || !CellUtil.matchingRow(previousCell, cell);
1956       if (isNewRowOrType) {
1957         // Create new mutation
1958         if (CellUtil.isDelete(cell)) {
1959           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1960           // Deletes don't have nonces.
1961           mutations.add(new MutationReplay(
1962               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
1963         } else {
1964           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1965           // Puts might come from increment or append, thus we need nonces.
1966           long nonceGroup = entry.getKey().hasNonceGroup()
1967               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1968           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1969           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
1970         }
1971       }
1972       if (CellUtil.isDelete(cell)) {
1973         ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
1974       } else {
1975         Cell tmpNewCell = cell;
1976         if (addLogReplayTag) {
1977           tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
1978         }
1979         ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
1980       }
1981       previousCell = cell;
1982     }
1983 
1984     // reconstruct HLogKey
1985     if (logEntry != null) {
1986       WALKey walKey = entry.getKey();
1987       List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
1988       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
1989         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
1990       }
1991       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
1992               .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
1993               clusterIds, walKey.getNonceGroup(), walKey.getNonce());
1994       logEntry.setFirst(key);
1995       logEntry.setSecond(val);
1996     }
1997 
1998     return mutations;
1999   }
2000 }