In the previous post, we looked at using a combiner and in-map aggregation to optimize a MapReduce job.
In this post we keep optimizing the same character-counting MR program.
To restate the program's goal: count how many times each of the characters a~z appears in the input files; uppercase and lowercase are counted together, so A and a both count toward a.
First, look at the Job Counters from the previous run. You can see that the number of reduce tasks was 1.
- Job Counters
- Launched map tasks=3
- Launched reduce tasks=1
- Data-local map tasks=3
- Total time spent by all maps in occupied slots (ms)=1207744
- Total time spent by all reduces in occupied slots (ms)=341424
-
Key point: an MR job runs with 1 reducer by default. The number of reducers can be set with job.setNumReduceTasks(num). (A question to think about: how is the number of map tasks determined?) When the data volume is large, a single reduce task can become the job's bottleneck, so let's set the number of reducers by hand.
- @Override
- public int run(String[] args) throws Exception {
- //validate the parameters
- if(args.length !=2){
- return -1;
- }
-
- Job job = Job.getInstance(getConf(), "MyWordCountJob");
- job.setJarByClass(MyWordCountJob.class);
-
- Path inPath = new Path(args[0]);
- Path outPath = new Path(args[1]);
-
- outPath.getFileSystem(getConf()).delete(outPath,true);
- TextInputFormat.setInputPaths(job, inPath);
- TextOutputFormat.setOutputPath(job, outPath);
-
-
- job.setMapperClass(MyWordCountJob.MyWordCountMapper.class);
- job.setReducerClass(MyWordCountJob.MyWordCountReducer.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setNumReduceTasks(3);
- return job.waitForCompletion(true)?0:1;
- }
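For reference, a typical way to compile, package, and launch the job looks roughly like this (the jar name, the classes output directory, and the input/output paths are assumptions here; adjust them to your environment):
- $ javac -cp $(hadoop classpath) -d classes wordcount/MyWordCountJob.java
- $ jar -cf MyWordCount.jar -C classes .
- $ hadoop jar MyWordCount.jar wordcount.MyWordCountJob input output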
After recompiling and rerunning, check the logs: the number of reduce tasks is now 3, as expected. But the story is not over yet!
- File System Counters
- FILE: Number of bytes read=642
- FILE: Number of bytes written=507033
- FILE: Number of read operations=0
- FILE: Number of large read operations=0
- FILE: Number of write operations=0
- HDFS: Number of bytes read=556
- HDFS: Number of bytes written=107
- HDFS: Number of read operations=18
- HDFS: Number of large read operations=0
- HDFS: Number of write operations=6
- Job Counters
- Launched map tasks=3
- Launched reduce tasks=3
- Data-local map tasks=3
- Total time spent by all maps in occupied slots (ms)=1207744
- Total time spent by all reduces in occupied slots (ms)=341424
- Map-Reduce Framework
- Map input records=8
- Map output records=78
- Map output bytes=468
- Map output materialized bytes=678
- Input split bytes=399
- Combine input records=0
- Combine output records=0
- Reduce input groups=26
- Reduce shuffle bytes=678
- Reduce input records=78
- Reduce output records=26
- Spilled Records=156
- Shuffled Maps =9
- Failed Shuffles=0
- Merged Map outputs=9
- GC time elapsed (ms)=507
- CPU time spent (ms)=7770
- Physical memory (bytes) snapshot=1329672192
- Virtual memory (bytes) snapshot=5978918912
- Total committed heap usage (bytes)=1004273664
Now let's look at the output. There are three output files, which is expected: each reducer produces one output file.
- [train@sandbox MyWordCount]$ hdfs dfs -ls output
- Found 4 items
- -rw-r--r-- 3 train hdfs 0 2016-05-11 11:48 output/_SUCCESS
- -rw-r--r-- 3 train hdfs 37 2016-05-11 11:48 output/part-r-00000
- -rw-r--r-- 3 train hdfs 34 2016-05-11 11:48 output/part-r-00001
- -rw-r--r-- 3 train hdfs 36 2016-05-11 11:48 output/part-r-00002
Let's look at the contents of the result files.
- [train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00000
- b 3
- e 11
- h 8
- k 3
- n 4
- q 3
- t 4
- w 7
- z 3
- [train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00001
- c 4
- f 4
- i 5
- l 6
- o 12
- r 13
- u 6
- x 3
- [train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00002
- a 8
- d 4
- g 3
- j 3
- m 7
- p 3
- s 4
- v 0
- y 6
Notice that the letters are not laid out across the three files the way we would like. What we actually want is a~h in the first file, i~p in the second, and q~z in the third. How do we get there?
This is where the tool introduced in this post comes in: the Partitioner.
The Partitioner decides which reducer each map output record is sent to. When there is only one reducer, the Partitioner is never invoked, even if one is configured.
The default Partitioner in the Hadoop framework is HashPartitioner. It assigns a record by taking the key's hash value modulo the number of reducers.
Note: a return value of 0 means the record goes to the first reducer, a return value of 1 sends it to the second reducer, and so on.
- public class HashPartitioner<K, V> extends Partitioner<K, V> {
-
- public int getPartition(K key, V value,
- int numReduceTasks) {
- return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
- }
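To make this concrete, here is a small standalone sketch (assuming the Hadoop client jars are on the classpath; HashPartitionerDemo is just an illustrative name) that asks the default HashPartitioner which of 3 reducers a few keys would be routed to:
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
-
- public class HashPartitionerDemo {
-     public static void main(String[] args) {
-         HashPartitioner<Text, IntWritable> partitioner = new HashPartitioner<Text, IntWritable>();
-         int numReduceTasks = 3;   // same value as job.setNumReduceTasks(3)
-         for (String k : new String[] { "a", "b", "m", "z" }) {
-             // getPartition returns 0, 1 or 2 -- the reducer this key would be sent to
-             int p = partitioner.getPartition(new Text(k), new IntWritable(1), numReduceTasks);
-             System.out.println(k + " -> reducer " + p);
-         }
-     }
- }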
Next we define our own Partitioner. It should route keys the way we planned: a~h to the first reducer, i~p to the second, and q~z to the third.
- public static class MyWordCountPartitioner extends Partitioner<Text,IntWritable>{
-
- @Override
- public int getPartition(Text key, IntWritable value, int numPartitions) {
- char c =key.toString().charAt(0);
- if(c>='a'&& c<'i')
- return 0;
- else if(c>='i' && c<'q')
- return 1;
- else
- return 2;
- }
-
- }
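One more thing: the custom Partitioner only takes effect if it is registered in the driver (the same line appears in the complete listing further down):
- job.setPartitionerClass(MyWordCountPartitioner.class);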
Rerun the job and check the results: the letters are now distributed across the files exactly as planned.
- [train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00000
- a 8
- b 3
- c 4
- d 4
- e 11
- f 4
- g 3
- h 8
- [train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00001
- i 5
- j 3
- k 3
- l 6
- m 7
- n 4
- o 12
- p 3
- [train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00002
- q 3
- r 13
- s 4
- t 4
- u 6
- v 0
- w 7
- x 3
- y 6
- z 3
As usual, the complete code is listed below.
- package wordcount;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Partitioner;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.apache.log4j.Logger;
-
- public class MyWordCountJob extends Configured implements Tool {
- Logger log = Logger.getLogger(MyWordCountJob.class);
-
- public static class MyWordCountMapper extends
- Mapper<LongWritable, Text, Text, IntWritable> {
- Logger log = Logger.getLogger(MyWordCountJob.class);
- Map<Character,Integer> map = new HashMap<Character,Integer>();
-
- Text mapKey = new Text();
- IntWritable mapValue = new IntWritable(1);
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- for(char c :value.toString().toLowerCase().toCharArray()){
- if(c>='a' && c <='z'){
- map.put(c,map.get(c)+1);
- }
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException {
- for(char key : map.keySet()){
- mapKey.set(String.valueOf(key));
- mapValue.set(map.get(key));
- context.write(mapKey, mapValue);
- }
-
- }
-
- @Override
- protected void setup(Context context) throws IOException,
- InterruptedException {
- for(char c='a';c<='z' ;c++){
- map.put(c, 0);
- }
- }
-
- }
-
- public static class MyWordCountPartitioner extends Partitioner<Text,IntWritable>{
-
- @Override
- public int getPartition(Text key, IntWritable value, int numPartitions) {
- char c =key.toString().charAt(0);
- if(c>='a'&& c<'i')
- return 0;
- else if(c>='i' && c<'q')
- return 1;
- else
- return 2;
- }
-
- }
-
- public static class MyWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- Text rkey = new Text();
- IntWritable rvalue = new IntWritable(1);
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,Context context)
- throws IOException, InterruptedException {
- int n=0;
- for(IntWritable value :values){
- n+= value.get();
- }
- rvalue.set(n);
- context.write(key, rvalue);
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- //validate the parameters
- if(args.length !=2){
- return -1;
- }
-
- Job job = Job.getInstance(getConf(), "MyWordCountJob");
- job.setJarByClass(MyWordCountJob.class);
-
- Path inPath = new Path(args[0]);
- Path outPath = new Path(args[1]);
-
- outPath.getFileSystem(getConf()).delete(outPath,true);
- TextInputFormat.setInputPaths(job, inPath);
- TextOutputFormat.setOutputPath(job, outPath);
-
-
- job.setMapperClass(MyWordCountJob.MyWordCountMapper.class);
- job.setReducerClass(MyWordCountJob.MyWordCountReducer.class);
- job.setPartitionerClass(MyWordCountPartitioner.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setNumReduceTasks(3);
- return job.waitForCompletion(true)?0:1;
- }
- public static void main(String [] args){
- int result = 0;
- try {
- result = ToolRunner.run(new Configuration(), new MyWordCountJob(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- System.exit(result);
- }
-
- }
Two final remarks: in practice, a custom partitioner is usually written to deal with unevenly distributed data, also known as data skew. And a custom partitioner must guarantee that identical keys are always sent to the same reducer.
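As an illustration only (SkewAwarePartitioner and the hot key below are made-up assumptions, not part of the job above), a skew-aware partitioner typically sends one or more known hot keys to a dedicated reducer and lets every other key fall through to hash partitioning. Because the decision depends only on the key, identical keys still land on the same reducer:
- public static class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
-     // Hypothetical hot key; in a real job you would find it by sampling the data.
-     private static final String HOT_KEY = "o";
-
-     @Override
-     public int getPartition(Text key, IntWritable value, int numPartitions) {
-         // Assumes at least 2 reducers; with a single reducer the partitioner is never called anyway.
-         if (HOT_KEY.equals(key.toString()))
-             return numPartitions - 1;                                        // hot key gets its own reducer
-         return (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);   // everything else: hash partitioning
-     }
- }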