MapReduce 之 InputFormat 分析

这篇文章简要分析一下 MapReduce 的第一个核心组件 InputFormat,文中引用的源码都是来自 Hadoop 2.7.3,隐去了一些实现细节,只突出了与这次关注的内容比较相关的部分。

《Hadoop 技术内幕:深入解析 MapReduce 架构设计与实现原理》中写道,InputFormat 用于描述输入数据的格式,它提供以下两个功能:

数据切分:按某个策略将输入数据切分成若干 split,以便确定 Map Task 个数及对应的 split。

为 Mapper 提供输入数据:给定某个 split,能将其分解成一个个 key/value 对。

图 1:InputFormat 类层级结构

图 1 来自《Hadoop 权威指南(第 4 版)》第 8 章,展示了 InputFormat 类层次结构,位于继承体系最顶层的 InputFormat 是一个抽象类,从源码中看,它只有两个抽象方法,而这两个方法刚好分别实现了上面提到的两个功能:getSplits() 计算所有分片;createRecordReader() 方法构造一个 RecordReader 用于访问每个 split 中的记录,即 key/value 对。

public abstract class InputFormat<K, V> {   
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

InputSplit 也是一个抽象类,同样只有两个抽象方法:getLength() 返回其长度(字节),用于按长度排序;getLocations() 返回存储该 split 的 hostname 列表。

public abstract class InputSplit {  
public abstract long getLength() throws IOException, InterruptedException;
public abstract String[] getLocations() throws IOException, InterruptedException;
}

注意, InputSplit 并不包含 split 实际存储的数据,只是用来标识数据的位置,抽象类提供的信息太少,我们看一个它的子类 FileSplit,从源码中可以看到,它由文件路径,分片开始位置,分片大小和存储分片数据的 hostname 列表组成,还是支持序列化的,这是为了满足进程间通信的需求。

public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
private String[] hosts;
...
@Override
public void write(DataOutput out) throws IOException { ... }

@Override
public void readFields(DataInput in) throws IOException { ... }
...
}

再看一下 RecordReader,仍然是个抽象类,定义了从 split 中获取 key/value 的方法,暂且放一放,后面会分析一个它的子类 LineRecordReader

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
}

下面来到今天的重头戏,我们常用的 FileInputFormat ,它是 InputFormat 的一个子类,我们先来看它是如何 override 父类的 getSplits() 方法的(其实它没实现 createRecordReader ):

public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}

简单来说,通过调用 listStatus() 获取输入路径下的所有文件的信息,然后针对每个文件调用 computeSplitSize() 计算出 splitSize,并根据这个值计算出每个 split 在该文件中的偏移量等,然后使用 makeSplit() 构造一个 FileSplit 实例放到 splits 列表中,最后返回这个列表。

这里面有几个值得关注的点,一个是 listStatus() 方法中会使用 PathFilter 过滤输入文件,而且已经定义的叫作 hiddenFileFilterPathFilter 会应用到所有 Job 上,用户也可以自定义 PathFilter,然后通过 setMaxInputSplitSize() 进行设置:

private static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};

protected List<FileStatus> listStatus(JobContext job) throws IOException {
...
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
...
}

再看 computeSplitSize() 是怎么实现的,源码中的公式已经相当明确了。其中 minSize 的计算用到 getFormatMinSplitSize(),这是该格式文件的最小 split 大小限制(默认是 1),暂且不管。值得注意的是 minSize 的默认值是 1,maxSize 的默认值为 Long.MAX_VALUE,而 blockSize 一般满足:minSize < blockSize < maxSize,所以按照源码中的公式可知 splitSize 默认为 blockSize(一般是 64MB,具体视 HDFS 配置而定,一般不会太大)。如果输入文件总量比较大,可能会产生大量 split,从而需要大量的 Mapper,占用太多计算资源,这时根据公式可以通过把 minSize 设置为一个比较大的值,以减少 split 数量,具体来说可以这么做:FileInputFormat.setMinInputSplitSize(job, 1024 * 1024 * 1024L); // 1GB

public List<InputSplit> getSplits(JobContext job) throws IOException {
...
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
...
}

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}

再看 isSplitable(),表示输入文件是否可以分割成多个 split,这个要视具体的文件格式而定,例如 gzip 这种格式压缩的文件就是不可分割的,也可以通过返回 false 强制不分割文件,实现每个文件都由单独的一个 Mapper 处理,默认实现是返回 true 的:

protected boolean isSplitable(JobContext context, Path filename) {
return true;
}

还有一个有趣的点 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { ... },这导致最后一个 split 的大小可能超过 splitSize(超过的比例是 SPLIT_SLOP - 1),猜测可能是为了尽量减少过小的 split,StackOverflow 上这个问题里有人做过实验,可以去看看。

前面留了一个坑 LineRecordReader,我们先看它在哪里被使用的吧,就在我们常用的 TextInputFormat,它是 FileInputFormat 的字类,定义相对比较简单:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}

@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}

}

好了,我们略过 TextInputFormat,直接来聊 LineRecordReader,很明显它返回的 key 就是当前记录在该 split 中的字节偏移量,value 就是 readLine 的结果(一般来说就是一行)。这里有一个有趣的问题,FileInputFormat 是简单按照 splitSize 切分 split 的,所以一行记录很可能有一部分位于前一个 split,剩余部分位于后一个 split,所以在 LineRecordReadernextKeyValue() 方法中都会越过当前 split 多读一行记录。这样可能存在重复读取的问题,这在 initialize() 方法中解决了:判断如果不是第一个 split,则跳过第一行记录。

public class LineRecordReader extends RecordReader<LongWritable, Text> {
private long start;
private long pos;
private long end;
private int maxLineLength;
private LongWritable key;
private Text value;
...
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
...
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}

public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}

if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}

// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}

@Override
public LongWritable getCurrentKey() {
return key;
}

@Override
public Text getCurrentValue() {
return value;
}
...
}

到目前为止,我们简单分析了 InputFormatFileInputFormatTextInputFormat 等几个类及其 getSplits() 方法的实现,另外还有 RecordReader 及其字类 LineRecordReader 的实现,其中还穿插了一些有趣的点。好了,关于 InputFormat 就说这么多吧,以后有新的体会再补充,下一篇开始分析下一个组件 Mapper。

0%