View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.util.Random;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.testclassification.MediumTests;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
43  import org.junit.After;
44  import org.junit.Before;
45  import org.junit.Rule;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  import org.junit.rules.TestName;
49  
50  
51  /**
52   * Testing of multiPut in parallel.
53   *
54   */
55  @Category(MediumTests.class)
56  public class TestParallelPut {
57    static final Log LOG = LogFactory.getLog(TestParallelPut.class);
58    @Rule public TestName name = new TestName(); 
59    
60    private static HRegion region = null;
61    private static HBaseTestingUtility hbtu = new HBaseTestingUtility();
62  
63    // Test names
64    static byte[] tableName;
65    static final byte[] qual1 = Bytes.toBytes("qual1");
66    static final byte[] qual2 = Bytes.toBytes("qual2");
67    static final byte[] qual3 = Bytes.toBytes("qual3");
68    static final byte[] value1 = Bytes.toBytes("value1");
69    static final byte[] value2 = Bytes.toBytes("value2");
70    static final byte [] row = Bytes.toBytes("rowA");
71    static final byte [] row2 = Bytes.toBytes("rowB");
72  
73    /**
74     * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
75     */
76    @Before
77    public void setUp() throws Exception {
78      tableName = Bytes.toBytes(name.getMethodName());
79    }
80  
81    @After
82    public void tearDown() throws Exception {
83      EnvironmentEdgeManagerTestHelper.reset();
84    }
85    
86    public String getName() {
87      return name.getMethodName();
88    }
89  
90    //////////////////////////////////////////////////////////////////////////////
91    // New tests that don't spin up a mini cluster but rather just test the
92    // individual code pieces in the HRegion. 
93    //////////////////////////////////////////////////////////////////////////////
94  
95    /**
96     * Test one put command.
97     */
98    @Test
99    public void testPut() throws IOException {
100     LOG.info("Starting testPut");
101     initHRegion(tableName, getName(), fam1);
102 
103     long value = 1L;
104 
105     Put put = new Put(row);
106     put.add(fam1, qual1, Bytes.toBytes(value));
107     region.put(put);
108 
109     assertGet(row, fam1, qual1, Bytes.toBytes(value));
110   }
111 
112   /**
113    * Test multi-threaded Puts.
114    */
115   @Test
116   public void testParallelPuts() throws IOException {
117 
118     LOG.info("Starting testParallelPuts");
119     initHRegion(tableName, getName(), fam1);
120     int numOps = 1000; // these many operations per thread
121 
122     // create 100 threads, each will do its own puts
123     int numThreads = 100;
124     Putter[] all = new Putter[numThreads];
125 
126     // create all threads
127     for (int i = 0; i < numThreads; i++) {
128       all[i] = new Putter(region, i, numOps);
129     }
130 
131     // run all threads
132     for (int i = 0; i < numThreads; i++) {
133       all[i].start();
134     }
135 
136     // wait for all threads to finish
137     for (int i = 0; i < numThreads; i++) {
138       try {
139         all[i].join();
140       } catch (InterruptedException e) {
141         LOG.warn("testParallelPuts encountered InterruptedException." +
142                  " Ignoring....", e);
143       }
144     }
145     LOG.info("testParallelPuts successfully verified " + 
146              (numOps * numThreads) + " put operations.");
147   }
148 
149 
150   static private void assertGet(byte [] row,
151                          byte [] familiy,
152                          byte[] qualifier,
153                          byte[] value) throws IOException {
154     // run a get and see if the value matches
155     Get get = new Get(row);
156     get.addColumn(familiy, qualifier);
157     Result result = region.get(get);
158     assertEquals(1, result.size());
159 
160     Cell kv = result.rawCells()[0];
161     byte[] r = CellUtil.cloneValue(kv);
162     assertTrue(Bytes.compareTo(r, value) == 0);
163   }
164 
165   private void initHRegion(byte [] tableName, String callingMethod,
166     byte[] ... families)
167   throws IOException {
168     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
169     for(byte [] family : families) {
170       htd.addFamily(new HColumnDescriptor(family));
171     }
172     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
173     region = hbtu.createLocalHRegion(info, htd);
174   }
175 
176   /**
177    * A thread that makes a few put calls
178    */
179   public static class Putter extends Thread {
180 
181     private final HRegion region;
182     private final int threadNumber;
183     private final int numOps;
184     private final Random rand = new Random();
185     byte [] rowkey = null;
186 
187     public Putter(HRegion region, int threadNumber, int numOps) {
188       this.region = region;
189       this.threadNumber = threadNumber;
190       this.numOps = numOps;
191       this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread
192       setDaemon(true);
193     }
194 
195     @Override
196     public void run() {
197       byte[] value = new byte[100];
198       Put[]  in = new Put[1];
199 
200       // iterate for the specified number of operations
201       for (int i=0; i<numOps; i++) {
202         // generate random bytes
203         rand.nextBytes(value);  
204 
205         // put the randombytes and verify that we can read it. This is one
206         // way of ensuring that rwcc manipulation in HRegion.put() is fine.
207         Put put = new Put(rowkey);
208         put.add(fam1, qual1, value);
209         in[0] = put;
210         try {
211           OperationStatus[] ret = region.batchMutate(in);
212           assertEquals(1, ret.length);
213           assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
214           assertGet(rowkey, fam1, qual1, value);
215         } catch (IOException e) {
216           assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
217                      false);
218         }
219       }
220     }
221   }
222 
223 }
224