1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hadoopbackport;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.hadoop.fs.PositionedReadable;
25
26
27
28
29
30
31
32
33
34
35
36 public class ThrottledInputStream extends InputStream {
37
38 private final InputStream rawStream;
39 private final long maxBytesPerSec;
40 private final long startTime = System.currentTimeMillis();
41
42 private long bytesRead = 0;
43 private long totalSleepTime = 0;
44
45 private static final long SLEEP_DURATION_MS = 50;
46
47 public ThrottledInputStream(InputStream rawStream) {
48 this(rawStream, Long.MAX_VALUE);
49 }
50
51 public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
52 assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
53 this.rawStream = rawStream;
54 this.maxBytesPerSec = maxBytesPerSec;
55 }
56
57 @Override
58 public void close() throws IOException {
59 rawStream.close();
60 }
61
62
63 @Override
64 public int read() throws IOException {
65 throttle();
66 int data = rawStream.read();
67 if (data != -1) {
68 bytesRead++;
69 }
70 return data;
71 }
72
73
74 @Override
75 public int read(byte[] b) throws IOException {
76 throttle();
77 int readLen = rawStream.read(b);
78 if (readLen != -1) {
79 bytesRead += readLen;
80 }
81 return readLen;
82 }
83
84
85 @Override
86 public int read(byte[] b, int off, int len) throws IOException {
87 throttle();
88 int readLen = rawStream.read(b, off, len);
89 if (readLen != -1) {
90 bytesRead += readLen;
91 }
92 return readLen;
93 }
94
95
96
97
98
99
100
101
102
103
104 public int read(long position, byte[] buffer, int offset, int length)
105 throws IOException {
106 if (!(rawStream instanceof PositionedReadable)) {
107 throw new UnsupportedOperationException(
108 "positioned read is not supported by the internal stream");
109 }
110 throttle();
111 int readLen = ((PositionedReadable) rawStream).read(position, buffer,
112 offset, length);
113 if (readLen != -1) {
114 bytesRead += readLen;
115 }
116 return readLen;
117 }
118
119 private void throttle() throws IOException {
120 while (getBytesPerSec() > maxBytesPerSec) {
121 try {
122 Thread.sleep(SLEEP_DURATION_MS);
123 totalSleepTime += SLEEP_DURATION_MS;
124 } catch (InterruptedException e) {
125 throw new IOException("Thread aborted", e);
126 }
127 }
128 }
129
130
131
132
133
134 public long getTotalBytesRead() {
135 return bytesRead;
136 }
137
138
139
140
141
142
143 public long getBytesPerSec() {
144 long elapsed = (System.currentTimeMillis() - startTime) / 1000;
145 if (elapsed == 0) {
146 return bytesRead;
147 } else {
148 return bytesRead / elapsed;
149 }
150 }
151
152
153
154
155
156 public long getTotalSleepTime() {
157 return totalSleepTime;
158 }
159
160
161 @Override
162 public String toString() {
163 return "ThrottledInputStream{" +
164 "bytesRead=" + bytesRead +
165 ", maxBytesPerSec=" + maxBytesPerSec +
166 ", bytesPerSec=" + getBytesPerSec() +
167 ", totalSleepTime=" + totalSleepTime +
168 '}';
169 }
170 }