View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.hadoop.hbase.Abortable;
24  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  
29  /**
30   * A scheduler that maintains isolated handler pools for general, high-priority and replication
31   * requests.
32   */
33  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
34  @InterfaceStability.Evolving
35  public class SimpleRpcScheduler extends RpcScheduler {
36    public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
37  
38    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
39      "hbase.ipc.server.callqueue.read.share";
40    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
41      "hbase.ipc.server.callqueue.handler.factor";
42    public static final String CALL_QUEUE_MAX_LENGTH_CONF_KEY =
43      "hbase.ipc.server.max.callqueue.length";
44  
45    private int port;
46    private final PriorityFunction priority;
47    private final RpcExecutor callExecutor;
48    private final RpcExecutor priorityExecutor;
49    private final RpcExecutor replicationExecutor;
50  
51    /** What level a high priority call is at. */
52    private final int highPriorityLevel;
53    
54    private final Abortable abortable;
55  
56    /**
57     * @param conf
58     * @param handlerCount the number of handler threads that will be used to process calls
59     * @param priorityHandlerCount How many threads for priority handling.
60     * @param replicationHandlerCount How many threads for replication handling.
61     * @param highPriorityLevel
62     * @param priority Function to extract request priority.
63     */
64    public SimpleRpcScheduler(
65        Configuration conf,
66        int handlerCount,
67        int priorityHandlerCount,
68        int replicationHandlerCount,
69        PriorityFunction priority,
70        Abortable abortable,
71        int highPriorityLevel) {
72      int maxQueueLength = conf.getInt(CALL_QUEUE_MAX_LENGTH_CONF_KEY,
73        conf.getInt("ipc.server.max.callqueue.length",
74          handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER));
75      this.priority = priority;
76      this.highPriorityLevel = highPriorityLevel;
77      this.abortable = abortable;
78  
79      float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY,
80        conf.getFloat("ipc.server.callqueue.read.share", 0));
81  
82      float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
83        conf.getFloat("ipc.server.callqueue.handler.factor", 0));
84      int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
85  
86      LOG.info("Using default user call queue, count=" + numCallQueues);
87  
88      if (numCallQueues > 1 && callqReadShare > 0) {
89        // multiple read/write queues
90        callExecutor = new RWQueueRpcExecutor("RW.Default", handlerCount, numCallQueues,
91          callqReadShare, maxQueueLength, conf, abortable);
92      } else {
93        // multiple queues
94        callExecutor = new BalancedQueueRpcExecutor("B.Default", handlerCount,
95          numCallQueues, maxQueueLength, conf, abortable);
96      }
97  
98      this.priorityExecutor =
99          priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
100           1, maxQueueLength, conf, abortable) : null;
101     this.replicationExecutor =
102        replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
103          replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
104   }
105 
106   @Override
107   public void init(Context context) {
108     this.port = context.getListenerAddress().getPort();
109   }
110 
111     @Override
112   public void start() {
113     callExecutor.start(port);
114     if (priorityExecutor != null) priorityExecutor.start(port);
115     if (replicationExecutor != null) replicationExecutor.start(port);
116   }
117 
118   @Override
119   public void stop() {
120     callExecutor.stop();
121     if (priorityExecutor != null) priorityExecutor.stop();
122     if (replicationExecutor != null) replicationExecutor.stop();
123   }
124 
125   @Override
126   public void dispatch(CallRunner callTask) throws InterruptedException {
127     RpcServer.Call call = callTask.getCall();
128     int level = priority.getPriority(call.getHeader(), call.param);
129     if (priorityExecutor != null && level > highPriorityLevel) {
130       priorityExecutor.dispatch(callTask);
131     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
132       replicationExecutor.dispatch(callTask);
133     } else {
134       callExecutor.dispatch(callTask);
135     }
136   }
137 
138   @Override
139   public int getGeneralQueueLength() {
140     return callExecutor.getQueueLength();
141   }
142 
143   @Override
144   public int getPriorityQueueLength() {
145     return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
146   }
147 
148   @Override
149   public int getReplicationQueueLength() {
150     return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
151   }
152 
153   @Override
154   public int getActiveRpcHandlerCount() {
155     return callExecutor.getActiveHandlerCount() +
156            (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
157            (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
158   }
159 }
160