MapReduce 之 Mapper 分析

前面文章简单分析了 InputFormat,它其实是在 Mapper 中被调用来读取输入数据的,Mapper 类的定义相当简单直观,直接上源码(还是来自 Hadoop 2.7.3):

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { }

protected void setup(Context context) throws IOException, InterruptedException { }

protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}

protected void cleanup(Context context) throws IOException, InterruptedException { }

public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}

}

注意,Mapper 是个具体的类,它的 map 方法是有默认实现的,即把 key/value 原封不动地写出去,如果 Job 没有使用 setMapperClass() 方法设置自定义 Mapper,默认使用的就是上面定义的 Mapper。Mapper 的 run() 方法定义了其框架:调用一次 setup() 方法做初始化,在每个 key/value 上都调用一次 map() 方法,最后调用 cleanup() 方法做清理。一般我们写 Job 时只会 override setup()map()cleanup() 方法,如果有需要也是可以 override run() 方法的。
下面说一下,Mapper 是怎么使用 InputFormat 的,源码中显示 Mapper 通过 Context 类的一系列方法获取 key/value,这些方法名也出现在之前分析的 RecordReader 类中,基本上可以断定 Context 的具体类中有个 RecordReader 成员。首先,我们看一下 Mapper 中定义的 Context 是一个实现了 MapContext 接口的抽象类,我们需要知道最终调用 run() 方法时传入的是什么具体类的对象,下面开始追溯这个 run() 方法是在哪里被调用的。

0%