View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.GatheringByteChannel;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  
28  /**
29   * Chain of ByteBuffers.
30   * Used writing out an array of byte buffers.  Writes in chunks.
31   */
32  @InterfaceAudience.Private
33  class BufferChain {
34    private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
35    private final ByteBuffer[] buffers;
36    private int remaining = 0;
37    private int bufferOffset = 0;
38  
39    BufferChain(ByteBuffer ... buffers) {
40      // Some of the incoming buffers can be null
41      List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
42      for (ByteBuffer b : buffers) {
43        if (b == null) continue;
44        bbs.add(b);
45        this.remaining += b.remaining();
46      }
47      this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
48    }
49  
50    /**
51     * Expensive.  Makes a new buffer to hold a copy of what is in contained ByteBuffers.  This
52     * call drains this instance; it cannot be used subsequent to the call.
53     * @return A new byte buffer with the content of all contained ByteBuffers.
54     */
55    byte [] getBytes() {
56      if (!hasRemaining()) throw new IllegalAccessError();
57      byte [] bytes = new byte [this.remaining];
58      int offset = 0;
59      for (ByteBuffer bb: this.buffers) {
60        System.arraycopy(bb.array(), bb.arrayOffset(), bytes, offset, bb.limit());
61        offset += bb.capacity();
62      }
63      return bytes;
64    }
65  
66    boolean hasRemaining() {
67      return remaining > 0;
68    }
69  
70    /**
71     * Write out our chain of buffers in chunks
72     * @param channel Where to write
73     * @param chunkSize Size of chunks to write.
74     * @return Amount written.
75     * @throws IOException
76     */
77    long write(GatheringByteChannel channel, int chunkSize) throws IOException {
78      int chunkRemaining = chunkSize;
79      ByteBuffer lastBuffer = null;
80      int bufCount = 0;
81      int restoreLimit = -1;
82  
83      while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
84        lastBuffer = buffers[bufferOffset + bufCount];
85        if (!lastBuffer.hasRemaining()) {
86          bufferOffset++;
87          continue;
88        }
89        bufCount++;
90        if (lastBuffer.remaining() > chunkRemaining) {
91          restoreLimit = lastBuffer.limit();
92          lastBuffer.limit(lastBuffer.position() + chunkRemaining);
93          chunkRemaining = 0;
94          break;
95        } else {
96          chunkRemaining -= lastBuffer.remaining();
97        }
98      }
99      assert lastBuffer != null;
100     if (chunkRemaining == chunkSize) {
101       assert !hasRemaining();
102       // no data left to write
103       return 0;
104     }
105     try {
106       long ret = channel.write(buffers, bufferOffset, bufCount);
107       if (ret > 0) {
108         remaining -= ret;
109       }
110       return ret;
111     } finally {
112       if (restoreLimit >= 0) {
113         lastBuffer.limit(restoreLimit);
114       }
115     }
116   }
117 }