1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.GatheringByteChannel;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28
29
30
31
32 @InterfaceAudience.Private
// Chain of ByteBuffers that is written out to a GatheringByteChannel in bounded chunks.
// Null buffers handed to the constructor are dropped; the running count of unwritten bytes is
// kept in 'remaining'.
class BufferChain {
  /** Typed zero-length array handed to {@link List#toArray(Object[])}. */
  private static final ByteBuffer[] FOR_TOARRAY_TYPE = new ByteBuffer[0];
  /** The non-null buffers of this chain, in the order they are written. */
  private final ByteBuffer[] buffers;
  /** Bytes not yet written out; starts as the sum of {@code remaining()} over all buffers. */
  private int remaining = 0;
  /** Index of the first buffer that has not been skipped as fully drained. */
  private int bufferOffset = 0;

  /**
   * Builds a chain from the given buffers, silently skipping {@code null} entries.
   *
   * @param buffers the buffers to chain, in write order; individual entries may be {@code null}
   */
  BufferChain(ByteBuffer... buffers) {
    List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
    for (ByteBuffer b : buffers) {
      if (b == null) {
        continue;
      }
      bbs.add(b);
      this.remaining += b.remaining();
    }
    this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
  }

  /**
   * Copies every unwritten byte of the chain into one newly allocated array.
   *
   * <p>Each buffer contributes exactly the bytes between its position and its limit, packed
   * back to back. The copy goes through a duplicate of each buffer, so the positions and limits
   * of the chained buffers are left untouched and heap, direct and read-only buffers are all
   * handled.
   *
   * @return a new array holding the unwritten bytes in chain order
   * @throws IllegalAccessError if the chain has no bytes remaining
   */
  byte[] getBytes() {
    if (!hasRemaining()) {
      throw new IllegalAccessError();
    }
    byte[] bytes = new byte[this.remaining];
    int offset = 0;
    for (ByteBuffer bb : this.buffers) {
      int length = bb.remaining();
      // Read via a duplicate: it shares content but has its own position, so the chain's
      // buffer is not consumed by this copy.
      bb.duplicate().get(bytes, offset, length);
      offset += length;
    }
    return bytes;
  }

  /**
   * @return {@code true} while at least one byte of the chain is still unwritten
   */
  boolean hasRemaining() {
    return remaining > 0;
  }

  /**
   * Writes at most {@code chunkSize} bytes of the chain to {@code channel} with a single
   * gathering write.
   *
   * @param channel the channel to write to
   * @param chunkSize upper bound on the number of bytes offered to the channel in this call
   * @return the value returned by the channel's gathering write, or {@code 0} when no buffer
   *         with unwritten bytes was found
   * @throws IOException if the channel write fails
   */
  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
    int chunkRemaining = chunkSize;
    ByteBuffer lastBuffer = null;
    int bufCount = 0;
    int restoreLimit = -1;

    // Grow the window [bufferOffset, bufferOffset + bufCount) until it covers chunkSize bytes
    // or the chain is exhausted.
    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
      lastBuffer = buffers[bufferOffset + bufCount];
      if (!lastBuffer.hasRemaining()) {
        if (bufCount == 0) {
          // Drained buffer at the head of the chain: skip it for good.
          bufferOffset++;
        } else {
          // Empty buffer behind buffers already in the window: keep it inside the window.
          // Moving bufferOffset here would push the first pending buffer out of the window
          // and its bytes would never be written. An empty buffer adds nothing to the write.
          bufCount++;
        }
        continue;
      }
      bufCount++;
      if (lastBuffer.remaining() > chunkRemaining) {
        // Only part of this buffer fits in the chunk: cap its limit for the duration of the
        // write and remember the real limit so it can be restored afterwards.
        restoreLimit = lastBuffer.limit();
        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
        chunkRemaining = 0;
        break;
      } else {
        chunkRemaining -= lastBuffer.remaining();
      }
    }
    assert lastBuffer != null;
    if (chunkRemaining == chunkSize) {
      // No buffer contributed any bytes, so the chain must be fully written.
      assert !hasRemaining();
      return 0;
    }
    try {
      long ret = channel.write(buffers, bufferOffset, bufCount);
      if (ret > 0) {
        remaining -= ret;
      }
      return ret;
    } finally {
      // Undo the temporary cap even when the channel write throws.
      if (restoreLimit >= 0) {
        lastBuffer.limit(restoreLimit);
      }
    }
  }
}