Hadoop 小文件的处理

hadoop的HDFS和MapReduce本身都是用户处理大量数据的大文件,对于小文件来说,由于namenode会在记录每个block对象,如果存在大量的小文件,会占用namenode的大量内存空间,而且HDFS存储文件是按block来存储,即使一个文件的大小不足一个block的大小,文件还是会占用一个block的存储空间,所以大量的小文件会对HDFS的存储和访问都带来不利的影响。
hadoop对于小文件的处理主要有Hadoop Archive,Sequence file和CombineFileInputFormat三种方式。

Hadoop Archive

Hadoop Archive是hadoop的归档命令,可以将hdfs上的小文件打包成一个har文件,这种方式虽然不会减少小文件占用大量存储空间的问题,但是会减少namenode的内存空间。同时har文件支持hdfs命令对其的访问。

命令:hadoop archive -archiveName 归档名称 -p 父目录 [-r <复制因子>] 原路径(可以多个) 目的路径

-archiveNames设置归档生成文件的名字

-p 需要进行归档的文件的父目录

例子:

1
2
3
4
5
6
7
8
9
$ hadoop fs -ls /user/test/yhj/input/
Found 3 items
-rw-r--r-- 3 root hdfs 760 2018-07-04 11:48 /user/test/yhj/input/word1.txt
-rw-r--r-- 3 root hdfs 82 2018-07-04 11:48 /user/test/yhj/input/word2.txt
-rw-r--r-- 3 root hdfs 1738 2018-07-04 11:48 /user/test/yhj/input/word3.txt
$ hadoop archive -archiveName word.har -p /user/test/yhj/input/ word1.txt word2.txt word3.txt /user/test/yhj/harInput/
$ hadoop fs -ls /user/test/yhj/harInput/
Found 1 items
drwxr-xr-x - hdfs hdfs 0 2018-07-05 20:18 /user/test/yhj/harInput/word.har

HAR文件的生成是通过运行一个mapreduce的程序生成,所以需要集群环境中装个mapreduce

HAR是在Hadoop file system之上的一个文件系统,因此所有fs shell命令对HAR文件均可用,但使用不同的URI。另外,请注意档案是不可变的。所以,重命名,删除并创建返回一个错误,例如:

1
2
3
4
5
6
7
8
9
10
11
$ hadoop fs -ls /user/test/yhj/harInput/word.har
Found 4 items
-rw-r--r-- 3 hdfs hdfs 0 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_SUCCESS
-rw-r--r-- 5 hdfs hdfs 255 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_index
-rw-r--r-- 5 hdfs hdfs 22 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_masterindex
-rw-r--r-- 3 hdfs hdfs 2580 2018-07-05 20:18 /user/test/yhj/harInput/word.har/part-0
$ hadoop fs -ls har:/user/test/yhj/harInput/word.har
Found 3 items
-rw-r--r-- 3 hdfs hdfs 760 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word1.txt
-rw-r--r-- 3 hdfs hdfs 82 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word2.txt
-rw-r--r-- 3 hdfs hdfs 1738 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word3.txt

可以看到Hadoop存档目录包含元数据(采用_index和_masterindex形式)、数据部分data(part- *)文件、归档文件的名称和部分文件中的位置(_index文件)。

HAR文件也可以被mapreduce读取,路径的URI可以使用不同的URI,比如例子中的文件输入的路径URI可以下面两种方式使用

1
2
hdfs://10.1.13.111:8020/user/test/yhj/harInput/word.har
har://hdfs-10.1.13.111:8020/user/test/yhj/harInput/word.har

但是这个例子的文件来说,两个输入路径产生map的个数是不同的,har的路径产生的map有三个,对应三个word*.txt,而hdfs的路径只有一个,对应word.har/part-0

如果是文件支持行记录切分使用mapreduce来处理数据(文件的前后数据不相互影响),建议使用hdfs的URI路径,因为存档目录的part-*可能包括多个小文件的数据,这样可以减少map的个数,不会为每个单独的小文件启动一个map。

CombineFileInputFormat

将大量小文件做为mapreduce的输入是不合适的,因为FileInputFormat只会分割大文件(文件大小超过设定的分片大小,默认为HDFS的块大小),对于小于分片大小的文件,每个文件作为一个分片,如果文件大小小于一个块的大小,mapreduce会为每个小文件产生一个map,这样会产生大量小文件,而每个map只会处理少量数据,每次map操作都会产生开销。当然可以通过mapred.min.split.size和mapred.max.split.size来控制map数量。

CombineFileInputFormat是mapreduce针对小文件而设计的,CombineFileInputFormat可以将多个小文件打包进一个分片,另外,比直接设置map数量好的在于,CombineFileInputFormat在决定将那些块放入一个分片是会考虑到块所在的节点和机架的位置,避免操作分片是过多的数据传输。

