mapreduce 排序

mapreduce的排序主要分部分排序、全排序和辅助排序(二次排序) 可以直接在reduce中在对数据进行排序,但是这对于reduce的负担太重,数据处理的时间消耗也会大大增加

mapreduce机制中排序只会针对键进行排序,所以如果想对某个数据进行排序,一定要将其设置为map输出的键,排序主要发生在map的spill和合并spill file阶段和reduce拉取复制map端的数据后合并成reduce文件时。

排序的设置和调用的顺序

排序类及其方法调用主要遵循以下顺序:

  1. 如果设置mapreduce.job.output.key.comparator.class或设置了Job类的setSortComparatorClass(),则会使用设置的类的实例进行排序。该类需要继承WritableComparator

  2. 如果1中没有设置,键的类型是WritableComparable的子类,就会调用该键的compareTo()的方法

  3. 如果上述都RawComparator将字节流反序列化为WritableComparable对象,再调用其compareTo()进行比较

从以上可以看出,如果重写RawComparator的compare方法,在字节流的时候就进行所需的键的比较是性能最好的,因为这样无需在排序进行反序列化,但编写的难度相对较高。

部分排序

mapreduce根据键的排序使得每个reduce输出的文件都是排序好的

全排序

对于全排序最简单的方法是将所有的方法是将reduce的个数设置为1,但是如果数据量太大会对reduce造成过大的负担。

解决的方法可以通过改写partition来调整使得每个reduce之间数据保持有序,即保证一段连续区间内的数据都落在一个分区里,这样只要对reduce的输出文件排好序就可以达到全局排序。但是如果靠人为的划分分区,需要知道键的范围,并且很容易造成数据的倾斜,造成有的reduce分到数据特别多,而有的却很少,理想情况下是个分区记录数大致相等。解决方法是通过多数据进行采样,通过采样一小部分数据得到排序数据的大致分布,从而制定出如何划分分区。

hadoop内置了若干采样器,用于需要实现自定义采样的话需要继承InputSampler类(该类实现了Sampler接口)重写getSample方法,返回采样的键。InputSampler实现了静态方法writePartitionFile()方法来在致指定的hdfs路径下创建一个顺序文件来存储定义分区的键。将顺序文件加入分布式缓存,由TotalOrderPartitioner使用并未排序作业创建分区。

例子,使用采样分区的方式对一系列随机生成的服从正态分布的数据进行全排序。

原始输入数据,满足正态分布,随机生成

1
2
3
4
5
6
7
8
9
10
11
hadoop fs -cat /user/test/yhj/random/input/random.txt | head
1281658
449107
1195471
-422913
1970634
916817
-557165
-1435819
1924747
-262067

重写InputFormat和RecordReader使直接读入的值是IntWritable类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static class MyFileIntputFormat extends FileInputFormat<LongWritable, IntWritable>{

@Override
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
RecordReader<LongWritable, IntWritable> reader = new MyRecordReader();
reader.initialize(inputSplit, taskAttemptContext);
return reader;
}
}

public static class MyRecordReader extends RecordReader<LongWritable, IntWritable>{

private LineRecordReader lineRecordReader;

public MyRecordReader(){
lineRecordReader = new LineRecordReader();
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
lineRecordReader.initialize(inputSplit, taskAttemptContext);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return lineRecordReader.nextKeyValue();
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return lineRecordReader.getCurrentKey();
}

@Override
public IntWritable getCurrentValue() throws IOException, InterruptedException {
IntWritable value = new IntWritable(Integer.parseInt(lineRecordReader.getCurrentValue().toString()));
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return lineRecordReader.getProgress();
}

@Override
public void close() throws IOException {
lineRecordReader.close();
}
}

