CombineFileInputFormat in Hadoop Explained

In MapReduce practice there are often many small files. Each file gets its own mapper, which wastes resources, and if the job has no reduce stage it also writes out many small files; the file count explodes and hurts downstream Hive jobs.
We therefore need to combine several files into a single split as mapper input, and CombineFileInputFormat meets exactly that need.
How CombineFileInputFormat works (as summarized by others online):

Pass 1: build splits from blocks that live on the same DataNode (DN), as follows:
1. Iterate over nodeToBlocks to find which blocks each DN holds.
2. Loop over that DN's block list.
3. Remove each block from blockToNodes so the same block is not placed into more than one split.
4. Add the block to a list of valid blocks; this list records which blocks have been removed from blockToNodes, so they can be put back later if needed.
5. Add the block's size to the running total curSplitSize.
6. Check whether curSplitSize has reached the configured maxSize:
   a) If so, create and add a split, then reset curSplitSize and validBlocks.
   b) If not, continue with the next block in the list (back to step 2).
7. When the DN's block list is exhausted, check whether the leftover blocks may still form a split (is their total size at least the per-node minimum split size?):
   a) If so, create and add a split.
   b) If not, return these leftover blocks to blockToNodes.
8. Reset the temporary state.
9. Go back to step 1 for the next DN.

The corresponding source code:
// process all nodes and create splits that are local
// to a node.
// Build splits from blocks that live on the same DN.
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
     iter.hasNext();) {

  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from
  // blockToNodes so that the same block does not appear in
  // two different splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create a split from the accumulated blocks and add it to the splits list
        addCreatedSplit(job, splits, nodes, validBlocks);
        // reset the accumulators
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }
  // if there were any blocks left over and their combined size is
  // larger than minSplitNode, then combine them into one split.
  // Otherwise add them back to the unprocessed pool. It is likely
  // that they will be combined with other blocks from the same rack later on.
  // The comment above already explains this; here it is again in my own words:
  /**
   * Two cases can occur here:
   * 1. The DN still has blocks that were not placed into a split, and their
   *    total size is at least the per-node minimum (but below the maximum):
   *    combine them into one split.
   * 2. The leftover blocks are still too small: return them to blockToNodes
   *    to be handled later together with blocks from other nodes.
   */
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}

Pass 2: combine blocks that are not on the same DN but are on the same rack (only the blocks left over from pass 1):
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
// We are no longer dealing with blocks on the same DN; those were handled
// by the code above. Only blocks that have not yet been processed remain here.
while (blockToNodes.size() > 0) {

  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time, Combine all possible blocks that
  // reside on this rack as one split. (constrained by minimum and maximum
  // split size).

  // iterate over all racks
  // Build splits from blocks on the same rack.
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
       rackToBlocks.entrySet().iterator(); iter.hasNext();) {

    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      // Important: blockToNodes now tells us which blocks have not yet been
      // placed into a split.
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated split size exceeds the maximum, then
        // create this split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    // There are still blocks that have not been placed into a split.
    // If their total size is at least the per-rack minimum, create a split;
    // otherwise keep them for later processing.
    if (!validBlocks.isEmpty()) {
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split
        // otherwise, store these blocks into overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in 'overflow' block list. These will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

Finally, blocks that share neither a DN nor a rack (whatever is left after the first two passes) are combined. The source code for this step is straightforward, so it is not reproduced here; a rough sketch of what it does follows.
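For reference, the overflow handling amounts to a simple accumulation loop. The sketch below is my reconstruction from the description above, not verbatim Hadoop source; the names (overflowBlocks, maxSize, addCreatedSplit, getHosts, ...) match the snippets shown earlier, and rack/host bookkeeping is simplified.

// Sketch (reconstructed, not verbatim): pack the leftover 'overflow' blocks
// into splits, cutting a split whenever the accumulated size reaches maxSize.
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // once the accumulated size reaches the maximum, cut a split
  if (maxSize != 0 && curSplitSize >= maxSize) {
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
  }
}
// whatever remains at the end becomes one final (possibly undersized) split
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}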
 
 
 
Summary of the source code:

Combining happens in three passes: same DN → same rack but different DN → different racks.
Blocks that can be combined end up in the same split. A minimal configuration sketch follows, and after that the practice code.
In my case the input consists of small SequenceFile files of about 70 MB each (some smaller), so a custom RecordReader is needed (plain Text input would be simpler); the keys are BytesWritable. The goal is to reduce the number of files so that each output file is close to the block size.
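The three thresholds used in the passes above (maxSize, minSizeNode, minSizeRack) come from configuration when the protected setters on CombineFileInputFormat are not used. A minimal sketch, assuming the old-style property names (the driver code below uses mapred.max.split.size in the same way); the byte values are only illustrative and would go, for example, inside the driver's run() method:

// Sketch: set the thresholds that drive the node / rack / overflow passes.
Configuration conf = new Configuration();
conf.set("mapred.max.split.size", "134217728");          // maxSize: cut a split once ~128 MB accumulates
conf.set("mapred.min.split.size.per.node", "67108864");  // minSizeNode: smallest node-local split (~64 MB)
conf.set("mapred.min.split.size.per.rack", "67108864");  // minSizeRack: smallest rack-local split (~64 MB)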
Custom CombineSequenceFileInputFormat:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, context, CombineSequenceFileRecordReader.class);
    }
}
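As a side note, CombineFileInputFormat also exposes protected setters (setMaxSplitSize, setMinSplitSizeNode, setMinSplitSizeRack), so the limits can be fixed inside the InputFormat itself instead of through the job configuration. A minimal sketch, with an illustrative 128 MB cap:

public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    public CombineSequenceFileInputFormat() {
        // cap each combined split at ~128 MB (illustrative value)
        setMaxSplitSize(134217728L);
    }
    // createRecordReader(...) unchanged from the class above
}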

Implementing CombineSequenceFileRecordReader:


package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.ReflectionUtils;

public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {
    private CombineFileSplit split;
    private TaskAttemptContext context;
    private int index;
    private RecordReader<K, V> rr;

    @SuppressWarnings("unchecked")
    public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
        this.index = index;
        this.split = (CombineFileSplit) split;
        this.context = context;

        this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
    }

    @SuppressWarnings("unchecked")
    @Override
    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
        this.split = (CombineFileSplit) curSplit;
        this.context = curContext;

        if (null == rr) {
            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
        }

        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                this.split.getOffset(index), this.split.getLength(index),
                this.split.getLocations());

        this.rr.initialize(fileSplit, this.context);
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (null != rr) {
            rr.close();
            rr = null;
        }
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
        return rr.getCurrentKey();
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
        return rr.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return rr.nextKeyValue();
    }
}

Reference: http://sourceforge.net/p/openi ... .java

The main function is straightforward; it is included here as well for future reference:


package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MergeFiles extends Configured implements Tool {
    public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {

        public void map(BytesWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    } // END: MapClass

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "157286400");
        conf.setBoolean("mapred.output.compress", true);
        Job job = new Job(conf);
        job.setJobName("MergeFiles");
        job.setJarByClass(MergeFiles.class);

        job.setMapperClass(MapClass.class);
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    } // END: run

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new MergeFiles(), args);
        System.exit(ret);
    } // END: main
} // END: MergeFiles
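To run the merge job, something along these lines should work (the jar name and HDFS paths are placeholders):

hadoop jar merge-files.jar com.hadoop.combineInput.MergeFiles /path/to/small-seq-files /path/to/merged-output

Note that, as written, run() builds a fresh Configuration instead of using getConf(), so -D options passed on the command line through ToolRunner will not reach the job; the split size stays hard-coded unless the driver is changed to start from getConf().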

 
 