CombineFileInputFormat是一个抽象类,hadoop自带的实现的有CombineTextInputFormat,我们可以通过继承CombineFileInputFormat实现createRecordReader方法,自定义RecordReader类来实现理海量小文件的MapReduce。

InputFormat主要有两个方法,getSplits(计算得到分片),createRecordReader(产生返回RecordReader,RecordReader生成输出map读入的键值对)

CombineFileInputFormat中已经实现了getSplits,即将多个小文件打包进一个分片中CombineFileSplit,我们需要实现createRecordReader方法,返回一个可以读取该分片中内容的RecordReader。

MyCombineInputFormat的实现

1
2
3
4
5
6
7
8
9
10
11
12
public class MyCombineInputFormat extends CombineFileInputFormat<LongWritable, Text>{
@Override
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
RecordReader<LongWritable, Text> reader = new CombineFileRecordReader<>((CombineFileSplit) inputSplit, taskAttemptContext, MyCombineFileRecordReader.class);
try {
reader.initialize(inputSplit, taskAttemptContext);
} catch (InterruptedException e) {
e.printStackTrace();
}
return reader;
}
}

这里实际返回了一个CombineFileRecordReader的对象,CombineFileRecordReader通过CombineFileSplit,context和Class<? extends RecordReader>类型构造,MyCombineFileRecordReader是我们对于CombineFileSplit中每一个文件的产生map的输入的方法。
CombineFileRecordReader中的nextKeyValue方法,会为每一个打包在CombineFileSplit中的文件构造一个RecordReader方法,读取文件中的记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
...
public CombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Class<? extends RecordReader<K, V>> rrClass) throws IOException {
this.split = split;
this.context = context;
this.idx = 0;
this.curReader = null;
this.progress = 0L;

try {
this.rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
this.rrConstructor.setAccessible(true);
} catch (Exception var5) {
throw new RuntimeException(rrClass.getName() + " does not have valid constructor", var5);
}

this.initNextRecordReader();
}


protected boolean initNextRecordReader() throws IOException {
if(this.curReader != null) {
this.curReader.close();
this.curReader = null;
if(this.idx > 0) {
this.progress += this.split.getLength(this.idx - 1);
}
}

if(this.idx == this.split.getNumPaths()) {
return false;
} else {
this.context.progress();

try {
Configuration conf = this.context.getConfiguration();
conf.set("mapreduce.map.input.file", this.split.getPath(this.idx).toString());
conf.setLong("mapreduce.map.input.start", this.split.getOffset(this.idx));
conf.setLong("mapreduce.map.input.length", this.split.getLength(this.idx));
this.curReader = (RecordReader)this.rrConstructor.newInstance(new Object[]{this.split, this.context, Integer.valueOf(this.idx)});
if(this.idx > 0) {
this.curReader.initialize(this.split, this.context);
}
} catch (Exception var2) {
throw new RuntimeException(var2);
}

++this.idx;
return true;
}

public boolean nextKeyValue() throws IOException, InterruptedException {
do {
if(this.curReader != null && this.curReader.nextKeyValue()) {
return true;
}
} while(this.initNextRecordReader());

return false;
}

public K getCurrentKey() throws IOException, InterruptedException {
return this.curReader.getCurrentKey();
}

public V getCurrentValue() throws IOException, InterruptedException {
return this.curReader.getCurrentValue();
}
...
}

在nextKeyValue方法中通过自定义的RecordReader的nextKeyValue读取当前文件的对象,当读完当前文件中的信息,后会通过initNextRecordReader返回初始化的下一个文件的RecordReader,所以我们只需实现相应的读取一个文件的RecordReader即可。

MyCombineFileRecordReader的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MyCombineFileRecordReader extends RecordReader<LongWritable, Text> {

private CombineFileSplit combineFileSplit;
private int currentIndex;
private LineRecordReader reader = new LineRecordReader();
private int totalNum;

public MyCombineFileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index){
super();
this.combineFileSplit = combineFileSplit;
this.currentIndex = index;
this.totalNum = combineFileSplit.getNumPaths();
}

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),
combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
context.getConfiguration().set("mapreduce.map.input.file.name", fileSplit.getPath().getName());
this.reader.initialize(fileSplit, context);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(currentIndex >= 0 && currentIndex < totalNum){
return reader.nextKeyValue();
}else {
return false;
}
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return reader.getCurrentKey();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return reader.getCurrentValue();
}

@Override
public float getProgress() throws IOException, InterruptedException {
if(currentIndex >= 0 && currentIndex < totalNum){
return (float)currentIndex/totalNum;
}
return 0;
}

@Override
public void close() throws IOException {
reader.close();
}
}