由于默认的采样器都是对键进行采样,而我们读入的键是偏移量,真正排序采样的是值,所以需要重写采样器,这里继承了RandomSampler,对值进行随机采样,RandomSampler的初始化需要三个参数RandomSampler(double freq, int numSamples, int maxSplitsSampled) freq代表采样频率,numSamples代表样本最大样本数,maxSplitsSampled代表最大分区数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static class MySample extends InputSampler.RandomSampler<Object, IntWritable>{

public MySample(double freq, int numSamples) {
super(freq, numSamples);
}

public MySample(double freq, int numSamples, int maxSplitsSampled){
super(freq, numSamples, maxSplitsSampled);
}

@Override
public IntWritable[] getSample(InputFormat<Object, IntWritable> inf, Job job) throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
List<IntWritable> samples = new ArrayList<>();
int splitToSample = Math.min(this.numSamples, splits.size());
Random random = new Random();
long seed = random.nextLong();
random.setSeed(seed);

//随机交换split
for(int i = 0; i < splits.size(); i++){
InputSplit tmp = splits.get(i);
int index = random.nextInt(splits.size());
splits.set(i, splits.get(index));
splits.set(index, tmp);
}

//采样
for(int i = 0; i < splitToSample || i < this.numSamples && samples.size() < this.numSamples ; i++){
TaskAttemptContext sampleContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader<Object, IntWritable> reader = inf.createRecordReader(splits.get(i), sampleContext);

while (reader.nextKeyValue()) {
//根据采样频率采样
if (random.nextDouble() < this.freq) {
if (samples.size() < this.numSamples) {
IntWritable value = new IntWritable();
samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentValue(), value));
} else {
int index = random.nextInt(this.numSamples);
if (index != this.numSamples) {
IntWritable value = new IntWritable();
samples.set(index, ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentValue(), value));
}
this.freq *= (double) (this.numSamples - 1) / (double) this.numSamples;
}
}
}
reader.close();
}
IntWritable[] result = new IntWritable[samples.size()];
samples.toArray(result);
return result;
}
}

