这篇文章简要分析一下 MapReduce 的第一个核心组件 InputFormat,文中引用的源码都是来自 Hadoop 2.7.3,隐去了一些实现细节,只突出了与这次关注的内容比较相关的部分。
《Hadoop 技术内幕:深入解析 MapReduce 架构设计与实现原理》中写道,InputFormat 用于描述输入数据的格式,它提供以下两个功能:
数据切分:按某个策略将输入数据切分成若干 split,以便确定 Map Task 个数及对应的 split。
为 Mapper 提供输入数据:给定某个 split,能将其分解成一个个 key/value 对。
图 1 来自《Hadoop 权威指南(第 4 版)》第 8 章,展示了 InputFormat 类层次结构,位于继承体系最顶层的 InputFormat
是一个抽象类,从源码中看,它只有两个抽象方法,而这两个方法刚好分别实现了上面提到的两个功能:getSplits()
计算所有分片;createRecordReader()
方法构造一个 RecordReader
用于访问每个 split 中的记录,即 key/value 对。
public abstract class InputFormat<K, V> { |
而 InputSplit
也是一个抽象类,同样只有两个抽象方法:getLength()
返回其长度(字节),用于按长度排序;getLocations()
返回存储该 split 的 hostname 列表。
public abstract class InputSplit { |
注意, InputSplit
并不包含 split 实际存储的数据,只是用来标识数据的位置,抽象类提供的信息太少,我们看一个它的子类 FileSplit
,从源码中可以看到,它由文件路径,分片开始位置,分片大小和存储分片数据的 hostname 列表组成,还是支持序列化的,这是为了满足进程间通信的需求。
public class FileSplit extends InputSplit implements Writable { |
再看一下 RecordReader
,仍然是个抽象类,定义了从 split 中获取 key/value 的方法,暂且放一放,后面会分析一个它的子类 LineRecordReader
。
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { |
下面来到今天的重头戏,我们常用的 FileInputFormat
,它是 InputFormat
的一个子类,我们先来看它是如何 override 父类的 getSplits()
方法的(其实它没实现 createRecordReader
):
public List<InputSplit> getSplits(JobContext job) throws IOException { |
简单来说,通过调用 listStatus()
获取输入路径下的所有文件的信息,然后针对每个文件调用 computeSplitSize()
计算出 splitSize
,并根据这个值计算出每个 split 在该文件中的偏移量等,然后使用 makeSplit()
构造一个 FileSplit
实例放到 splits
列表中,最后返回这个列表。
这里面有几个值得关注的点,一个是 listStatus()
方法中会使用 PathFilter
过滤输入文件,而且已经定义的叫作 hiddenFileFilter
的 PathFilter
会应用到所有 Job 上,用户也可以自定义 PathFilter
,然后通过 setMaxInputSplitSize()
进行设置:
private static final PathFilter hiddenFileFilter = new PathFilter(){ |
再看 computeSplitSize()
是怎么实现的,源码中的公式已经相当明确了。其中 minSize
的计算用到 getFormatMinSplitSize()
,这是该格式文件的最小 split 大小限制(默认是 1),暂且不管。值得注意的是 minSize
的默认值是 1,maxSize
的默认值为 Long.MAX_VALUE
,而 blockSize
一般满足:minSize < blockSize < maxSize
,所以按照源码中的公式可知 splitSize
默认为 blockSize
(一般是 64MB,具体视 HDFS 配置而定,一般不会太大)。如果输入文件总量比较大,可能会产生大量 split,从而需要大量的 Mapper,占用太多计算资源,这时根据公式可以通过把 minSize
设置为一个比较大的值,以减少 split 数量,具体来说可以这么做:FileInputFormat.setMinInputSplitSize(job, 1024 * 1024 * 1024L); // 1GB
。
public List<InputSplit> getSplits(JobContext job) throws IOException { |
再看 isSplitable()
,表示输入文件是否可以分割成多个 split,这个要视具体的文件格式而定,例如 gzip 这种格式压缩的文件就是不可分割的,也可以通过返回 false 强制不分割文件,实现每个文件都由单独的一个 Mapper 处理,默认实现是返回 true 的:
protected boolean isSplitable(JobContext context, Path filename) { |
还有一个有趣的点 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { ... }
,这导致最后一个 split 的大小可能超过 splitSize(超过的比例是 SPLIT_SLOP - 1),猜测可能是为了尽量减少过小的 split,StackOverflow 上这个问题里有人做过实验,可以去看看。
前面留了一个坑 LineRecordReader
,我们先看它在哪里被使用的吧,就在我们常用的 TextInputFormat
,它是 FileInputFormat
的字类,定义相对比较简单:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> { |
好了,我们略过 TextInputFormat
,直接来聊 LineRecordReader
,很明显它返回的 key 就是当前记录在该 split 中的字节偏移量,value 就是 readLine 的结果(一般来说就是一行)。这里有一个有趣的问题,FileInputFormat
是简单按照 splitSize
切分 split 的,所以一行记录很可能有一部分位于前一个 split,剩余部分位于后一个 split,所以在 LineRecordReader
的 nextKeyValue()
方法中都会越过当前 split 多读一行记录。这样可能存在重复读取的问题,这在 initialize()
方法中解决了:判断如果不是第一个 split,则跳过第一行记录。
public class LineRecordReader extends RecordReader<LongWritable, Text> { |
到目前为止,我们简单分析了 InputFormat
、FileInputFormat
、TextInputFormat
等几个类及其 getSplits()
方法的实现,另外还有 RecordReader
及其字类 LineRecordReader
的实现,其中还穿插了一些有趣的点。好了,关于 InputFormat 就说这么多吧,以后有新的体会再补充,下一篇开始分析下一个组件 Mapper。