View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.hadoop.hbase.classification.InterfaceAudience;
23  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
24  
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  
29  /**
30   * A wrapper for a runnable for a group of actions for a single regionserver.
31   * <p>
32   * This can be used to build up the actions that should be taken and then run them after a delay.
33   * </p>
34   * <p>
35   * This class exists to simulate using a ScheduledExecutorService with just a regular
36   * ExecutorService and Runnables. It is used for legacy reasons in the client; this could
37   * only be removed if we change the expectations in HTable around the pool the client is able to
38   * pass in and even if we deprecate the current APIs would require keeping this class around
39   * for the interim to bridge between the legacy ExecutorServices and the scheduled pool.
40   * </p>
41   */
42  @InterfaceAudience.Private
43  public class DelayingRunner<T> implements Runnable {
44    private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
45  
46    private final Object sleepLock = new Object();
47    private boolean triggerWake = false;
48    private long sleepTime;
49    private MultiAction<T> actions = new MultiAction<T>();
50    private Runnable runnable;
51  
52    public DelayingRunner(long sleepTime, Entry<byte[], List<Action<T>>> e) {
53      this.sleepTime = sleepTime;
54      add(e);
55    }
56  
57    public void setRunner(Runnable runner) {
58      this.runnable = runner;
59    }
60  
61    @Override
62    public void run() {
63      if (!sleep()) {
64        LOG.warn(
65            "Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
66      }
67      //TODO maybe we should consider switching to a listenableFuture for the actual callable and
68      // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
69      // even if we get interrupted here, but for now, we still need to run so we decrement the
70      // outstanding tasks
71      this.runnable.run();
72    }
73  
74    /**
75     * Sleep for an expected amount of time.
76     * <p>
77     * This is nearly a copy of what the Sleeper does, but with the ability to know if you
78     * got interrupted while sleeping.
79     * </p>
80     *
81     * @return <tt>true</tt> if the sleep completely entirely successfully,
82     * but otherwise <tt>false</tt> if the sleep was interrupted.
83     */
84    private boolean sleep() {
85      long now = EnvironmentEdgeManager.currentTimeMillis();
86      long startTime = now;
87      long waitTime = sleepTime;
88      while (waitTime > 0) {
89        long woke = -1;
90        try {
91          synchronized (sleepLock) {
92            if (triggerWake) break;
93            sleepLock.wait(waitTime);
94          }
95          woke = EnvironmentEdgeManager.currentTimeMillis();
96        } catch (InterruptedException iex) {
97          return false;
98        }
99        // Recalculate waitTime.
100       woke = (woke == -1) ? EnvironmentEdgeManager.currentTimeMillis() : woke;
101       waitTime = waitTime - (woke - startTime);
102     }
103     return true;
104   }
105 
106   public void add(Map.Entry<byte[], List<Action<T>>> e) {
107     actions.add(e.getKey(), e.getValue());
108   }
109 
110   public MultiAction<T> getActions() {
111     return actions;
112   }
113 
114   public long getSleepTime() {
115     return sleepTime;
116   }
117 }