View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io.hadoopbackport;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  
24  import org.apache.hadoop.fs.PositionedReadable;
25  
26  /**
27   * The ThrottleInputStream provides bandwidth throttling on a specified
28   * InputStream. It is implemented as a wrapper on top of another InputStream
29   * instance.
30   * The throttling works by examining the number of bytes read from the underlying
31   * InputStream from the beginning, and sleep()ing for a time interval if
32   * the byte-transfer is found exceed the specified tolerable maximum.
33   * (Thus, while the read-rate might exceed the maximum for a given short interval,
34   * the average tends towards the specified maximum, overall.)
35   */
36  public class ThrottledInputStream extends InputStream {
37  
38    private final InputStream rawStream;
39    private final long maxBytesPerSec;
40    private final long startTime = System.currentTimeMillis();
41  
42    private long bytesRead = 0;
43    private long totalSleepTime = 0;
44  
45    private static final long SLEEP_DURATION_MS = 50;
46  
47    public ThrottledInputStream(InputStream rawStream) {
48      this(rawStream, Long.MAX_VALUE);
49    }
50  
51    public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
52      assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 
53      this.rawStream = rawStream;
54      this.maxBytesPerSec = maxBytesPerSec;
55    }
56  
57    @Override
58    public void close() throws IOException {
59      rawStream.close();
60    }
61  
62    /** @inheritDoc */
63    @Override
64    public int read() throws IOException {
65      throttle();
66      int data = rawStream.read();
67      if (data != -1) {
68        bytesRead++;
69      }
70      return data;
71    }
72  
73    /** @inheritDoc */
74    @Override
75    public int read(byte[] b) throws IOException {
76      throttle();
77      int readLen = rawStream.read(b);
78      if (readLen != -1) {
79        bytesRead += readLen;
80      }
81      return readLen;
82    }
83  
84    /** @inheritDoc */
85    @Override
86    public int read(byte[] b, int off, int len) throws IOException {
87      throttle();
88      int readLen = rawStream.read(b, off, len);
89      if (readLen != -1) {
90        bytesRead += readLen;
91      }
92      return readLen;
93    }
94  
95    /**
96     * Read bytes starting from the specified position. This requires rawStream is
97     * an instance of {@link PositionedReadable}.
98     * @param position
99     * @param buffer
100    * @param offset
101    * @param length
102    * @return the number of bytes read
103    */
104   public int read(long position, byte[] buffer, int offset, int length)
105       throws IOException {
106     if (!(rawStream instanceof PositionedReadable)) {
107       throw new UnsupportedOperationException(
108         "positioned read is not supported by the internal stream");
109     }
110     throttle();
111     int readLen = ((PositionedReadable) rawStream).read(position, buffer,
112       offset, length);
113     if (readLen != -1) {
114       bytesRead += readLen;
115     }
116     return readLen;
117   }
118 
119   private void throttle() throws IOException {
120     while (getBytesPerSec() > maxBytesPerSec) {
121       try {
122         Thread.sleep(SLEEP_DURATION_MS);
123         totalSleepTime += SLEEP_DURATION_MS;
124       } catch (InterruptedException e) {
125         throw new IOException("Thread aborted", e);
126       }
127     }
128   }
129 
130   /**
131    * Getter for the number of bytes read from this stream, since creation.
132    * @return The number of bytes.
133    */
134   public long getTotalBytesRead() {
135     return bytesRead;
136   }
137 
138   /**
139    * Getter for the read-rate from this stream, since creation.
140    * Calculated as bytesRead/elapsedTimeSinceStart.
141    * @return Read rate, in bytes/sec.
142    */
143   public long getBytesPerSec() {
144     long elapsed = (System.currentTimeMillis() - startTime) / 1000;
145     if (elapsed == 0) {
146       return bytesRead;
147     } else {
148       return bytesRead / elapsed;
149     }
150   }
151 
152   /**
153    * Getter the total time spent in sleep.
154    * @return Number of milliseconds spent in sleep.
155    */
156   public long getTotalSleepTime() {
157     return totalSleepTime;
158   }
159 
160   /** @inheritDoc */
161   @Override
162   public String toString() {
163     return "ThrottledInputStream{" +
164         "bytesRead=" + bytesRead +
165         ", maxBytesPerSec=" + maxBytesPerSec +
166         ", bytesPerSec=" + getBytesPerSec() +
167         ", totalSleepTime=" + totalSleepTime +
168         '}';
169   }
170 }