1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.hbase.DaemonThreadFactory;
22
23 import java.io.IOException;
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import org.apache.hadoop.hbase.ipc.CallRunner;
28
29
30
31
32
33
34 public class FifoRpcScheduler extends RpcScheduler {
35
36 private final int handlerCount;
37 private final int maxQueueLength;
38 private ThreadPoolExecutor executor;
39
40 public FifoRpcScheduler(Configuration conf, int handlerCount) {
41 this.handlerCount = handlerCount;
42 this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
43 conf.getInt("ipc.server.max.callqueue.length",
44 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER));
45 }
46
47 @Override
48 public void init(Context context) {
49
50 }
51
52 @Override
53 public void start() {
54 this.executor = new ThreadPoolExecutor(
55 handlerCount,
56 handlerCount,
57 60,
58 TimeUnit.SECONDS,
59 new ArrayBlockingQueue<Runnable>(maxQueueLength),
60 new DaemonThreadFactory("FifoRpcScheduler.handler"),
61 new ThreadPoolExecutor.CallerRunsPolicy());
62 }
63
64 @Override
65 public void stop() {
66 this.executor.shutdown();
67 }
68
69 @Override
70 public void dispatch(final CallRunner task) throws IOException, InterruptedException {
71 executor.submit(new Runnable() {
72 @Override
73 public void run() {
74 task.run();
75 }
76 });
77 }
78
79 @Override
80 public int getGeneralQueueLength() {
81 return executor.getQueue().size();
82 }
83
84 @Override
85 public int getPriorityQueueLength() {
86 return 0;
87 }
88
89 @Override
90 public int getReplicationQueueLength() {
91 return 0;
92 }
93
94 @Override
95 public int getActiveRpcHandlerCount() {
96 return executor.getActiveCount();
97 }
98 }