package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

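/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * <p>
 * Command-line usage: {@code completebulkload /path/to/hfileoutputformat-output tablename}
 * <p>
 * A minimal programmatic sketch (the table name and output directory below are placeholders,
 * and the target table is assumed to exist already):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
 *   HTable table = new HTable(conf, "mytable");
 *   loader.doBulkLoad(new Path("/tmp/hfof-output"), table);
 * </pre>
 */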
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  private HBaseAdmin hbAdmin;

  public static final String NAME = "completebulkload";
  public static final String MAX_FILES_PER_REGION_PER_FAMILY
    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  public final static String CREATE_TABLE_CONF_KEY = "create.table";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;

  private FileSystem fs;

  private FsDelegationToken fsDelegationToken;
  private String bulkToken;
  private UserProvider userProvider;

  private LoadIncrementalHFiles() {}

  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf);
    initialize();
  }

  private void initialize() throws Exception {
    if (hbAdmin == null) {
      // work on a private copy of the configuration with the HBase resources applied
      setConf(HBaseConfiguration.create(getConf()));
      Configuration conf = getConf();
      // disable the block cache: this tool streams through each hfile only once
      conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
      this.hbAdmin = new HBaseAdmin(conf);
      this.userProvider = UserProvider.instantiate(conf);
      this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
      assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
      maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    }
  }

  private void usage() {
    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
        + "\n");
  }

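  /**
   * Callback used by {@link #visitBulkHFiles(FileSystem, Path, BulkHFileVisitor)} to react to
   * each family directory and each HFile found under the bulk-load directory.
   */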
  private static interface BulkHFileVisitor<TFamily> {
    TFamily bulkFamily(final byte[] familyName)
      throws IOException;
    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
      throws IOException;
  }

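  /**
   * Iterate over the hfile tree under bulkDir, skipping anything that is not a valid HFile
   * (references, HFileLinks, files whose names start with '_', and non-HFile formats), and
   * invoke the visitor for each family directory and each HFile.
   */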
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    if (!fs.exists(bulkDir)) {
      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + bulkDir);
    }

    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDir()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = familyDir.getName().getBytes();
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // skip "_"-prefixed files, references and HFileLinks
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // validate the HFile format
        try {
          if (!HFile.isHFileFormat(fs, hfile)) {
            LOG.warn("The file " + hfile + " doesn't seem to be an hfile, skipping");
            continue;
          }
        } catch (FileNotFoundException e) {
          LOG.warn("The file " + hfile + " was removed");
          continue;
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

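  /**
   * Represents an HFile waiting to be loaded, together with the column family it belongs to.
   * Items are kept on a queue because a region split during the load can force an hfile to be
   * split into two parts, both of which are then re-queued; the load finishes when the queue
   * is empty.
   */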
  static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }

    @Override
    public String toString() {
      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
    }
  }

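  /**
   * Walk the given directory for all HFiles and add a LoadQueueItem for each one to the given
   * queue. HFiles larger than the configured maximum region size are only warned about, not
   * rejected.
   */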
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
      throws IOException {
    fs = hfofDir.getFileSystem(getConf());
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }
      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
            HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
              length + " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    });
  }

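  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method
   * is not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *   HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if the table does not yet exist
   */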
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final HTable table)
      throws TableNotFoundException, IOException {
    final HConnection conn = table.getConnection();

    if (!conn.isTableAvailable(table.getName())) {
      throw new TableNotFoundException("Table " +
          Bytes.toStringBinary(table.getTableName()) +
          " is not currently available.");
    }

    // initialize the thread pool used for splitting and loading hfiles
    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
        Runtime.getRuntime().availableProcessors());
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        builder.build());
    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);

    // the load queue is only mutated from this thread, so it does not need to be thread-safe
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    try {
      discoverLoadQueue(queue, hfofDir);
      // verify that every family referenced by an HFile exists in the target table
      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
      ArrayList<String> familyNames = new ArrayList<String>(families.size());
      for (HColumnDescriptor family : families) {
        familyNames.add(family.getNameAsString());
      }
      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
      Iterator<LoadQueueItem> queueIter = queue.iterator();
      while (queueIter.hasNext()) {
        LoadQueueItem lqi = queueIter.next();
        String familyNameInHFile = Bytes.toString(lqi.family);
        if (!familyNames.contains(familyNameInHFile)) {
          unmatchedFamilies.add(familyNameInHFile);
        }
      }
      if (unmatchedFamilies.size() > 0) {
        String msg =
            "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
            + unmatchedFamilies + "; valid family names of table "
            + Bytes.toString(table.getTableName()) + " are: " + familyNames;
        LOG.error(msg);
        throw new IOException(msg);
      }
      int count = 0;

      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not find any files to load in " +
            "directory " + hfofDir.toUri() + ". Does it contain files in " +
            "subdirectories that correspond to column family names?");
        return;
      }

      // acquire a delegation token for the source filesystem and, if the secure bulk load
      // endpoint is deployed, prepare the staging token
      fsDelegationToken.acquireDelegationToken(fs);
      if (isSecureBulkLoadEndpointAvailable()) {
        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
      }

      // loop until the queue is drained; region splits may happen between iterations, so the
      // start/end keys are re-fetched on every pass
      while (!queue.isEmpty()) {
        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
        if (count != 0) {
          LOG.info("Split occurred while grouping HFiles, retry attempt " +
              count + " with " + queue.size() + " files remaining to group or split");
        }

        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 0);
        if (maxRetries != 0 && count >= maxRetries) {
          throw new IOException("Retry attempted " + count +
              " times without completing, bailing out");
        }
        count++;

        // group the hfiles by the region they currently fit into, splitting any that span regions
        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
            pool, queue, startEndKeys);

        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
          // the offending group is logged inside checkHFilesCountPerRegionPerFamily
          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to one family of one region");
        }

        bulkLoadPhase(table, conn, pool, queue, regionGroups);

        // items that failed recoverably were re-queued and will be re-grouped on the next pass
      }

    } finally {
      fsDelegationToken.releaseDelegationToken();
      if (bulkToken != null) {
        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
      }
      pool.shutdown();
      if (queue != null && !queue.isEmpty()) {
        StringBuilder err = new StringBuilder();
        err.append("-------------------------------------------------\n");
        err.append("Bulk load aborted with some files not yet loaded:\n");
        err.append("-------------------------------------------------\n");
        for (LoadQueueItem q : queue) {
          err.append("  ").append(q.hfilePath).append('\n');
        }
        LOG.error(err);
      }
    }

    if (queue != null && !queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. "
          + "Please check log for more details.");
    }
  }

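  /**
   * Takes the load queue items grouped by likely target region and attempts to bulk load each
   * group atomically. Items that could not be loaded because of a recoverable failure (for
   * example a region split) are added back to the queue for the next grouping pass.
   */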
  protected void bulkLoadPhase(final HTable table, final HConnection conn,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
    // atomically bulk load each group
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      final byte[] first = e.getKey().array();
      final Collection<LoadQueueItem> lqis = e.getValue();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(conn, table.getName(), first, lqis);
          return toRetry;
        }
      };
      loadingFutures.add(pool.submit(call));
    }

    // gather all results and re-queue any items that need to be retried
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        // re-queued LQIs will be regrouped on the next pass
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // at this point something unrecoverable has happened
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Entry<ByteBuffer,
        ? extends Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      final Collection<LoadQueueItem> lqis = e.getValue();
      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
      for (LoadQueueItem lqi : lqis) {
        MutableInt count = filesMap.get(lqi.family);
        if (count == null) {
          count = new MutableInt();
          filesMap.put(lqi.family, count);
        }
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to family " + Bytes.toStringBinary(lqi.family)
              + " of region with start key "
              + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

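  /**
   * Drains the load queue and groups each item by the region it currently belongs to,
   * splitting items that no longer fit in a single region.
   * @return a multimap keyed by region start key, grouping the items to load into that region
   */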
  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key> -> LQIs targeted at that region, wrapped so that the worker
    // threads can add entries concurrently
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);

    // drain the queue, submitting one grouping/splitting task per item
    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
          return splits;
        }
      };
      splittingFutures.add(pool.submit(call));
    }

    // items that were split are added back to the queue to be grouped on the next pass
    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
      try {
        List<LoadQueueItem> splits = lqis.get();
        if (splits != null) {
          queue.addAll(splits);
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t;
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return regionGroups;
  }

  // unique name used for the split output files
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
      final HTable table, byte[] startKey,
      byte[] splitKey) throws IOException {
    final Path hfilePath = item.hfilePath;

    // write the split output into a temporary "_tmp" directory next to the source hfile
    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
        "region. Splitting...");

    String uniqueName = getUniqueName();
    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
        botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));

    // both halves go back onto the queue so they can be grouped into regions on the next pass
    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
    lqis.add(new LoadQueueItem(item.family, botOut));
    lqis.add(new LoadQueueItem(item.family, topOut));

    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

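  /**
   * Attempts to assign the given load queue item to the region that currently contains its row
   * range. If the hfile spans more than one region, it is physically split and the resulting
   * halves are returned so they can be re-queued. Protected for testing.
   * @return null if the item was grouped successfully, otherwise the list of items produced by
   *   splitting the hfile
   */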
  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      final LoadQueueItem item, final HTable table,
      final Pair<byte[][], byte[][]> startEndKeys)
      throws IOException {
    final Path hfilePath = item.hfilePath;
    HFile.Reader hfr = HFile.createReader(fs, hfilePath,
        new CacheConfig(getConf()), getConf());
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last=" + Bytes.toStringBinary(last));
    if (first == null || last == null) {
      assert first == null && last == null;
      // an hfile with no entries has neither a first nor a last row key
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first, last) > 0) {
      throw new IllegalArgumentException(
          "Invalid range: " + Bytes.toStringBinary(first) +
          " > " + Bytes.toStringBinary(last));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
        Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on a region boundary: binarySearch returns -(insertion point) - 1, so convert it
      // to the index of the region that would contain the key
      idx = -(idx + 1) - 1;
    }
    final int indexForCallable = idx;

    // sanity-check the region chain fetched from hbase:meta: the first key must fall into some
    // region, the last region must have an empty end key, and adjacent regions must share a
    // boundary; otherwise there is a hole that hbck needs to repair
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table "
          + Bytes.toString(table.getTableName())
          + " cannot be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table "
          + Bytes.toString(table.getTableName())
          + " cannot be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
            startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table "
          + Bytes.toString(table.getTableName())
          + " is not equal to the startkey of the next region in hbase:meta. "
          + "Please use the hbck tool to fix it first.");
    }

    boolean lastKeyInRange =
        Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable],
          startEndKeys.getSecond()[indexForCallable]);
      return lqis;
    }

    // the whole hfile fits into the region starting at startEndKeys.getFirst()[idx]
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

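  /**
   * @deprecated use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)}
   *   instead, which takes a {@link TableName}
   */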
  @Deprecated
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
      throws IOException {
    return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
  }

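  /**
   * Attempts to atomically load a group of hfiles into a region. To preserve row atomicity,
   * the region server call either succeeds for all hfiles in the group or fails for all of
   * them. Protected for testing.
   * @return an empty list on success, or the list of items to retry on a recoverable failure
   */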
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
      throws IOException {
    final List<Pair<byte[], String>> famPaths =
        new ArrayList<Pair<byte[], String>>(lqis.size());
    for (LoadQueueItem lqi : lqis) {
      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
    }

    final RegionServerCallable<Boolean> svrCallable =
        new RegionServerCallable<Boolean>(conn, tableName, first) {
      @Override
      public Boolean call() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;

        try {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          if (!isSecureBulkLoadEndpointAvailable()) {
            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
          } else {
            HTable table = new HTable(conn.getConfiguration(), getTableName());
            secureClient = new SecureBulkLoadClient(table);
            success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
                bulkToken, getLocation().getRegionInfo().getStartKey());
          }
          return success;
        } finally {
          // best-effort attempt to move files that the secure bulk load endpoint staged but
          // did not import back to their original location
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            // the staged files can only be moved back if the source and target are the same
            // HDFS instance
            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
                      hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                      hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };

    try {
      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
      Configuration conf = getConf();
      boolean success = RpcRetryingCallerFactory.instantiate(conf, null).<Boolean> newCaller()
          .callWithRetries(svrCallable);
      if (!success) {
        LOG.warn("Attempt to bulk load region containing "
            + Bytes.toStringBinary(first) + " into table "
            + tableName + " with files " + lqis
            + " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis);
      }
      // empty list on success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: "
          + svrCallable.getExceptionMessageAdditionalDetail(), e);
      throw e;
    }
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

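  /**
   * Split a storefile into a top and bottom half at the given split key, writing the two
   * halves to the given output paths.
   */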
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException {
    // create Reference objects describing the two halves, then copy each half into its own hfile
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

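  /**
   * Copy half of an HFile into a new HFile, using the given Reference to select which half.
   */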
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
      throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = new CacheConfig(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder()
          .withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
          .withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
          .build();
      halfWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
          .withFilePath(outFile)
          .withBloomType(bloomFilterType)
          .withFileContext(hFileContext)
          .build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        KeyValue kv = scanner.getKeyValue();
        halfWriter.append(kv);
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean doesTableExist(TableName tableName) throws Exception {
    return hbAdmin.tableExists(tableName);
  }

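  /**
   * Infer region boundaries from a map of row key counts.
   * <p>
   * The map is expected to hold +1 for every hfile first row key and -1 for every hfile last
   * row key (as built in {@link #createTable(TableName, String)}). Sweeping the keys in sorted
   * order with a running sum, the sum returns to zero exactly where no hfile spans the next
   * key, so the start key of each group of overlapping hfiles (except the first group) becomes
   * a region split boundary.
   */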
  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) currStartKey = item.getKey();
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) keysArray.add(currStartKey);
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][0]);
  }

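  /**
   * Create the target table with one column family per family directory found under dirPath,
   * using region boundaries inferred from the first and last row keys of the HFiles.
   */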
  private void createTable(TableName tableName, String dirPath) throws Exception {
    final Path hfofDir = new Path(dirPath);
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // add one column family per family directory, and collect the first/last row key of every
    // hfile so region boundaries can be inferred below
    final HTableDescriptor htd = new HTableDescriptor(tableName);
    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
      @Override
      public HColumnDescriptor bulkFamily(final byte[] familyName) {
        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
        htd.addFamily(hcd);
        return hcd;
      }
      @Override
      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        HFile.Reader reader = HFile.createReader(fs, hfile,
            new CacheConfig(getConf()), getConf());
        try {
          if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
            hcd.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + hcd.getCompressionType().name() +
                " for family " + hcd.toString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey();
          byte[] last = reader.getLastRowKey();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
              " first=" + Bytes.toStringBinary(first) +
              " last=" + Bytes.toStringBinary(last));

          // count +1 for each hfile that starts at this key and -1 for each that ends here;
          // inferBoundaries() sweeps these counts to pick the region split keys
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        } finally {
          reader.close();
        }
      }
    });

    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
    this.hbAdmin.createTable(htd, keys);

    LOG.info("Table " + tableName + " is available!");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return -1;
    }

    initialize();

    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    boolean tableExists = this.doesTableExist(tableName);
    if (!tableExists) {
      if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
        this.createTable(tableName, dirPath);
      } else {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
    }

    Path hfofDir = new Path(dirPath);
    HTable table = new HTable(getConf(), tableName);

    doBulkLoad(hfofDir, table);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
    System.exit(ret);
  }

}