入口函数和map函数,reduce函数采用默认实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SortByTotalPartitioner extends Configured implements Tool {
public static class SortByTotalPartitionerMap extends Mapper<LongWritable, IntWritable, IntWritable, NullWritable>{
@Override
protected void map(LongWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}

@Override
public int run(String[] args) throws Exception {
Job job = JobDefaultInit.getSubmintDefaultJob(this, getConf(),
"E:\\JavaProjects\\hdpWork\\target\\hdpWork.jar", args);
// InputSampler.Sampler<>
job.setMapperClass(SortByTotalPartitionerMap.class);
job.setInputFormatClass(MyFileIntputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setNumReduceTasks(3);

InputSampler.Sampler<Object, IntWritable> sampler = new MySample(0.1, 1000, 10); //构造采样器
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("/partitionFile")); //设置共享分区文件路径
InputSampler.writePartitionFile(job, sampler); //写入分区文件,其中调用了 TotalOrderPartitioner.getPartitionFile获取文件路径
//将共享分区文件加入到分布式缓存中
String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
URI partitionUri = new URI(partitionFile);
job.addCacheFile(partitionUri);
return job.waitForCompletion(true) ? 0:1;
}

public static void main(String[] args) throws Exception{
System.exit(ToolRunner.run(new SortByTotalPartitioner(), args));
}
}

辅助排序

有时候我们需要对键和值一起参与排序或者排序收到两个数据的共同影响,由于mapreduce只能支持对键排序,所以只能自定义Writable类型,支持存储多种数据,并重写compareTo()方法或者通过setSortComparatorClass()设置二次排序的方式。由于reduce的输入是key和value的迭代器,默认reduce是将键相同的值放在一个迭代器中,而由于二次排序修改了键的比较方法,因此有时候需要修改分组排序,job.setGroupingComparatorClass设置分组排序,通过分组比较器判断为相同即返回为0的代表在同一组中,这时候的分组的键将会取组中第一个的键,也就是排好序后组中第一个的键。

例子,选取下面数据中,每一年的最高温度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1990    28
1995 34
1992 -12
1994 2
1995 12
1993 34
1992 32
1993 1
1994 8
1995 23
1993 24
1993 42
1994 32
1992 30
1991 22
1993 26
1990 28
1992 11
1990 18
1994 16
1992 15
1994 4
1990 28
1995 5
1990 28
1992 3
1990 28

我们需要将年份和温度都参与排序,因此需要自定义Writable类型,同时存储、序列化年份和温度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class IntPairWritable implements WritableComparable<IntPairWritable> {

int year;
int tempeture;

public IntPairWritable(){
year = 0;
tempeture = 0;
}

public IntPairWritable(IntWritable year, IntWritable tempeture){
this.year = year.get();
this.tempeture = tempeture.get();
}

public IntPairWritable(int year, int tempeture){
this.year = year;
this.tempeture = tempeture;
}

public int getYear() {
return year;
}


public int getTempeture() {
return tempeture;
}

public void setYear(int year) {
this.year = year;
}

public void setTempeture(int tempeture) {
this.tempeture = tempeture;
}


@Override
public boolean equals(Object obj) {
if(!(obj instanceof IntPairWritable)){
return false;
}
IntPairWritable other = (IntPairWritable)obj;
return other.year == year && other.tempeture == tempeture;
}

@Override
public int compareTo(IntPairWritable o) {
int cmp = year - o.getYear();
if(cmp != 0){
return cmp;
}
return tempeture - o.getTempeture();
}

/**
* 序列化到输出流中
* @param dataOutput
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(year);
dataOutput.writeInt(tempeture);
}

/**
* 从输入流中反序列化读入
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
year = dataInput.readInt();
tempeture = dataInput.readInt();
}

@Override
public int hashCode() {
return new Integer(year).hashCode()*163 + new Integer(tempeture).hashCode();
}

@Override
public String toString() {
return year + "\t" + tempeture;
}

/**
* 自定义Writable类型的排序比较器
*/
public static class Comparator extends WritableComparator{
public Comparator(){
super(IntPairWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
int tmp = ((IntPairWritable)a).getYear() - ((IntPairWritable)b).getYear();
if(tmp != 0){
return tmp;
}
return - (((IntPairWritable)a).getTempeture() - ((IntPairWritable)b).getTempeture());
}
}
}

按照年份来分区和分组

1
2
3
4
5
6
7
8
9
/**
* 只按照年份来分区
*/
public static class YearPartition extends Partitioner<IntPairWritable, NullWritable>{
@Override
public int getPartition(IntPairWritable intPairWritable, NullWritable nullWritable, int i) {
return Math.abs(intPairWritable.getYear()) % i;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 分组只按照年份来比较
*/
public static class GroupComparator extends WritableComparator{
public GroupComparator(){
super(IntPairWritable.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
return ((IntPairWritable)a).getYear() - ((IntPairWritable)b).getYear();
}
}

这样同年份的数据会在同个分区中,也就是在一个reduce下,reduce的分组是按照年份来比较的,那么每个分组中的数据的年份是相同的,在年份相同的情况下按照温度从大到下排序,因此每个组的键取的是组中的第一个,也就是该年份下最大温度所在的IntPairWritable

map、reduce及其入口函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class MaxTemperatureUsingSecondSort extends Configured implements Tool {

public static class MaxTemperatureUsingSecondSortMap extends Mapper<LongWritable, Text, IntPairWritable, NullWritable>{

IntPairWritable outKey = new IntPairWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] infos = value.toString().split("\t");
outKey.setYear(Integer.parseInt(infos[0]));
outKey.setTempeture(Integer.parseInt(infos[1]));
context.write(outKey, NullWritable.get());
}
}

/**
* reduce的key存储着该年份下的最高气味
*/
public static class MaxTemperatureUsingSecondSortReduce extends Reducer<IntPairWritable, NullWritable, IntPairWritable, NullWritable>{
@Override
protected void reduce(IntPairWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}

@Override
public int run(String[] args) throws Exception {
Job job = JobDefaultInit.getSubmintDefaultJob(this, getConf(),
"E:\\JavaProjects\\hdpWork\\target\\hdpWork.jar", args);
job.setJobName("Max Temperature Using Second Sort");
job.setMapperClass(MaxTemperatureUsingSecondSortMap.class);
job.setReducerClass(MaxTemperatureUsingSecondSortReduce.class);
job.setPartitionerClass(YearPartition.class);
job.setSortComparatorClass(IntPairWritable.Comparator.class);
job.setGroupingComparatorClass(GroupComparator.class);
job.setOutputKeyClass(IntPairWritable.class);
job.setOutputValueClass(NullWritable.class);
return job.waitForCompletion(true) ? 1:0;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new MaxTemperatureUsingSecondSort(), args));
}
}

程序结果,由于只有一个reduce,因此年份按照排序从小到大,如果想要在多个reduce中保存年份的全局排序,应该按照全区排序编写合适的分区方法

1
2
3
4
5
6
1990    28
1991 22
1992 32
1993 42
1994 32
1995 34
0%