前四篇文章讲了Hadoop的配置和测试以及eclipse下的使用,有兴趣的可以先看下。
用可执行文件作为Mapper和Reducer,接受的都是标准输入,输出的都是标准输出。
当一个可执行文件作为Mapper时,每一个Map任务会以一个独立的进程启动这个可执行文件,然后在Map任务运行时,会把输入切分成行提供给可 执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行出结果时,Map从标准输出(stdout)中收集数据,并将其转化 为<key, value>对,作为Map的输出。
Reduce与Map相同,如果可执行文件做Reducer时,Reduce任务会启动这个可执行文件,并且将<key, value>对转化为行作为这个可执行文件的标准输入(stdin)。然后Reduce会收集这个可执行文件的标准输出(stdout)的内容。并 把每一行转化为<key, value>对,作为Reduce的输出。
Map与Reduce将输出转化为<key , value>对的默认方法是:将每行的第一个tab符号(制表符)之前的内容作为key,之后的内容作为value。如果没有tab符号,那么这一 行的所有内容会作为key,而value值为null。当然这是可以更改的。
值得一提的是,可以使用Java类作为Map,而用一个可执行程序作为Reduce;或使用Java类作为Reduce,而用可执行程序作为Map。
下面先看一个简单例子,用/bin/cat作Mapper,用/usr/bin/wc作Reducer
/input下两个文件为:
hello world bye world
hello hadoop bye hadoop
我在root账户/usr/local/hadoop/hadoop-2.2.0/bin目录(和安装路径有关)下运行此代码,可以统计文件中的行数,单词数和字节数。
root@master:/usr/local/hadoop/hadoop-2.2.0/bin# hadoop jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /output/stream -mapper /bin/cat -reducer /usr/bin/wc
运行结果为: 2 8 46
wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。
参数 可选/必选 参数 可选/必选
-input 必选 -cmdenv 可选
-output 必选 -inputreader 可选
-mapper 必选 -verbose 可选
-reducer 必选 -lazyOutput 可选
-file 可选 -numReduce tasks 可选
-inputformat 可选 -mapdebug 可选
-outputformat 可选 -reducedebug 可选
-partitioner 可选 -io 可选
-combiner 可选
Hadoop流命令中,必选的4个很好理解,分别用于指定输入/输出文件的位置及Map/Reduce函数。在其他的可选命令中,这里我们只解释常用的几个。
-file
这个指令用于将文件加入到Hadoop的Job中。上面的例子中,cat和wc都是 Linux 系统中的命令,而在Hadoop流的使用中,往往需要使用自己写的文件(作为Map函数或Reduce函数)。一般而言,这些文件是Hadoop集群中的机器上没有的,这时就需要使用Hadoop流中的-file命令将这个可执行文件加入到Hadoop的Job中。
-combiner
这个命令用来加入combiner程序。
-inputformat和-outputformat
这两个命令用来设置输入输出文件的处理方法,这两个命令后面的参数必须是Java类。
Hadoop流的通用命令用来配置Hadoop流的Job。需要注意的是,如果使用这部分配置,就必须将其置于流命令配置之前,否则命令会失败。这里简要列出命令列表,供大家参考。
Hadoop流的Job设置命令
参数 可选/必选 参数 可选/必选
-conf 可选 -files 可选
-D 可选 -libjars 可选
-fs 可选 -archives 可选
-jt 可选
从上面的内容可以知道,Hadoop流的API是一个扩展性非常强的框架,它与程序相连的部分只有数据,因此可以接受任何适用于UNIX标准输入/输出的脚本语言,比如Bash、PHP、Ruby、Python等。下面举两个非常简单的例子来进一步说明它的特性。(来源:《Hadoop实战》-陆嘉恒,中国人民大学)
MapReduce框架是一个非常适合在大规模的非结构化数据中查找数据的 编程 模型,grep就是这种类型的一个例子。
在Linux中,grep命令用来在一个或多个文件中查找某个字符模式(这个字符模式可以代表字符串,多用正则表达式表示)。
下面尝试在如下的数据中查找带有Hadoop字符串的行,如下所示。
输入文件为:
file01:
hello world bye world
file02:
hello hadoop bye hadoop
reduce文件为:
reduce.sh:
grep hadoop
输入命令为:
root@master:/usr/local/hadoop/hadoop-2.2.0/bin# jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /output -mapper /bin/cat -reducer ~/Desktop/test/reducer.sh -file ~/Desktop/test/reducer.sh
结果为:hello hadoop bye hadoop
显然,这个结果是正确的。
3.2 Python
对于 Python 来说,情况有些特殊。因为Python是可以编译为JAR包的,如果将程序编译为JAR包,那么就可以采用运行JAR包的方式来运行了。
不过,同样也可以用流的方式运行Python程序。请看如下代码:Reduce.py
1 1 #!/usr/bin/python 2 2 3 3 import sys; 4 4 5 5 def generateLongCountToken(id): 6 6 return "LongValueSum:" + id + "/t" + "1" 7 7 def main(argv): 8 8 line = sys.stdin.readline(); 9 9 try: 10 10 while line: 11 11 line = line[:-1]; 12 12 fields = line.split("/t"); 13 13 print generateLongCountToken(fields[0]); 14 14 line = sys.stdin.readline(); 15 15 except "end of file": 16 16 return None 17 17 if __name__ == "__main__": 18 18 main(sys.argv)
使用如下命令来运行:
root@master:/usr/local/hadoop/hadoop-2.2.0/bin# jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /pyoutput -mapper reduce.py -reducer aggregate -file reduce.py
注意其中的aggregate是Hadoop提供的一个包,它提供一个Reduce函数和一个combine函数。这个函数实现一些简单的类似求和、取最大值最小值等的功能。
Hadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:
1 #include "hadoop/Pipes.hh" 2 #include "hadoop/TemplateFactory.hh" 3 #include "hadoop/StringUtils.hh" 4 5 const std::string WORDCOUNT = "WORDCOUNT"; 6 const std::string INPUT_WORDS = "INPUT_WORDS"; 7 const std::string OUTPUT_WORDS = "OUTPUT_WORDS"; 8 9 class WordCountMap: public HadoopPipes::Mapper { 10 public: 11 HadoopPipes::TaskContext::Counter* inputWords; 12 13 WordCountMap(HadoopPipes::TaskContext& context) { 14 inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS); 15 } 16 17 void map(HadoopPipes::MapContext& context) { 18 std::vector<std::string> words = 19 HadoopUtils::splitString(context.getInputValue(), " "); 20 for(unsigned int i=0; i < words.size(); ++i) { 21 context.emit(words[i], "1"); 22 } 23 context.incrementCounter(inputWords, words.size()); 24 } 25 }; 26 27 class WordCountReduce: public HadoopPipes::Reducer { 28 public: 29 HadoopPipes::TaskContext::Counter* outputWords; 30 31 WordCountReduce(HadoopPipes::TaskContext& context) { 32 outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS); 33 } 34 35 void reduce(HadoopPipes::ReduceContext& context) { 36 int sum = 0; 37 while (context.nextValue()) { 38 sum += HadoopUtils::toInt(context.getInputValue()); 39 } 40 context.emit(context.getInputKey(), HadoopUtils::toString(sum)); 41 context.incrementCounter(outputWords, 1); 42 } 43 }; 44 int main(int argc, char *argv[]) { 45 return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>()); 46 }
这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集<key, value>对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输 出<key, value>对。main函数是应用程序的入口,它调用HadoopPipes::runTask方法,这个方法由一个 TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、 record reader、record writer。
接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:
apt-get install g++
然后建立文件Makerfile,如下所示:
HADOOP_INSTALL="你的hadoop安装文件夹"
PLATFORM= Linux -i386-32(如果是AMD的CPU,请使用 Linux -amd64-64)
CC = g++CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
wordcount: wordcount.cpp$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils -lpthread -g -O2 -o $@
注意在$(CC)前有一个<tab>符号,这个分隔符是很关键的。
在当前目录下建立一个WordCount可执行文件。
接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。
~/hadoop/bin/hadoop fs –mkdir bin ~/hadoop/bin/hadoop dfs –put wordcount /bin
然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:
~/hadoop/bin/hadoop pipes/ -D hadoop.pipes.java.recordreader=true/ -D hadoop.pipes.java.recordwriter=true/ -input input/ -output Coutput/ -program /bin/wordcount
另一种方式是预先将配置写入配置文件中,如下所示:
<?xml version="1.0"?> <configuration> <property> // Set the binary path on DFS <name>hadoop.pipes.executable</name> <value>bin/wordcount</value> </property> <property> <name>hadoop.pipes.java.recordreader</name> <value>true</value> </property> <property> <name>hadoop.pipes.java.recordwriter</name> <value>true</value> </property> </configuration>
然后通过如下命令运行这个程序:
~/hadoop/bin/hadoop pipes -conf word.xml -input /input -output /output
将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用 Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函 数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
3)主方法 Main 分析
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);}
首先讲解一下Job的初始化过程。main函数调用Jobconf类来对MapReduce Job进行初始化,然后调用setJobName()方法命名这个Job。对Job进行合理的命名有助于更快地找到Job,以便在JobTracker和Tasktracker的页面中对其进行监视。
JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );
接着设置Job输出结果<key,value>的中key和value数据类型,因为结果是<单词,个数>,所以 key设置为"Text"类型,相当于Java中String类型。Value设置为"IntWritable",相当于Java中的int类型。
conf.setOutputKeyClass(Text.class );
conf.setOutputValueClass(IntWritable.class );
然后设置Job处理的Map(拆分)、Combiner(中间结果合并)以及Reduce(合并)的相关处理类。这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。
conf.setMapperClass(Map.class );
conf.setCombinerClass(Reduce.class );
conf.setReducerClass(Reduce.class );
接着就是调用setInputPath()和setOutputPath()设置输入输出路径
conf.setInputFormat(TextInputFormat.class );
conf.setOutputFormat(TextOutputFormat.class );
InputSplit是Hadoop定义的用来传送给每个单独的map的数据,InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。生成InputSplit的方法可以通过InputFormat()来设置。
当数据传送给map时,map会将输入分片传送到InputFormat,InputFormat则调用方法getRecordReader()生成RecordReader,RecordReader再通过creatKey()、creatValue()方法创建可供map处理的<key,value>对。简而言之,InputFormat()方法是用来生成可供map处理的<key,value>对的。
Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的<key,value>对,它们都继承自InputFormat,分别是:
InputFormat
|
|---BaileyBorweinPlouffe.BbpInputFormat
|---ComposableInputFormat
|---CompositeInputFormat
|---DBInputFormat
|---DistSum.Machine.AbstractInputFormat
|---FileInputFormat
|---CombineFileInputFormat
|---KeyValueTextInputFormat
|---NLineInputFormat
|---SequenceFileInputFormat
|---TeraInputFormat
|---TextInputFormat
其中TextInputFormat是Hadoop默认的输入方法,在TextInputFormat中,每个文件(或其一部分)都会单独地作为map的输入,而这个是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示成<key,value>形式:
key值是每个数据的记录在数据分片中字节偏移量,数据类型是LongWritable;
value值是每行的内容,数据类型是Text。
(2)OutputFormat
每一种输入格式都有一种输出格式与其对应。默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过,它的键和值可以是任意形式的,因为程序内容会调用toString()方法将键和值转换为String类型再输出。
3)Map类中map方法分析
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
Map类继承自MapReduceBase,并且它实现了Mapper接口,此接口是一个规范类型,它有4种形式的参数,分别用来指定map的输入key值类型、输入value值类型、输出key值类型和输出value 值类型。在本例中,因为使用的是TextInputFormat,它的输出key值是LongWritable类型,输出value值是Text类型,所 以map的输入类型为<LongWritable,Text>。在本例中需要输出<word,1>这样的形式,因此输出的key 值类型是Text,输出的value值类型是IntWritable。
实现此接口类还需要实现map方法,map方法会具体负责对输入进行操作,在本例中,map方法对输入的行以空格为单位进行切分,然后使用OutputCollect收集输出的<word,1>。
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
Reduce类也是继承自MapReduceBase的,需要实现Reducer接口。Reduce类以map的输出作为输入,因此Reduce的输入类型是<Text,Intwritable>。而Reduce的输出是单词和它的数目,因此,它的输出类型是<Text,IntWritable>。Reduce类也要实现reduce方法,在此方法中,reduce函数将输入的key值作为输出的key值,然后将获得多个value值加起来,作为输出的值。
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main (String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1)Map过程
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其 map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回 车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并 将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。
2)Reduce过程
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
3)执行MapReduce任务
public static void main (String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用 TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine 和Reduce过程中的处理。还设置了Map 过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入 路径 则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用 job.waitForCompletion() 方法执行任务。
Hadoop最新版本的MapReduce Release 0.20.0的API包括了一个全新的Mapreduce JAVA API,有时候也称为上下文对象。
新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用 。
新的API和旧的API之间有下面几个明显的区别。
新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置, 这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来 完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。
(最后的部分参考http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html)