View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.trace;
20  
21  import org.apache.commons.cli.CommandLine;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.hadoop.hbase.HBaseConfiguration;
24  import org.apache.hadoop.hbase.IntegrationTestingUtility;
25  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
26  import org.apache.hadoop.hbase.client.Get;
27  import org.apache.hadoop.hbase.client.HBaseAdmin;
28  import org.apache.hadoop.hbase.client.HTable;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.util.ToolRunner;
36  import org.cloudera.htrace.Sampler;
37  import org.cloudera.htrace.Trace;
38  import org.cloudera.htrace.TraceScope;
39  import org.junit.Test;
40  import org.junit.experimental.categories.Category;
41  
42  import java.io.IOException;
43  import java.util.Random;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.Executors;
46  import java.util.concurrent.LinkedBlockingQueue;
47  import java.util.concurrent.TimeUnit;
48  
49  @Category(IntegrationTests.class)
50  public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
51  
52    public static final String TABLE_ARG = "t";
53    public static final String CF_ARG = "f";
54  
55    public static final String TABLE_NAME_DEFAULT = "SendTracesTable";
56    public static final String COLUMN_FAMILY_DEFAULT = "D";
57    private String tableName = TABLE_NAME_DEFAULT;
58    private String familyName = COLUMN_FAMILY_DEFAULT;
59    private IntegrationTestingUtility util;
60    private Random random = new Random();
61    private HBaseAdmin admin;
62    private SpanReceiverHost receiverHost;
63  
64    public static void main(String[] args) throws Exception {
65      Configuration configuration = HBaseConfiguration.create();
66      IntegrationTestingUtility.setUseDistributedCluster(configuration);
67      IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests();
68      ToolRunner.run(configuration, tool, args);
69    }
70  
71    @Override
72    protected void addOptions() {
73      addOptWithArg(TABLE_ARG, "The table name to target.  Will be created if not there already.");
74      addOptWithArg(CF_ARG, "The family to target");
75    }
76  
77    @Override
78    public void processOptions(CommandLine cmd) {
79      String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT);
80      String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT);
81  
82      this.tableName = tableNameString;
83      this.familyName = familyString;
84    }
85  
86    @Override
87    public int doWork() throws Exception {
88      internalDoWork();
89      return 0;
90    }
91  
92    @Test
93    public void internalDoWork() throws Exception {
94      util = createUtil();
95      admin = util.getHBaseAdmin();
96      setupReceiver();
97  
98      deleteTable();
99      createTable();
100     LinkedBlockingQueue<Long> rks = insertData();
101 
102     ExecutorService service = Executors.newFixedThreadPool(20);
103     doScans(service, rks);
104     doGets(service, rks);
105 
106     service.shutdown();
107     service.awaitTermination(100, TimeUnit.SECONDS);
108     Thread.sleep(90000);
109     receiverHost.closeReceivers();
110     util.restoreCluster();
111     util = null;
112   }
113 
114   private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) {
115 
116       for (int i = 0; i < 100; i++) {
117         Runnable runnable = new Runnable() {
118           private TraceScope innerScope = null;
119           private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
120           @Override
121           public void run() {
122             ResultScanner rs = null;
123             try {
124               innerScope = Trace.startSpan("Scan", Sampler.ALWAYS);
125               HTable ht = new HTable(util.getConfiguration(), tableName);
126               Scan s = new Scan();
127               s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
128               s.setBatch(7);
129               rs = ht.getScanner(s);
130               // Something to keep the jvm from removing the loop.
131               long accum = 0;
132 
133               for(int x = 0; x < 1000; x++) {
134                 Result r = rs.next();
135                 accum |= Bytes.toLong(r.getRow());
136               }
137 
138               innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum);
139 
140               ht.close();
141               ht = null;
142             } catch (IOException e) {
143               e.printStackTrace();
144 
145               innerScope.getSpan().addKVAnnotation(
146                   Bytes.toBytes("exception"),
147                   Bytes.toBytes(e.getClass().getSimpleName()));
148 
149             } catch (Exception e) {
150             } finally {
151               if (innerScope != null) innerScope.close();
152               if (rs != null) rs.close();
153             }
154 
155           }
156         };
157         service.submit(runnable);
158       }
159 
160   }
161 
162   private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys)
163       throws IOException {
164     for (int i = 0; i < 100; i++) {
165       Runnable runnable = new Runnable() {
166         private TraceScope innerScope = null;
167         private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys;
168 
169         @Override
170         public void run() {
171 
172 
173           HTable ht = null;
174           try {
175             ht = new HTable(util.getConfiguration(), tableName);
176           } catch (IOException e) {
177             e.printStackTrace();
178           }
179 
180           long accum = 0;
181           for (int x = 0; x < 5; x++) {
182             try {
183               innerScope = Trace.startSpan("gets", Sampler.ALWAYS);
184               long rk = rowKeyQueue.take();
185               Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
186               if (r1 != null) {
187                 accum |= Bytes.toLong(r1.getRow());
188               }
189               Result r2 = ht.get(new Get(Bytes.toBytes(rk)));
190               if (r2 != null) {
191                 accum |= Bytes.toLong(r2.getRow());
192               }
193               innerScope.getSpan().addTimelineAnnotation("Accum = " + accum);
194 
195             } catch (IOException e) {
196               // IGNORED
197             } catch (InterruptedException ie) {
198               // IGNORED
199             } finally {
200               if (innerScope != null) innerScope.close();
201             }
202           }
203 
204         }
205       };
206       service.submit(runnable);
207     }
208   }
209 
210   private void createTable() throws IOException {
211     TraceScope createScope = null;
212     try {
213       createScope = Trace.startSpan("createTable", Sampler.ALWAYS);
214       util.createTable(tableName, familyName);
215     } finally {
216       if (createScope != null) createScope.close();
217     }
218   }
219 
220   private void deleteTable() throws IOException {
221     TraceScope deleteScope = null;
222 
223     try {
224       if (admin.tableExists(tableName)) {
225         deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS);
226         util.deleteTable(tableName);
227       }
228     } finally {
229       if (deleteScope != null) deleteScope.close();
230     }
231   }
232 
233   private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
234     LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
235     HTable ht = new HTable(util.getConfiguration(), this.tableName);
236     byte[] value = new byte[300];
237     for (int x = 0; x < 5000; x++) {
238       TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
239       try {
240         ht.setAutoFlush(false, true);
241         for (int i = 0; i < 5; i++) {
242           long rk = random.nextLong();
243           rowKeys.add(rk);
244           Put p = new Put(Bytes.toBytes(rk));
245           for (int y = 0; y < 10; y++) {
246             random.nextBytes(value);
247             p.add(Bytes.toBytes(familyName),
248                 Bytes.toBytes(random.nextLong()),
249                 value);
250           }
251           ht.put(p);
252         }
253         if ((x % 1000) == 0) {
254           admin.flush(Bytes.toBytes(tableName));
255         }
256       } finally {
257         traceScope.close();
258       }
259     }
260     admin.flush(Bytes.toBytes(tableName));
261     return rowKeys;
262   }
263 
264   private IntegrationTestingUtility createUtil() throws Exception {
265     Configuration conf = getConf();
266     if (this.util == null) {
267       IntegrationTestingUtility u;
268       if (conf == null) {
269         u = new IntegrationTestingUtility();
270       } else {
271         u = new IntegrationTestingUtility(conf);
272       }
273       util = u;
274       util.initializeCluster(1);
275 
276     }
277     return this.util;
278   }
279 
280   private void setupReceiver() {
281     Configuration conf = new Configuration(util.getConfiguration());
282     conf.setBoolean("hbase.zipkin.is-in-client-mode", true);
283 
284     this.receiverHost = SpanReceiverHost.getInstance(conf);
285   }
286 }