View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HBaseTestingUtility;
28  import org.apache.hadoop.hbase.testclassification.MediumTests;
29  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
30  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
31  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.hbase.util.Threads;
35  import org.junit.AfterClass;
36  import org.junit.Assert;
37  import org.junit.BeforeClass;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  
41  import java.io.IOException;
42  import java.io.InterruptedIOException;
43  import java.net.SocketTimeoutException;
44  import java.util.ArrayList;
45  import java.util.List;
46  import java.util.concurrent.atomic.AtomicInteger;
47  
48  @Category(MediumTests.class)
49  public class TestClientOperationInterrupt {
50    private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class);
51  
52    private static HBaseTestingUtility util;
53    private static final byte[] tableName = Bytes.toBytes("test");
54    private static final byte[] dummy = Bytes.toBytes("dummy");
55    private static final byte[] row1 = Bytes.toBytes("r1");
56    private static final byte[] test = Bytes.toBytes("test");
57    private static Configuration conf;
58  
59    public static class TestCoprocessor extends BaseRegionObserver {
60      @Override
61      public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
62                           final Get get, final List<Cell> results) throws IOException {
63        Threads.sleep(2500);
64      }
65    }
66  
67  
68    @BeforeClass
69    public static void setUpBeforeClass() throws Exception {
70      conf = HBaseConfiguration.create();
71      conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
72          TestCoprocessor.class.getName());
73      util = new HBaseTestingUtility(conf);
74      util.startMiniCluster();
75  
76      HBaseAdmin admin = util.getHBaseAdmin();
77      if (admin.tableExists(tableName)) {
78        if (admin.isTableEnabled(tableName)) {
79          admin.disableTable(tableName);
80        }
81        admin.deleteTable(tableName);
82      }
83      util.createTable(tableName, new byte[][]{dummy, test});
84  
85      HTable ht = new HTable(conf, tableName);
86      Put p = new Put(row1);
87      p.add(dummy, dummy, dummy);
88      ht.put(p);
89    }
90  
91  
92    @Test
93    public void testInterrupt50Percent() throws IOException, InterruptedException {
94      final AtomicInteger noEx = new AtomicInteger(0);
95      final AtomicInteger badEx = new AtomicInteger(0);
96      final AtomicInteger noInt = new AtomicInteger(0);
97      final AtomicInteger done = new AtomicInteger(0);
98      List<Thread> threads = new ArrayList<Thread>();
99  
100     final int nbThread = 100;
101 
102     for (int i = 0; i < nbThread; i++) {
103       Thread t = new Thread() {
104         @Override
105         public void run() {
106           try {
107             HTable ht = new HTable(conf, tableName);
108             Result r = ht.get(new Get(row1));
109             noEx.incrementAndGet();
110           } catch (IOException e) {
111             LOG.info("exception", e);
112             if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
113               badEx.incrementAndGet();
114             } else {
115               if (Thread.currentThread().isInterrupted()) {
116                 noInt.incrementAndGet();
117                 LOG.info("The thread should NOT be with the 'interrupt' status.");
118               }
119             }
120           } finally {
121             done.incrementAndGet();
122           }
123         }
124       };
125       t.setName("TestClientOperationInterrupt #" + i);
126       threads.add(t);
127       t.start();
128     }
129 
130     for (int i = 0; i < nbThread / 2; i++) {
131       threads.get(i).interrupt();
132     }
133 
134 
135     boolean stillAlive = true;
136     while (stillAlive) {
137       stillAlive = false;
138       for (Thread t : threads) {
139         if (t.isAlive()) {
140           stillAlive = true;
141         }
142       }
143       Threads.sleep(10);
144     }
145 
146     Assert.assertFalse(Thread.currentThread().isInterrupted());
147 
148     Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
149         noEx.get() == nbThread / 2 && badEx.get() == 0);
150 
151     // The problem here is that we need the server to free its handlers to handle all operations
152     while (done.get() != nbThread){
153       Thread.sleep(1);
154     }
155 
156     HTable ht = new HTable(conf, tableName);
157     Result r = ht.get(new Get(row1));
158     Assert.assertFalse(r.isEmpty());
159   }
160 
161   @AfterClass
162   public static void tearDownAfterClass() throws Exception {
163     util.shutdownMiniCluster();
164   }
165 }