/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.ChecksumType;

public class CacheTestUtils {

  private static final boolean includesMemstoreTS = true;

  /**
   * Checks that heap size grows when a block is cached and returns to its
   * original value when the same block is evicted.
   */
  public static void testHeapSizeChanges(final BlockCache toBeTested,
      final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    /* When we cache something, HeapSize should always increase. */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    /* Post-eviction, heap size should be the same as before. */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }
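
  /*
   * A minimal sketch of a caller for the helper above. The use of
   * LruBlockCache and its two-argument (maxSize, blockSize) constructor is an
   * assumption for illustration; any BlockCache that implements HeapSize
   * works.
   */
  public static void exampleHeapSizeCheck() {
    int blockSize = 16 * 1024;
    // 1 MB on-heap cache; LruBlockCache implements HeapSize. Illustrative
    // sizes only, not values required by the helper.
    BlockCache cache = new LruBlockCache(1024 * 1024, blockSize);
    testHeapSizeChanges(cache, blockSize);
  }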

  public static void testCacheMultiThreaded(final BlockCache toBeTested,
      final int blockSize, final int numThreads, final int numQueries,
      final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          if (!blocksToTest.isEmpty()) {
            HFileBlockPair ourBlock = blocksToTest.poll();
            // If we run out of blocks to test, then we should stop the tests.
            if (ourBlock == null) {
              ctx.setStopFlag(true);
              return;
            }
            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
                false, false, true);
            if (retrievedBlock != null) {
              assertEquals(ourBlock.block, retrievedBlock);
              toBeTested.evictBlock(ourBlock.blockName);
              hits.incrementAndGet();
              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
            } else {
              miss.incrementAndGet();
            }
            totalQueries.incrementAndGet();
          }
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
          + miss.get());
    }
  }
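
  /*
   * A hypothetical driver for the multithreaded check above; the cache type,
   * thread count, query count, and 0.75 passing score are illustrative
   * assumptions, not values mandated by this utility.
   */
  public static void exampleMultiThreadedCheck() throws Exception {
    int blockSize = 8 * 1024;
    // A cache comfortably larger than all generated blocks, so most reads
    // after a cacheBlock() should hit; require at least a 75% hit ratio.
    BlockCache cache = new LruBlockCache(32 * 1024 * 1024, blockSize);
    testCacheMultiThreaded(cache, blockSize, 8, 1000, 0.75);
  }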

  public static void testCacheSimple(BlockCache toBeTested, int blockSize,
      int numBlocks) throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
    // Confirm empty
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add blocks
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Check that every block either comes back with the right contents or
    // comes back null. The cache makes no guarantees about when it will
    // evict, so neither can we.
    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-add some duplicate blocks. Hope nothing breaks.
    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache won't throw an exception when caching an already
            // cached block.
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException re) {
        // expected
      }
    }
  }

  public static void hammerSingleKey(final BlockCache toBeTested,
      int blockSize, int numThreads, int numQueries) throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
              .getBlock(key, false, false, true);
          assertArrayEquals(buf, returned.buf);
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }

  public static void hammerEviction(final BlockCache toBeTested, int blockSize,
      int numThreads, int numQueries) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        conf);

    final AtomicInteger totalQueries = new AtomicInteger();

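    // Each thread repeatedly caches 100 distinct 5 KB blocks under its own
    // keys; in any bounded cache this churn should force evictions, which the
    // assertion at the end verifies.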
    for (int i = 0; i < numThreads; i++) {
      final int finalI = i;

      final byte[] buf = new byte[5 * 1024];
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          for (int j = 0; j < 100; j++) {
            BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
            Arrays.fill(buf, (byte) (finalI * j));
            final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

            ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
                .getBlock(key, true, false, true);
            if (gotBack != null) {
              assertArrayEquals(gotBack.buf, bac.buf);
            } else {
              toBeTested.cacheBlock(key, bac);
            }
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();

    assertTrue(toBeTested.getStats().getEvictedCount() > 0);
  }

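  /*
   * Trivial Cacheable used by the hammer tests: the wire format is a 4-byte
   * length prefix followed by the raw payload bytes.
   */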
  private static class ByteArrayCacheable implements Cacheable {

    static final CacheableDeserializer<Cacheable> blockDeserializer =
        new CacheableDeserializer<Cacheable>() {

      @Override
      public Cacheable deserialize(ByteBuffer b) throws IOException {
        int len = b.getInt();
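        // Yield here so deserialization interleaves with other threads in the
        // hammer tests, making any races more likely to surface.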
        Thread.yield();
        byte[] buf = new byte[len];
        b.get(buf);
        return new ByteArrayCacheable(buf);
      }

      @Override
      public int getDeserialiserIdentifier() {
        return deserializerIdentifier;
      }

      @Override
      public Cacheable deserialize(ByteBuffer b, boolean reuse)
          throws IOException {
        return deserialize(b);
      }
    };

    final byte[] buf;

    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      return 4 + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    @Override
    public void serialize(ByteBuffer destination) {
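      // Write the 4-byte length prefix, then the payload, then rewind so the
      // destination is ready for reading; the yield mirrors deserialize().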
      destination.putInt(buf.length);
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    private static final int deserializerIdentifier;
    static {
      deserializerIdentifier = CacheableDeserializerIdManager
          .registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }
  }

  private static HFileBlockPair[] generateHFileBlocks(int blockSize,
      int numBlocks) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = new Random();
    HashSet<String> usedStrings = new HashSet<String>();
    for (int i = 0; i < numBlocks; i++) {

      // The serialized size of the buffer must match blockSize, so make the
      // payload smaller by the extra serialization space the block requires.
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
          - HFileBlock.EXTRA_SERIALIZATION_SPACE);
      rand.nextBytes(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize
          - HFileBlock.EXTRA_SERIALIZATION_SPACE;
      int uncompressedSizeWithoutHeader = blockSize
          - HFileBlock.EXTRA_SERIALIZATION_SPACE;
      long prevBlockOffset = rand.nextLong();
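      // Write a minimal block header into the buffer: the 8-byte DATA block
      // magic followed by the two size fields and the previous block offset.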
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta = new HFileContextBuilder()
                          .withHBaseCheckSum(false)
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(false)
                          .withCompression(Compression.Algorithm.NONE)
                          .withBytesPerCheckSum(0)
                          .withChecksumType(ChecksumType.NULL)
                          .build();
      HFileBlock generated = new HFileBlock(BlockType.DATA,
          onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
          prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
          blockSize,
          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);

      /* Regenerate until we get a key that has not been used before. */
      String strKey;
      do {
        strKey = Long.toString(rand.nextLong());
      } while (!usedStrings.add(strKey));

      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

  private static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;
  }
}