MapReduce Multi-Path Output

In MapReduce, multi-path output is implemented mainly with the MultipleOutputs class.

The two examples below show how to use it.

Sample input mulitipleInput.txt

file1 001
file2 002
file3 003
file2 004
file1 005
file1 006
file3 007

Expected output:

Records starting with file1 and file3 go into one output file.

Records starting with file2 and file3 go into another output file.

Code

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputsExample extends Configured implements Tool {

    public static class MultipleMapper extends Mapper<Object, Text, Text, NullWritable> {

        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Create the MultipleOutputs instance once per task
            mos = new MultipleOutputs<Text, NullWritable>(context);
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Route each record to the named output(s) based on its first field
            String[] infos = value.toString().split(" ");
            if (infos[0].equals("file1")) {
                mos.write("file1", value, NullWritable.get());
            } else if (infos[0].equals("file2")) {
                mos.write("file2", value, NullWritable.get());
            } else {
                // file3 records go to both named outputs
                mos.write("file1", value, NullWritable.get());
                mos.write("file2", value, NullWritable.get());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Close MultipleOutputs, otherwise the named output files may not be flushed
            mos.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: MultipleOutputsExample <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(MultipleOutputsExample.class);
        job.setMapperClass(MultipleMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Register the two named outputs used by mos.write(...)
        MultipleOutputs.addNamedOutput(job, "file1", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "file2", TextOutputFormat.class, Text.class, NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MultipleOutputsExample(), args));
    }
}
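
To run the example, package the class into a jar and submit it with hadoop jar. The jar name below is made up for illustration, and the HDFS input path is assumed from the sample file name and the output paths shown in the results:

$ hadoop jar multipleOutputsExample.jar MultipleOutputsExample \
    /user/test/wordTest/mulitipleInput.txt /user/test/wordTest/mulitipleOutput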

Result

$ hadoop fs -cat /user/test/wordTest/mulitipleOutput/file1-m-00000
file1 001
file3 003
file1 005
file1 006
file3 007

$ hadoop fs -cat /user/test/wordTest/mulitipleOutput/file2-m-00000
file2 002
file3 003
file2 004
file3 007
file2 008

If you want to put the file1 and file2 records into separate directories, you can pass a baseOutputPath so that, for example, all records routed to "file1" are kept together under one directory. Change

mos.write("file1", value, NullWritable.get());
mos.write("file2", value, NullWritable.get());

to

mos.write("file1", value, NullWritable.get(), "file1/part");
mos.write("file2", value, NullWritable.get(), "file2/part");

or, with the write() overload that takes no named-output name,

mos.write(value, NullWritable.get(), "file1/part");
mos.write(value, NullWritable.get(), "file2/part");

The directory listing further below shows the result.
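For context, a minimal sketch of the modified map() method using the named-output overload with a baseOutputPath (an illustration of the change above, not necessarily the author's exact code; the paths are relative to the job output directory):

@Override
protected void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] infos = value.toString().split(" ");
    if (infos[0].equals("file1")) {
        // lands under <output dir>/file1/part-m-00000
        mos.write("file1", value, NullWritable.get(), "file1/part");
    } else if (infos[0].equals("file2")) {
        // lands under <output dir>/file2/part-m-00000
        mos.write("file2", value, NullWritable.get(), "file2/part");
    } else {
        // file3 records still go to both directories
        mos.write("file1", value, NullWritable.get(), "file1/part");
        mos.write("file2", value, NullWritable.get(), "file2/part");
    }
}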

$ hadoop fs -ls /user/test/wordTest/mulitipleOutput
Found 4 items
-rw-r--r-- 3 hdfs hdfs 0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/_SUCCESS
drwxr-xr-x - hdfs hdfs 0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/file1
drwxr-xr-x - hdfs hdfs 0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/file2
-rw-r--r-- 3 hdfs hdfs 0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/part-r-00000

When a baseOutputPath is specified, the output directory and file name follow the baseOutputPath directly, but by default the file name still carries a task suffix such as -m-00000 or -r-00000. If you want to change that, you can extend FileOutputFormat and customize how the output file is named and written (for example with your own RecordWriter).
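
One simple way to drop the suffix is to subclass TextOutputFormat, override getDefaultWorkFile() so the file name is taken from the base output name alone, and register that class in addNamedOutput(). The class below is a hypothetical sketch; it is only safe when a single task writes each base name, because two tasks writing the same name would collide:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical output format: uses the base output name as the file name,
// dropping the -m-00000/-r-00000 task suffix.
public class PlainNameTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
        // getOutputName(context) is the base name set by MultipleOutputs, e.g. "file1" or "file1/part"
        return new Path(committer.getWorkPath(), getOutputName(context) + extension);
    }
}

It would then be registered in run() like this:

MultipleOutputs.addNamedOutput(job, "file1", PlainNameTextOutputFormat.class, Text.class, NullWritable.class);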
