mapreduce的排序主要分部分排序、全排序和辅助排序(二次排序) 可以直接在reduce中在对数据进行排序,但是这对于reduce的负担太重,数据处理的时间消耗也会大大增加
mapreduce机制中排序只会针对键进行排序,所以如果想对某个数据进行排序,一定要将其设置为map输出的键,排序主要发生在map的spill和合并spill file阶段和reduce拉取复制map端的数据后合并成reduce文件时。
排序的设置和调用的顺序
排序类及其方法调用主要遵循以下顺序:
如果设置mapreduce.job.output.key.comparator.class或设置了Job类的setSortComparatorClass(),则会使用设置的类的实例进行排序。该类需要继承WritableComparator
如果1中没有设置,键的类型是WritableComparable的子类,就会调用该键的compareTo()的方法
如果上述都RawComparator将字节流反序列化为WritableComparable对象,再调用其compareTo()进行比较
从以上可以看出,如果重写RawComparator的compare方法,在字节流的时候就进行所需的键的比较是性能最好的,因为这样无需在排序进行反序列化,但编写的难度相对较高。
部分排序
mapreduce根据键的排序使得每个reduce输出的文件都是排序好的
全排序
对于全排序最简单的方法是将所有的方法是将reduce的个数设置为1,但是如果数据量太大会对reduce造成过大的负担。
解决的方法可以通过改写partition来调整使得每个reduce之间数据保持有序,即保证一段连续区间内的数据都落在一个分区里,这样只要对reduce的输出文件排好序就可以达到全局排序。但是如果靠人为的划分分区,需要知道键的范围,并且很容易造成数据的倾斜,造成有的reduce分到数据特别多,而有的却很少,理想情况下是个分区记录数大致相等。解决方法是通过多数据进行采样,通过采样一小部分数据得到排序数据的大致分布,从而制定出如何划分分区。
hadoop内置了若干采样器,用于需要实现自定义采样的话需要继承InputSampler类(该类实现了Sampler接口)重写getSample方法,返回采样的键。InputSampler实现了静态方法writePartitionFile()方法来在致指定的hdfs路径下创建一个顺序文件来存储定义分区的键。将顺序文件加入分布式缓存,由TotalOrderPartitioner使用并未排序作业创建分区。
例子,使用采样分区的方式对一系列随机生成的服从正态分布的数据进行全排序。
原始输入数据,满足正态分布,随机生成
1 | hadoop fs -cat /user/test/yhj/random/input/random.txt | head |
重写InputFormat和RecordReader使直接读入的值是IntWritable类型
1 | public static class MyFileIntputFormat extends FileInputFormat<LongWritable, IntWritable>{ |
由于默认的采样器都是对键进行采样,而我们读入的键是偏移量,真正排序采样的是值,所以需要重写采样器,这里继承了RandomSampler,对值进行随机采样,RandomSampler的初始化需要三个参数RandomSampler(double freq, int numSamples, int maxSplitsSampled) freq代表采样频率,numSamples代表样本最大样本数,maxSplitsSampled代表最大分区数。
1 | public static class MySample extends InputSampler.RandomSampler<Object, IntWritable>{ |
入口函数和map函数,reduce函数采用默认实现
1 | public class SortByTotalPartitioner extends Configured implements Tool { |
辅助排序
有时候我们需要对键和值一起参与排序或者排序收到两个数据的共同影响,由于mapreduce只能支持对键排序,所以只能自定义Writable类型,支持存储多种数据,并重写compareTo()方法或者通过setSortComparatorClass()设置二次排序的方式。由于reduce的输入是key和value的迭代器,默认reduce是将键相同的值放在一个迭代器中,而由于二次排序修改了键的比较方法,因此有时候需要修改分组排序,job.setGroupingComparatorClass设置分组排序,通过分组比较器判断为相同即返回为0的代表在同一组中,这时候的分组的键将会取组中第一个的键,也就是排好序后组中第一个的键。
例子,选取下面数据中,每一年的最高温度
1 | 1990 28 |
我们需要将年份和温度都参与排序,因此需要自定义Writable类型,同时存储、序列化年份和温度
1 | import org.apache.hadoop.io.IntWritable; |
按照年份来分区和分组
1 | /** |
1 | /** |
这样同年份的数据会在同个分区中,也就是在一个reduce下,reduce的分组是按照年份来比较的,那么每个分组中的数据的年份是相同的,在年份相同的情况下按照温度从大到下排序,因此每个组的键取的是组中的第一个,也就是该年份下最大温度所在的IntPairWritable
map、reduce及其入口函数
1 | public class MaxTemperatureUsingSecondSort extends Configured implements Tool { |
程序结果,由于只有一个reduce,因此年份按照排序从小到大,如果想要在多个reduce中保存年份的全局排序,应该按照全区排序编写合适的分区方法
1 | 1990 28 |