/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;

import com.google.common.annotations.VisibleForTesting;
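
/**
 * HDFS-based implementation of {@link HLog}: the HBase write-ahead log.
 *
 * Appends are buffered into an in-memory pendingWrites list and handed to a
 * small pipeline of background threads: a single AsyncWriter pushes pending
 * edits to the underlying writer, a pool of AsyncSyncers syncs them to the
 * filesystem, and an AsyncNotifier wakes any threads blocked in sync(txid)
 * once their transaction is durable. See the thread classes below.
 */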
@InterfaceAudience.Private
class FSHLog implements HLog, Syncable {
  static final Log LOG = LogFactory.getLog(FSHLog.class);

  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100;

  private final FileSystem fs;
  private final Path rootDir;
  private final Path dir;
  private final Configuration conf;

  private List<WALActionsListener> listeners =
      new CopyOnWriteArrayList<WALActionsListener>();
  private final long blocksize;
  private final String prefix;
  // Txid generator: counts the transactions appended to the WAL so far.
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  // Highest txid known to be durable on the filesystem.
  private final AtomicLong syncedTillHere = new AtomicLong(0);
  // Txid of the latest append against a deferred-log-flush table.
  private long lastUnSyncedTxid;
  private final Path oldLogDir;

  // Failure bookkeeping for the async pipeline: syncs of txids at or below
  // failedTxid rethrow asyncIOE to the caller.
  private final AtomicLong failedTxid = new AtomicLong(-1);
  private volatile IOException asyncIOE = null;

  private WALCoprocessorHost coprocessorHost;

  // FSDataOutputStream associated with the current log writer.
  private FSDataOutputStream hdfs_out;

  // Minimum tolerable replica count; if the actual value is lower, a log
  // roll is requested.
  private int minTolerableReplication;
  private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
  private final Method getPipeLine; // refers to DFSOutputStream.getPipeline
  private final int slowSyncNs;

  final static Object [] NO_ARGS = new Object []{};

  // Coordinates log close/roll with in-flight cache flushes.
  private DrainBarrier closeBarrier = new DrainBarrier();

  // The current log writer; swapped under updateLock during rolls.
  Writer writer;
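
  // Region sequence-number accounting: oldestUnflushedSeqNums tracks, per
  // region, the oldest edit sequence id still in the memstore; entries move
  // to oldestFlushingSeqNums while a flush is in progress. Both maps are
  // kept consistent under oldestSeqNumsLock.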
  private final Object oldestSeqNumsLock = new Object();

  // Serializes log rolls; fair so roll requests proceed in arrival order.
  private final ReentrantLock rollWriterLock = new ReentrantLock(true);

  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
      new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);

  private final Map<byte[], Long> oldestFlushingSeqNums =
      new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  private volatile boolean closed = false;

  // True if this is the WAL for hbase:meta; meta WAL files carry a special suffix.
  private boolean forMeta = false;

  // Current log file number; doubles as the timestamp suffix of the file name.
  private volatile long filenum = -1;

  // Number of transactions in the current log file.
  private final AtomicInteger numEntries = new AtomicInteger(0);

  // Consecutive rolls triggered by low replication; once this exceeds
  // lowReplicationRollLimit, low-replication rolling is disabled.
  private AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
  private final int lowReplicationRollLimit;

  private volatile boolean lowReplicationRollEnabled = true;

  // Roll the log once its size exceeds this many bytes.
  private final long logrollsize;

  private long curLogSize = 0;

  // Combined size of all rolled (not yet archived) log files.
  private AtomicLong totalLogSize = new AtomicLong(0);

  // updateLock serializes appends and writer swaps; pendingWritesLock guards
  // the pendingWrites buffer shared with the AsyncWriter thread.
  private final Object updateLock = new Object();
  private final Object pendingWritesLock = new Object();

  // If false, appends are dropped without being written (test-only switch).
  private final boolean enabled;

  // Soft cap on rolled log files; exceeding it forces memstore flushes.
  private final int maxLogs;

  // Edits appended but not yet handed to the writer; drained by AsyncWriter.
  private List<Entry> pendingWrites = new LinkedList<Entry>();
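
  // Write/sync pipeline: a single AsyncWriter drains pendingWrites to the
  // writer, one of several AsyncSyncers makes the writes durable, and the
  // AsyncNotifier wakes threads blocked in syncer(txid).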
  private final AsyncWriter asyncWriter;

  private final AsyncSyncer[] asyncSyncers;
  private final AsyncNotifier asyncNotifier;

  // Number of writer-close errors to ride over before aborting.
  private final int closeErrorsTolerated;

  private final AtomicInteger closeErrorCount = new AtomicInteger();
  private final MetricsWAL metrics;

  // Latest edit sequence id seen per region since the last roll; moved into
  // hlogSequenceNums, keyed by the old file, when the log rolls.
  private Map<byte[], Long> latestSequenceNums = new HashMap<byte[], Long>();

  // Orders log files by the file number encoded in their names.
  public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
    @Override
    public int compare(Path o1, Path o2) {
      long t1 = getFileNumFromFileName(o1);
      long t2 = getFileNumFromFileName(o2);
      if (t1 == t2) return 0;
      return (t1 > t2) ? 1 : -1;
    }
  };
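
  // Map of rolled log file -> the latest region sequence ids it contains; a
  // file can be archived once every region in its map has flushed past those
  // ids (see cleanOldLogs()).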
  private NavigableMap<Path, Map<byte[], Long>> hlogSequenceNums =
      new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
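
  /**
   * Constructor. Uses the default directory for archived logs.
   *
   * @param fs filesystem handle
   * @param root path for stored and archived hlogs
   * @param logDir dir where hlogs are stored
   * @param conf configuration to use
   */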
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final Configuration conf)
  throws IOException {
    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
        conf, null, true, null, false);
  }

  /**
   * Constructor.
   *
   * @param fs filesystem handle
   * @param root path for stored and archived hlogs
   * @param logDir dir where hlogs are stored
   * @param oldLogDir dir where hlogs are archived
   * @param conf configuration to use
   */
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final String oldLogDir, final Configuration conf)
  throws IOException {
    this(fs, root, logDir, oldLogDir,
        conf, null, true, null, false);
  }

  /**
   * Constructor. Registers the given listeners and uses the given file-name
   * prefix for the log files it creates.
   *
   * @param fs filesystem handle
   * @param root path for stored and archived hlogs
   * @param logDir dir where hlogs are stored
   * @param conf configuration to use
   * @param listeners listeners on WAL events
   * @param prefix prefix for log file names; it is URL-encoded before use,
   *        and "hlog" is used if it is null or empty
   */
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final Configuration conf, final List<WALActionsListener> listeners,
      final String prefix) throws IOException {
    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
        conf, listeners, true, prefix, false);
  }

  /**
   * Full constructor. Creates the log directory (and the archive directory
   * if missing), rolls the writer to open the first log file, and starts the
   * AsyncWriter/AsyncSyncer/AsyncNotifier threads.
   *
   * @param fs filesystem handle
   * @param root path for stored and archived hlogs
   * @param logDir dir where hlogs are stored
   * @param oldLogDir dir where hlogs are archived
   * @param conf configuration to use
   * @param listeners listeners on WAL events
   * @param failIfLogDirExists if true an IOException is thrown when the log
   *        directory already exists
   * @param prefix prefix for log file names; it is URL-encoded before use,
   *        and "hlog" is used if it is null or empty
   * @param forMeta if this hlog is meant for meta updates
   */
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final String oldLogDir, final Configuration conf,
      final List<WALActionsListener> listeners,
      final boolean failIfLogDirExists, final String prefix, boolean forMeta)
  throws IOException {
    super();
    this.fs = fs;
    this.rootDir = root;
    this.dir = new Path(this.rootDir, logDir);
    this.oldLogDir = new Path(this.rootDir, oldLogDir);
    this.forMeta = forMeta;
    this.conf = conf;

    if (listeners != null) {
      for (WALActionsListener i: listeners) {
        registerWALActionsListener(i);
      }
    }

    this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
        FSUtils.getDefaultBlockSize(this.fs, this.dir));
    // Roll at 95% of the block size by default so a log file never spills
    // into a second HDFS block.
    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
    this.logrollsize = (long)(this.blocksize * multi);

    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
    this.minTolerableReplication = conf.getInt(
        "hbase.regionserver.hlog.tolerable.lowreplication",
        FSUtils.getDefaultReplication(fs, this.dir));
    this.lowReplicationRollLimit = conf.getInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
    this.closeErrorsTolerated = conf.getInt(
        "hbase.regionserver.logroll.errors.tolerated", 0);

    LOG.info("WAL/HLog configuration: blocksize=" +
        StringUtils.byteDesc(this.blocksize) +
        ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
        ", enabled=" + this.enabled);

    this.prefix = prefix == null || prefix.isEmpty() ?
        "hlog" : URLEncoder.encode(prefix, "UTF8");

    boolean dirExists = false;
    if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
      throw new IOException("Target HLog directory already exists: " + dir);
    }
    if (!dirExists && !fs.mkdirs(dir)) {
      throw new IOException("Unable to mkdir " + dir);
    }

    if (!fs.exists(this.oldLogDir)) {
      if (!fs.mkdirs(this.oldLogDir)) {
        throw new IOException("Unable to mkdir " + this.oldLogDir);
      }
    }

    // Open the first log file.
    rollWriter();

    this.slowSyncNs =
        1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
            DEFAULT_SLOW_SYNC_TIME_MS);

    // Reflection hooks into the wrapped DFSOutputStream (see HDFS-826).
    this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
    this.getPipeLine = getGetPipeline(this.hdfs_out);

    final String n = Thread.currentThread().getName();

    asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
    asyncWriter.start();

    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
    asyncSyncers = new AsyncSyncer[syncerNums];
    for (int i = 0; i < asyncSyncers.length; ++i) {
      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
      asyncSyncers[i].start();
    }

    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
    asyncNotifier.start();

    coprocessorHost = new WALCoprocessorHost(this, conf);

    this.metrics = new MetricsWAL();
    registerWALActionsListener(metrics);
  }
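
  /**
   * Find the 'getNumCurrentReplicas' method on the wrapped output stream via
   * reflection; it is present on HDFS versions carrying HDFS-826 but is not
   * part of the public client API.
   */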
  private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
    Method m = null;
    if (os != null) {
      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
          .getClass();
      try {
        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
            new Class<?>[] {});
        m.setAccessible(true);
      } catch (NoSuchMethodException e) {
        LOG.info("FileSystem's output stream doesn't support"
            + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
            + wrappedStreamClass.getName());
      } catch (SecurityException e) {
        LOG.info("Doesn't have access to getNumCurrentReplicas on "
            + "FileSystems's output stream --HDFS-826 not available; fsOut="
            + wrappedStreamClass.getName(), e);
        m = null;
      }
    }
    if (m != null) {
      if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826");
    }
    return m;
  }

  @Override
  public void registerWALActionsListener(final WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(final WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public long getFilenum() {
    return this.filenum;
  }

  /**
   * Exposes the wrapped output stream; used internally and by tests.
   */
  OutputStream getOutputStream() {
    return this.hdfs_out.getWrappedStream();
  }

  @Override
  public byte [][] rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }
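
  /**
   * Roll the log writer: close the current log file (if any) and open a new
   * one. Returns the encoded names of regions that should be flushed because
   * too many rolled logs are outstanding, or null if there are none.
   */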
  @Override
  public byte [][] rollWriter(boolean force)
  throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to roll.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      byte [][] regionsToFlush = null;
      if (closed) {
        LOG.debug("HLog closed. Skipping rolling of writer");
        return null;
      }
      try {
        if (!closeBarrier.beginOp()) {
          LOG.debug("HLog closing. Skipping rolling of writer");
          return regionsToFlush;
        }
        // Compute the name of the new log file; file numbers are wall-clock
        // timestamps, bumped until we find a name that does not exist yet.
        long currentFilenum = this.filenum;
        Path oldPath = null;
        if (currentFilenum > 0) {
          // computeFilename will take care of the meta hlog filename.
          oldPath = computeFilename(currentFilenum);
        }
        this.filenum = System.currentTimeMillis();
        Path newPath = computeFilename();
        while (fs.exists(newPath)) {
          this.filenum++;
          newPath = computeFilename();
        }

        // Tell our listeners that a new log is about to be created.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogRoll(oldPath, newPath);
          }
        }
        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
        // Can we get at the dfsclient output stream?
        FSDataOutputStream nextHdfsOut = null;
        if (nextWriter instanceof ProtobufLogWriter) {
          nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
          // Perform a dummy sync to eagerly set up the pipeline for the new
          // file before it is swapped in under updateLock.
          try {
            nextWriter.sync();
          } catch (IOException e) {
            // Optimization failed; no need to abort here.
            LOG.warn("pre-sync failed", e);
          }
        }

        Path oldFile = null;
        int oldNumEntries = 0;
        synchronized (updateLock) {
          // Swap in the new writer, and remember the sequence ids that went
          // into the old file so cleanOldLogs() can archive it later.
          oldNumEntries = this.numEntries.get();
          oldFile = cleanupCurrentWriter(currentFilenum);
          this.writer = nextWriter;
          this.hdfs_out = nextHdfsOut;
          this.numEntries.set(0);
          if (oldFile != null) {
            this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
            this.latestSequenceNums = new HashMap<byte[], Long>();
          }
        }
        if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
        else {
          long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
          this.totalLogSize.addAndGet(oldFileLen);
          LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
              + oldNumEntries + ", filesize="
              + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
              + FSUtils.getPath(newPath));
        }

        // Tell our listeners that a new log was created.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogRoll(oldPath, newPath);
          }
        }

        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } finally {
        closeBarrier.endOp();
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }
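
  /**
   * Instantiate a writer for the given path; subclasses can override the
   * writer implementation used.
   */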
  protected Writer createWriterInstance(final FileSystem fs, final Path path,
      final Configuration conf) throws IOException {
    if (forMeta) {
      // Placeholder: meta WALs currently use the same writer implementation.
    }
    return HLogFactory.createWALWriter(fs, path, conf);
  }
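
  /**
   * Archive old logs. A log can be archived once the edits of every region
   * it contains have been flushed (or are flushing) past the sequence ids
   * recorded for that file.
   */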
  private void cleanOldLogs() throws IOException {
    Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
    Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
    List<Path> logsToArchive = new ArrayList<Path>();
    // Take a snapshot of the current flushing/unflushed sequence ids.
    synchronized (oldestSeqNumsLock) {
      oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.oldestFlushingSeqNums);
      oldestUnflushedSeqNumsLocal = new HashMap<byte[], Long>(this.oldestUnflushedSeqNums);
    }
    for (Map.Entry<Path, Map<byte[], Long>> e : hlogSequenceNums.entrySet()) {
      // Iterate over each log file; archive it if all its regions have flushed.
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue();
      if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
          oldestUnflushedSeqNumsLocal)) {
        logsToArchive.add(log);
        LOG.debug("log file is ready for archiving " + log);
      }
    }
    for (Path p : logsToArchive) {
      this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
      archiveLogFile(p);
      this.hlogSequenceNums.remove(p);
    }
  }
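
  /**
   * Returns true if no region in the given map still has unflushed (or
   * currently-flushing) edits at or below the sequence ids recorded for this
   * log file, i.e. the file is no longer needed for recovery.
   */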
  static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
      Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
    for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
      // Find the oldest edit of this region that is still flushing or still
      // in the memstore; MAX_VALUE means the region has nothing outstanding.
      long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
          oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
      long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
          oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
      // If that edit is at or below the latest sequence id this file holds
      // for the region, the file still contains data the region needs.
      long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
      if (minSeqNum <= regionSeqIdEntry.getValue()) return false;
    }
    return true;
  }
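
  /**
   * Among the regions with edits recorded in the given map, return those
   * whose memstores still hold edits at or below the recorded sequence ids;
   * these must flush before the corresponding log file can be archived.
   */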
  private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
    List<byte[]> regionsToFlush = null;
    // Keep the candidate set consistent with concurrent flush completions.
    synchronized (oldestSeqNumsLock) {
      for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
        Long unFlushedVal = this.oldestUnflushedSeqNums.get(e.getKey());
        if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
          if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
          regionsToFlush.add(e.getKey());
        }
      }
    }
    return regionsToFlush == null ? null : regionsToFlush
        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
  }
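
  /**
   * If the number of rolled log files exceeds maxLogs, pick the regions
   * pinning the oldest file and ask for their memstores to be flushed so
   * the file can be archived.
   */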
  byte[][] findRegionsToForceFlush() throws IOException {
    byte [][] regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      // The first file holds the oldest edits; regions pinning it flush first.
      Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
          this.hlogSequenceNums.firstEntry();
      regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
    }
    if (regions != null) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < regions.length; i++) {
        if (i > 0) sb.append(", ");
        sb.append(Bytes.toStringBinary(regions[i]));
      }
      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
          this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
          sb.toString());
    }
    return regions;
  }
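
  /**
   * Close the current writer, first syncing any outstanding transactions.
   * Returns the path of the file just closed, or null if there was no writer.
   */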
  Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
    Path oldFile = null;
    if (this.writer != null) {
      // Close the current writer; a new one is swapped in by the caller.
      try {
        // Wait till all current transactions are written to the hlog.
        // No new transactions can occur because we have the updateLock.
        if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
          LOG.debug("cleanupCurrentWriter " +
              " waiting for transactions to get synced " +
              " total " + this.unflushedEntries.get() +
              " synced till here " + this.syncedTillHere.get());
          sync();
        }
        this.writer.close();
        this.writer = null;
        closeErrorCount.set(0);
      } catch (IOException e) {
        LOG.error("Failed close of HLog writer", e);
        int errors = closeErrorCount.incrementAndGet();
        if (errors <= closeErrorsTolerated && !hasUnSyncedEntries()) {
          LOG.warn("Riding over HLog close failure! error count=" + errors);
        } else {
          if (hasUnSyncedEntries()) {
            LOG.error("Aborting due to unflushed edits in HLog");
          }
          // Failed close of log file: we are potentially losing edits, so
          // throw and let the server shut down to minimize loss.
          FailedLogCloseException flce =
              new FailedLogCloseException("#" + currentfilenum);
          flce.initCause(e);
          throw flce;
        }
      }
      if (currentfilenum >= 0) {
        oldFile = computeFilename(currentfilenum);
      }
    }
    return oldFile;
  }

  private void archiveLogFile(final Path p) throws IOException {
    Path newPath = getHLogArchivePath(this.oldLogDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  /**
   * Compute the path of a new log file using the current file number.
   */
  protected Path computeFilename() {
    return computeFilename(this.filenum);
  }

  /**
   * Compute the path of a log file with the given file number; meta WALs
   * get the meta file extension appended.
   */
  protected Path computeFilename(long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("hlog file number can't be < 0");
    }
    String child = prefix + "." + filenum;
    if (forMeta) {
      child += HLog.META_HLOG_FILE_EXTN;
    }
    return new Path(dir, child);
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name (see
   * computeFilename(long)). This helper returns that timestamp, assuming the
   * name was produced by computeFilename(long).
   *
   * @return the timestamp encoded in the log file name
   */
  protected long getFileNumFromFileName(Path fileName) {
    if (fileName == null) throw new IllegalArgumentException("file name can't be null");
    // The path should start with dir/<prefix>.
    String prefixPathStr = new Path(dir, prefix + ".").toString();
    if (!fileName.toString().startsWith(prefixPathStr)) {
      throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to" +
          " this regionserver " + prefixPathStr);
    }
    String chompedPath = fileName.toString().substring(prefixPathStr.length());
    if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN));
    return Long.parseLong(chompedPath);
  }

  @Override
  public void closeAndDelete() throws IOException {
    close();
    if (!fs.exists(this.dir)) return;
    FileStatus[] files = fs.listStatus(this.dir);
    if (files != null) {
      for (FileStatus file : files) {
        // Archive each remaining file rather than deleting it outright.
        Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.oldLogDir));
    }
    if (!fs.delete(dir, true)) {
      LOG.info("Unable to delete " + dir);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.closed) {
      return;
    }
    // Stop the async threads from the tail of the pipeline back to the head:
    // notifier, then syncers, then the writer.
    try {
      asyncNotifier.interrupt();
      asyncNotifier.join();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for " + asyncNotifier.getName() +
          " threads to die", e);
    }

    for (int i = 0; i < asyncSyncers.length; ++i) {
      try {
        asyncSyncers[i].interrupt();
        asyncSyncers[i].join();
      } catch (InterruptedException e) {
        LOG.error("Exception while waiting for " + asyncSyncers[i].getName() +
            " threads to die", e);
      }
    }

    try {
      asyncWriter.interrupt();
      asyncWriter.join();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for " + asyncWriter.getName() +
          " thread to die", e);
    }

    try {
      // Prevent all further flushing and rolling.
      closeBarrier.stopAndDrainOps();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for cache flushes and log rolls", e);
      Thread.currentThread().interrupt();
    }

    // Tell our listeners that the log is closing.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }
    synchronized (updateLock) {
      this.closed = true;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Closing WAL writer in " + this.dir.toString());
      }
      if (this.writer != null) {
        this.writer.close();
        this.writer = null;
      }
    }
  }

  /**
   * Construct the key for a WAL entry; subclasses can override to supply a
   * different HLogKey implementation.
   */
  protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
      long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
  }

  @Override
  @VisibleForTesting
  public void append(HRegionInfo info, TableName tableName, WALEdit edits,
      final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException {
    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, true, sequenceId,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
  }
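
  /**
   * Append a set of edits to the log. The write itself is queued on
   * pendingWrites and performed by the AsyncWriter thread; this method only
   * blocks when doSync is set. The returned txid can be passed to sync(long)
   * to block until the edits are durable.
   */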
  @SuppressWarnings("deprecation")
  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
      AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
    if (edits.isEmpty()) return this.unflushedEntries.get();
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");
    }
    TraceScope traceScope = Trace.startSpan("FSHlog.append");
    try {
      long txid = 0;
      synchronized (this.updateLock) {
        // Obtain the next sequence number for this region's edits.
        long seqNum = sequenceId.incrementAndGet();
        // If the region has no outstanding unflushed edits yet, this edit
        // becomes its oldest unflushed one.
        byte [] encodedRegionName = info.getEncodedNameAsBytes();
        if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
        HLogKey logKey = makeKey(
            encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

        synchronized (pendingWritesLock) {
          doWrite(info, logKey, edits, htd);
          txid = this.unflushedEntries.incrementAndGet();
        }
        this.numEntries.incrementAndGet();
        this.asyncWriter.setPendingTxid(txid);

        if (htd.isDeferredLogFlush()) {
          lastUnSyncedTxid = txid;
        }
        this.latestSequenceNums.put(encodedRegionName, seqNum);
      }
      // Sync if it is the catalog region; if not, check whether the table
      // supports deferred log flushing.
      if (doSync &&
          (info.isMetaRegion() ||
          !htd.isDeferredLogFlush())) {
        // Sync txn to file system.
        this.sync(txid);
      }
      return txid;
    } finally {
      traceScope.close();
    }
  }

  @Override
  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
      List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
      boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
    return append(info, tableName, edits, clusterIds,
        now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce);
  }
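
  /*
   * Pipeline of daemon threads that moves appended edits to durable storage:
   *
   *   1. AsyncWriter: waits for new pending txids, drains pendingWrites to
   *      the underlying writer (write to the output stream, no sync), then
   *      hands the written txid to an idle AsyncSyncer.
   *   2. AsyncSyncer (several instances): calls writer.sync() so the edits
   *      survive process or machine failure, then passes the synced txid on.
   *   3. AsyncNotifier: publishes the synced txid to syncedTillHere and
   *      wakes all threads blocked in syncer(txid).
   *
   * Multiple AsyncSyncers keep sync throughput up, since a single filesystem
   * sync round-trip is far slower than the in-memory write step.
   */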
  private class AsyncWriter extends HasThread {
    private long pendingTxid = 0;
    private long txidToWrite = 0;
    private long lastWrittenTxid = 0;
    private Object writeLock = new Object();

    public AsyncWriter(String name) {
      super(name);
    }

    // Wake up (if sleeping) the writer thread to write the transactions up
    // to the given txid.
    public void setPendingTxid(long txid) {
      synchronized (this.writeLock) {
        if (txid <= this.pendingTxid)
          return;

        this.pendingTxid = txid;
        this.writeLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. Wait until there is a new txid to write.
          synchronized (this.writeLock) {
            while (this.pendingTxid <= this.lastWrittenTxid) {
              this.writeLock.wait();
            }
          }

          // 2. Grab all pending writes in one shot. New appends can keep
          // buffering concurrently because we swap in a fresh list under
          // pendingWritesLock rather than holding updateLock.
          List<Entry> pendWrites = null;
          synchronized (pendingWritesLock) {
            this.txidToWrite = unflushedEntries.get();
            pendWrites = pendingWrites;
            pendingWrites = new LinkedList<Entry>();
          }

          // 3. Write the grabbed entries to the underlying writer.
          try {
            for (Entry e : pendWrites) {
              writer.append(e);
            }
          } catch (IOException e) {
            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToWrite);
          }

          // 4. Update the write progress and hand the txid to an idle
          // AsyncSyncer; if all are busy, pick one deterministically so the
          // txid is never dropped.
          this.lastWrittenTxid = this.txidToWrite;
          boolean hasIdleSyncer = false;
          for (int i = 0; i < asyncSyncers.length; ++i) {
            if (!asyncSyncers[i].isSyncing()) {
              hasIdleSyncer = true;
              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
              break;
            }
          }
          if (!hasIdleSyncer) {
            int idx = (int)(this.lastWrittenTxid % asyncSyncers.length);
            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "newer writes added to local buffer");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
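
  // Thread that syncs written edits to the filesystem so they survive
  // failures; several run in parallel because writer.sync() dominates WAL
  // latency.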
  private class AsyncSyncer extends HasThread {
    private long writtenTxid = 0;
    private long txidToSync = 0;
    private long lastSyncedTxid = 0;
    private volatile boolean isSyncing = false;
    private Object syncLock = new Object();

    public AsyncSyncer(String name) {
      super(name);
    }

    public boolean isSyncing() {
      return this.isSyncing;
    }

    // Wake up (if sleeping) the sync thread to sync the transactions up to
    // the given txid.
    public void setWrittenTxid(long txid) {
      synchronized (this.syncLock) {
        if (txid <= this.writtenTxid)
          return;

        this.writtenTxid = txid;
        this.syncLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. Wait until a new written (but not yet synced) txid arrives.
          synchronized (this.syncLock) {
            while (this.writtenTxid <= this.lastSyncedTxid) {
              this.syncLock.wait();
            }
            this.txidToSync = this.writtenTxid;
          }

          // If this txid was already synced by a sibling AsyncSyncer, just
          // record the progress and skip the redundant sync.
          if (this.txidToSync <= syncedTillHere.get()) {
            this.lastSyncedTxid = this.txidToSync;
            continue;
          }

          // 2. Sync the writes to the filesystem.
          long now = EnvironmentEdgeManager.currentTimeMillis();
          try {
            if (writer == null) {
              // The writer is null here typically because a log roll closed
              // the old writer and then failed to swap in a new one; treat
              // the pending writes as failed.
              LOG.fatal("should never happen: has unsynced writes but writer is null!");
              asyncIOE = new IOException("has unsynced writes but writer is null!");
              failedTxid.set(this.txidToSync);
            } else {
              this.isSyncing = true;
              writer.sync();
              this.isSyncing = false;
            }
            postSync();
          } catch (IOException e) {
            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToSync);

            this.isSyncing = false;
          }
          final long took = EnvironmentEdgeManager.currentTimeMillis() - now;
          metrics.finishSync(took);
          if (took > (slowSyncNs / 1000000)) {
            String msg =
                new StringBuilder().append("Slow sync cost: ")
                    .append(took).append(" ms, current pipeline: ")
                    .append(Arrays.toString(getPipeLine())).toString();
            Trace.addTimelineAnnotation(msg);
            LOG.info(msg);
          }

          // 3. Wake up the AsyncNotifier so it can release any client
          // threads waiting in syncer(txid).
          this.lastSyncedTxid = this.txidToSync;
          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);

          // 4. Check for low replication or an oversize log; request a roll
          // if needed, but do not contend with an in-progress roll.
          boolean lowReplication = false;
          if (rollWriterLock.tryLock()) {
            try {
              lowReplication = checkLowReplication();
            } finally {
              rollWriterLock.unlock();
            }
            try {
              if (lowReplication || writer != null && writer.getLength() > logrollsize) {
                requestLogRoll(lowReplication);
              }
            } catch (IOException e) {
              LOG.warn("writer.getLength() failed; this failure won't block here");
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "notification from AsyncWriter thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
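
  // Thread that publishes synced txids to syncedTillHere and notifies all
  // client threads blocked in syncer(txid). Kept separate from AsyncSyncer
  // so a notifyAll over many waiting handler threads never slows syncing.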
  private class AsyncNotifier extends HasThread {
    private long flushedTxid = 0;
    private long lastNotifiedTxid = 0;
    private Object notifyLock = new Object();

    public AsyncNotifier(String name) {
      super(name);
    }

    public void setFlushedTxid(long txid) {
      synchronized (this.notifyLock) {
        if (txid <= this.flushedTxid) {
          return;
        }

        this.flushedTxid = txid;
        this.notifyLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          synchronized (this.notifyLock) {
            while (this.flushedTxid <= this.lastNotifiedTxid) {
              this.notifyLock.wait();
            }
            this.lastNotifiedTxid = this.flushedTxid;
          }

          // Wake up every thread parked in syncer(txid) waiting for its
          // transaction to become durable.
          synchronized (syncedTillHere) {
            syncedTillHere.set(this.lastNotifiedTxid);
            syncedTillHere.notifyAll();
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            " notification from AsyncSyncer thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }

  // Sync all transactions appended so far.
  private void syncer() throws IOException {
    syncer(this.unflushedEntries.get());
  }

  // Block until the given transaction is durable, then rethrow any failure
  // the async threads recorded for it.
  private void syncer(long txid) throws IOException {
    synchronized (this.syncedTillHere) {
      while (this.syncedTillHere.get() < txid) {
        try {
          this.syncedTillHere.wait();
        } catch (InterruptedException e) {
          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
        }
      }
    }
    if (txid <= this.failedTxid.get()) {
      assert asyncIOE != null :
          "current txid is among(under) failed txids, but asyncIOE is null!";
      throw asyncIOE;
    }
  }

  @Override
  public void postSync() {}

  @Override
  public void postAppend(List<Entry> entries) {}
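
  // Returns true when a roll is needed because the pipeline replica count
  // dropped below the tolerable minimum; disables itself after
  // lowReplicationRollLimit consecutive low-replication rolls.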
  private boolean checkLowReplication() {
    boolean logRollNeeded = false;
    // If the number of replicas in HDFS has fallen below the configured
    // minimum, roll logs (up to a limit).
    try {
      int numCurrentReplicas = getLogReplication();
      if (numCurrentReplicas != 0
          && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. Found "
                + numCurrentReplicas + " replicas but expecting no less than "
                + this.minTolerableReplication + " replicas. "
                + " Requesting close of hlog. current pipeline: "
                + Arrays.toString(getPipeLine()));
            logRollNeeded = true;
            // If a roll is requested, increase consecutiveLogRolls. Once it
            // exceeds lowReplicationRollLimit, disable the
            // LowReplication-Roller.
            this.consecutiveLogRolls.getAndIncrement();
          } else {
            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
                + "the total number of live datanodes is lower than the tolerable replicas.");
            this.consecutiveLogRolls.set(0);
            this.lowReplicationRollEnabled = false;
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
        if (!this.lowReplicationRollEnabled) {
          // A fresh writer always reports the default replica count, so do
          // not re-enable the LowReplication-Roller until the writer has
          // seen more than one entry.
          if (this.numEntries.get() <= 1) {
            return logRollNeeded;
          }
          // Once the live datanode count and the replicas return to normal,
          // re-enable the LowReplication-Roller.
          this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was enabled.");
        }
      }
    } catch (Exception e) {
      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas: " + e +
          ", still proceeding ahead...");
    }
    return logRollNeeded;
  }

  /**
   * Get the current datanode replica count for this HLog's output stream,
   * via the reflected getNumCurrentReplicas method (HDFS-826). Returns 0
   * when that method is unavailable.
   */
  int getLogReplication()
  throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
    if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
      Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
      if (repl instanceof Integer) {
        return ((Integer)repl).intValue();
      }
    }
    return 0;
  }

  boolean canGetCurReplicas() {
    return this.getNumCurrentReplicas != null;
  }

  // hsync, hflush and sync are equivalent in this implementation: each
  // blocks until every transaction appended so far has been made durable.
  @Override
  public void hsync() throws IOException {
    syncer();
  }

  @Override
  public void hflush() throws IOException {
    syncer();
  }

  @Override
  public void sync() throws IOException {
    syncer();
  }

  @Override
  public void sync(long txid) throws IOException {
    syncer(txid);
  }

  private void requestLogRoll() {
    requestLogRoll(false);
  }

  private void requestLogRoll(boolean tooFewReplicas) {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i: this.listeners) {
        i.logRollRequested(tooFewReplicas);
      }
    }
  }

  // Buffer a write (as a pendingWrites entry) after running coprocessor and
  // listener hooks; the actual file write happens in AsyncWriter.
  protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
      HTableDescriptor htd)
  throws IOException {
    if (!this.enabled) {
      return;
    }
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i: this.listeners) {
        i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
      }
    }
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();
      // Coprocessor hook; a true return value means it bypassed the write.
      if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
        if (logEdit.isReplay()) {
          // Set replication scope null so that this won't be replicated again.
          logKey.setScopes(null);
        }
        // Write the edit out to the local buffer.
        this.pendingWrites.add(new HLog.Entry(logKey, logEdit));
      }
      long took = EnvironmentEdgeManager.currentTimeMillis() - now;
      coprocessorHost.postWALWrite(info, logKey, logEdit);
      long len = 0;
      for (KeyValue kv : logEdit.getKeyValues()) {
        len += kv.getLength();
      }
      this.metrics.finishAppend(took, len);
    } catch (IOException e) {
      LOG.fatal("Could not append. Requesting close of hlog", e);
      requestLogRoll();
      throw e;
    }
  }

  /** @return the number of entries in the current log file */
  int getNumEntries() {
    return numEntries.get();
  }

  /** @return the number of rolled log files */
  public int getNumRolledLogFiles() {
    return hlogSequenceNums.size();
  }

  /** @return the number of log files in use */
  @Override
  public int getNumLogFiles() {
    // +1 for the current in-use writer.
    return getNumRolledLogFiles() + 1;
  }

  /** @return the total size of the log files in use */
  @Override
  public long getLogFileSize() {
    return totalLogSize.get() + curLogSize;
  }

  @Override
  public boolean startCacheFlush(final byte[] encodedRegionName) {
    Long oldRegionSeqNum = null;
    if (!closeBarrier.beginOp()) {
      LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
          " - because the server is closing.");
      return false;
    }
    // Move the region's oldest unflushed sequence id into the flushing map
    // for the duration of the flush.
    synchronized (oldestSeqNumsLock) {
      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
      if (oldRegionSeqNum != null) {
        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
        assert oldValue == null : "Flushing map not cleaned up for "
            + Bytes.toString(encodedRegionName);
      }
    }
    if (oldRegionSeqNum == null) {
      // A null here means the region has had no appends since the last
      // flush (or flush bookkeeping got out of step); preserve the old
      // behavior and just warn.
      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
          + Bytes.toString(encodedRegionName) + "]");
    }
    return true;
  }

  @Override
  public void completeCacheFlush(final byte [] encodedRegionName) {
    synchronized (oldestSeqNumsLock) {
      this.oldestFlushingSeqNums.remove(encodedRegionName);
    }
    closeBarrier.endOp();
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
    // Restore the region's pre-flush oldest sequence id.
    synchronized (oldestSeqNumsLock) {
      seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
      if (seqNumBeforeFlushStarts != null) {
        currentSeqNum =
            this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
      }
    }
    closeBarrier.endOp();
    if ((currentSeqNum != null)
        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
          " acquired edits out of order current memstore seq=" + currentSeqNum
          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
      LOG.error(errorStr);
      assert false : errorStr;
      Runtime.getRuntime().halt(1);
    }
  }

  @Override
  public boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }

  /**
   * Get the directory we are making logs in.
   *
   * @return dir
   */
  protected Path getDir() {
    return dir;
  }

  static Path getHLogArchivePath(Path oldLogDir, Path p) {
    return new Path(oldLogDir, p.getName());
  }

  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }

  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
      ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));

  private static void usage() {
    System.err.println("Usage: HLog <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump textual representation of passed one or more files");
    System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
  }

  private static void split(final Configuration conf, final Path p)
  throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDir()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = FSUtils.getRootDir(conf);
    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  /** @return whether appends against deferred-flush tables are still unsynced */
  boolean hasUnSyncedEntries() {
    return this.lastUnSyncedTxid > this.syncedTillHere.get();
  }

  @Override
  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
    Long result = oldestUnflushedSeqNums.get(encodedRegionName);
    return result == null ? HConstants.NO_SEQNUM : result.longValue();
  }

  /**
   * Pass one or more log file names: it will either dump a text version on
   * <code>stdout</code> or split the specified log files.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // Either dump using the HLogPrettyPrinter or split, depending on args.
    if (args[0].compareTo("--dump") == 0) {
      HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          FSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (Throwable t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }

  /**
   * Find the 'getPipeline' method on the wrapped output stream via
   * reflection; like getNumCurrentReplicas, it is not part of the public
   * HDFS client API.
   */
  private Method getGetPipeline(final FSDataOutputStream os) {
    Method m = null;
    if (os != null) {
      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
          .getClass();
      try {
        m = wrappedStreamClass.getDeclaredMethod("getPipeline",
            new Class<?>[] {});
        m.setAccessible(true);
      } catch (NoSuchMethodException e) {
        LOG.info("FileSystem's output stream doesn't support"
            + " getPipeline; not available; fsOut="
            + wrappedStreamClass.getName());
      } catch (SecurityException e) {
        LOG.info("Doesn't have access to getPipeline on "
            + "FileSystems's output stream; fsOut="
            + wrappedStreamClass.getName(), e);
        m = null;
      }
    }
    return m;
  }

  /**
   * Get the current datanode pipeline for this HLog's output stream,
   * returning an empty array when it cannot be determined.
   */
  DatanodeInfo[] getPipeLine() {
    if (this.getPipeLine != null && this.hdfs_out != null) {
      Object repl;
      try {
        repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
        if (repl instanceof DatanodeInfo[]) {
          return ((DatanodeInfo[]) repl);
        }
      } catch (Exception e) {
        LOG.info("Get pipeline failed", e);
      }
    }
    return new DatanodeInfo[0];
  }
}