MyCombineFileRecordReader中通过LineRecordReader按行来读取文本记录,在initialize方法中通过CombineFileSplit和index(CombineFileSplit中文件信息的索引位置)来得到相应文件的信息,创建对应的FileSplit,接着创建LineRecordReader对象,在nextKeyValue中委托给LineRecordReader为mapper产生键-值对象。

最后入口函数和map类的实现,将InputFormatClass替换成自定义的MyCombineInputFormat类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CombineInputFromatMain extends Configured implements Tool{

public static class CombineInputFormatMap extends Mapper<Object, Text, Text, Text>{
private Text outKey = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
outKey.set(context.getConfiguration().get("mapreduce.map.input.file.name"));
context.write(outKey, value);
}
}

@Override
public int run(String[] args) throws Exception {
//设定默认job和设置输入输出路径的函数
Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
job.setJobName("CombineInputFormat Text");
job.setJarByClass(CombineInputFromatMain.class);
job.setMapperClass(CombineInputFormatMap.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(MyCombineInputFormat.class);
return job.waitForCompletion(true) ? 0:1;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new CombineInputFromatMain(), args));
}
}

在这例子中将三个word*.txt文件打包进一个分片,实际只产生了一个map。

Sequence file

sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。

顺序文件由文件头和随后的记录内容组成,顺序文件的前三个字节为SEQ(顺序文件代码),紧接着一个字节表示文件的版本号,文件头还包括键和值的类型,数据是否压缩的标志位,是否进行快压缩的标志位, 数据的压缩形式,用户自定义的数据以及同步标识。顺序文件读取内容只能从同步标识开始读取。同步标识位于记录和记录之间,也就是说无法从记录中间开始读取顺序文件的内容。

Sequence file的格式主要有三种,分为未压缩,记录压缩和块压缩。主要格式的存储方式可以查看官方给出的api: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

将小文件合并成一个sequence file的实现(代码参考hadoop 权威指南)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

public static class WholeFileInputFormat extends FileInputFormat<LongWritable, Text>{

/**
* 不切分文件,一个split读入整个文件
* @param context
* @param filename
* @return
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
RecordReader reader = new WholeFileRecordReader();
reader.initialize(inputSplit, taskAttemptContext);
return reader;
}
}

/**
* 自定义RecordReader,读取整个小文件内容
*/
public static class WholeFileRecordReader extends RecordReader<LongWritable, Text>{

private FileSplit fileSplit;
private Configuration conf;
private LongWritable key = new LongWritable();
private Text value = new Text();
private boolean process = false;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
this.fileSplit = (FileSplit)inputSplit;
this.conf = taskAttemptContext.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(!process){
FileSystem fs = fileSplit.getPath().getFileSystem(conf);
FSDataInputStream in = null;
try {
in = new FSDataInputStream(fs.open(fileSplit.getPath()));
byte[] contextByte = new byte[(int)fileSplit.getLength()];
IOUtils.readFully(in, contextByte, 0, contextByte.length);
//等同于 in.read(contextByte, 0, contextByte.length);
String context = new String(contextByte, "utf-8");
key.set(fileSplit.getStart());
value.set(context);
}finally {
IOUtils.closeStream(in);
}
process = true;
return true;
}
return false;
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return process? 1.0f:1.0f;
}

@Override
public void close() throws IOException {

}
}


public static class SmallFilesToSequenceFileMap extends Mapper<Object, Text, Text, Text>{

private Text outKey = new Text();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
outKey.set(((FileSplit)context.getInputSplit()).getPath().toString());
}

@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
context.write(outKey, value);
}
}


@Override
public int run(String[] args) throws Exception {
//设定默认job和设置输入输出路径的函数
Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
job.setJobName("SmallFiles To SequenceFile");
job.setMapperClass(SmallFilesToSequenceFileMap.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true)? 0:1;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new SmallFilesToSequenceFileConverter(), args));
}
}

hdfs可以通过命令行hadoop fs -text来显示以文本的方式显示顺序文件

读取SequenceFile简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SequenceFileReadMain extends Configured implements Tool{

public static class SequenceFileReadMap extends Mapper<Text, Text, Text, Text>{
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
outKey.set("key : " + key.toString());
outValue.set("value : " + value.toString());
context.write(outKey, outValue);
}
}

@Override
public int run(String[] args) throws Exception {
Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
job.setJobName("Sequence File Read");
job.setMapperClass(SequenceFileReadMap.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
return job.waitForCompletion(true)?0:1;
}

public static void main(String[] args) throws Exception{
System.exit(ToolRunner.run(new SequenceFileReadMain(), args));
}
}

这时候读取SequenceFile的时候,key对应的是小文件的名字,value是一个小文件的所有内容,所以需要在map编写处理整个小文件内容的代码

参考资料:

https://blog.csdn.net/u011007180/article/details/52333387

https://www.cnblogs.com/staryea/p/8603112.html

http://dongxicheng.org/mapreduce/hdfs-small-files-solution/

0%