MapReduce中的二次排序

MapReduce操作时,我们知道传递的<key,value>会按照key的大小进行排序,最后输出的结果是按照key排过序的。有的时候我们在key排序的基础上,对value也进行排序。这种需求就是二次排序。
(1)Mapper任务会接收输入分片,然后不断的调用map函数,对记录进行处理。处理完毕后,转换为新的<key,value>输出。
(2)对map函数输出的<key, value>调用分区函数,对数据进行分区。不同分区的数据会被送到不同的Reducer任务中。
(3)对于不同分区的数据,会按照key进行排序,这里的key必须实现WritableComparable接口。该接口实现了Comparable接口,因此可以进行比较排序。
(4)对于排序后的<key,value>,会按照key进行分组。如果key相同,那么相同key的<key,value>就被分到一个组中。最终,每个分组会调用一次reduce函数。
(5)排序、分组后的数据会被送到Reducer节点。
在MapReduce的体系结构中,我们没有看到对value的排序操作。怎么实现对value的排序哪?这就需要我们变通的去实现这个需求。
变通手段:我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。
下面看个例子,结合着理解。
假设有以下输入数据,这是两列整数,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。
20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 70 57 70 58
分析一下, 这是一个典型的二次排序问题。
我们先对现在第一列和第二列整数创建一个新的类,作为newkey,代码如下
/** * 把第一列整数和第二列作为类的属性,并且实现WritableComparable接口 */ public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } @Override public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } @Override public int hashCode() { return first+"".hashCode() + second+"".hashCode(); } @Override public boolean equals(Object right) { if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; } else { return false; } } //这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法 @Override public int compareTo(IntPair o) { if (first != o.first) { return first - o.first; } else if (second != o.second) { return second - o.second; } else { return 0; } } }
一定要注意上面的compareTo方法,先按照first比较,再按照second比较。在以后调用的时候,key就是first,value就是second。
下面看一下分组比较函数,代码如下
/** * 在分组比较的时候,只比较原来的key,而不是组合key。 */ public static class GroupingComparator implements RawComparator<IntPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int first1 = o1.getFirst(); int first2 = o2.getFirst(); return first1 - first2; } }
一定要注意上面代码中,虽然泛型是IntPair,但是比较的始终是第一个字段,而不是所有的字段。因为要按照原有的key进行分组啊。
如果以上的代码明白,再看一下自定义的Mapper类和Reducer类吧
public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } }public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for(IntWritable value: values) { context.write(first, value); } } }
在map函数中,要注意k2是由哪几个字段组成的;在reduce函数中,要注意输出的k3是IntPair中的第一个字段,而不是所有字段。
好了,看一下驱动代码吧,如下
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop2:9000"), conf); fileSystem.delete(new Path(OUTPUT_PATH), true); Job job = new Job(conf, "secondary sort"); job.setJarByClass(SecondarySortApp.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setGroupingComparatorClass(GroupingComparator.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); }
以上驱动代码中,重大变化是设置了分组比较函数。好了,看看执行结果吧
------------------------------------------------ 20 21 ------------------------------------------------ 50 51 50 52 50 53 50 54 ------------------------------------------------ 60 51 60 52 60 53 60 56 60 57 60 61 ------------------------------------------------ 70 54 70 55 70 56 70 57 70 58 70 58
看看,是不是我们想要的结果啊!!

1 个评论

学习啦,多谢指点!

要回复文章请先登录注册