
A MapReduce Program Example, Details Decide Success (Part 5): Partitioner

In the previous post we used a combiner and in-map aggregation to optimize the MapReduce job.
In this post we keep optimizing the same character-counting MR program.
To restate the goal: count how many of each of the characters a~z appear in the input files, with 'A' and 'a' both counted as 'a'.
First, look at the Job Counters from the last run; note that the number of reduce tasks is 1.


Job Counters
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=1207744
        Total time spent by all reduces in occupied slots (ms)=341424
Key point: an MR job defaults to a single reducer. The number of reducers can be set with job.setNumReduceTasks(num).
A question to leave open: how is the number of map tasks determined?

When the data volume is large, a single reduce task can become the job's bottleneck, so let's set the reducer count by hand.


@Override
public int run(String[] args) throws Exception {
        // validate the parameters
        if(args.length != 2){
                return -1;
        }

        Job job = Job.getInstance(getConf(), "MyWordCountJob");
        job.setJarByClass(MyWordCountJob.class);

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);

        outPath.getFileSystem(getConf()).delete(outPath, true);
        TextInputFormat.setInputPaths(job, inPath);
        TextOutputFormat.setOutputPath(job, outPath);

        job.setMapperClass(MyWordCountJob.MyWordCountMapper.class);
        job.setReducerClass(MyWordCountJob.MyWordCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(3);
        return job.waitForCompletion(true) ? 0 : 1;
}
Compile and run again, then check the logs: the number of reduce tasks is now 3, as expected. But we are not done yet!


File System Counters
        FILE: Number of bytes read=642
        FILE: Number of bytes written=507033
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=556
        HDFS: Number of bytes written=107
        HDFS: Number of read operations=18
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
Job Counters
        Launched map tasks=3
        Launched reduce tasks=3
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=1207744
        Total time spent by all reduces in occupied slots (ms)=341424
Map-Reduce Framework
        Map input records=8
        Map output records=78
        Map output bytes=468
        Map output materialized bytes=678
        Input split bytes=399
        Combine input records=0
        Combine output records=0
        Reduce input groups=26
        Reduce shuffle bytes=678
        Reduce input records=78
        Reduce output records=26
        Spilled Records=156
        Shuffled Maps =9
        Failed Shuffles=0
        Merged Map outputs=9
        GC time elapsed (ms)=507
        CPU time spent (ms)=7770
        Physical memory (bytes) snapshot=1329672192
        Virtual memory (bytes) snapshot=5978918912
        Total committed heap usage (bytes)=1004273664

Now look at the output. There are three output files, which is expected: each reducer writes one output file.


[train@sandbox MyWordCount]$ hdfs dfs -ls output
Found 4 items
-rw-r--r-- 3 train hdfs 0 2016-05-11 11:48 output/_SUCCESS
-rw-r--r-- 3 train hdfs 37 2016-05-11 11:48 output/part-r-00000
-rw-r--r-- 3 train hdfs 34 2016-05-11 11:48 output/part-r-00001
-rw-r--r-- 3 train hdfs 36 2016-05-11 11:48 output/part-r-00002

Let's look at the contents of the result files.


[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00000
b 3
e 11
h 8
k 3
n 4
q 3
t 4
w 7
z 3
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00001
c 4
f 4
i 5
l 6
o 12
r 13
u 6
x 3
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00002
a 8
d 4
g 3
j 3
m 7
p 3
s 4
v 0
y 6
Notice that the three files do not split the letters in order. Suppose we want a~h in the first file, i~q in the second, and r~z in the third. How do we do that?
That is exactly what this post's tool is for: the Partitioner.
A Partitioner decides which reducer each map output record is sent to. When there is only one reducer, the partitioner is not invoked at all, even if one is configured.
Hadoop's default Partitioner is HashPartitioner: it assigns a record by taking the key's hash value modulo the number of reducers (the & Integer.MAX_VALUE below clears the sign bit, so a negative hashCode cannot produce a negative partition index).
To be clear about the return value: returning 0 sends the record to the first reducer, returning 1 sends it to the second, and so on.


public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
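For illustration only (this snippet is not from the original post; it simply assumes the Hadoop client jars are on the classpath and the class name is made up), we can print which of 3 reducers HashPartitioner would choose for each letter key. Running something like this shows why each of the three files above holds a scattered, non-contiguous set of letters:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class HashPartitionCheck {
        public static void main(String[] args) {
                // the same default partitioner the job used before we override it
                HashPartitioner<Text, IntWritable> partitioner = new HashPartitioner<Text, IntWritable>();
                IntWritable one = new IntWritable(1);
                for (char c = 'a'; c <= 'z'; c++) {
                        Text key = new Text(String.valueOf(c));
                        // prints the partition (0, 1, or 2) chosen for each letter with 3 reducers
                        System.out.println(c + " -> reducer " + partitioner.getPartition(key, one, 3));
                }
        }
}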
So next we define our own Partitioner that sends a~h to the first reducer, i~q to the second, and r~z to the third, as planned.
 


public static class MyWordCountPartitioner extends Partitioner<Text,IntWritable>{

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
                char c = key.toString().charAt(0);
                if(c >= 'a' && c < 'i')        // a~h -> first reducer
                        return 0;
                else if(c >= 'i' && c < 'q')   // i~p -> second reducer (c < 'q' excludes 'q'; use c <= 'q' to match the i~q plan)
                        return 1;
                else                           // everything else, including 'q' -> third reducer
                        return 2;
        }

}
Run it and check the results. The output is split by range as intended, with one wrinkle: because the condition is c < 'q', the letter 'q' falls through to the third reducer; change it to c <= 'q' if 'q' should land in the second file as originally planned. (Also remember to register the partitioner in the driver with job.setPartitionerClass, as the full listing below does.)


[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00000
a 8
b 3
c 4
d 4
e 11
f 4
g 3
h 8
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00001
i 5
j 3
k 3
l 6
m 7
n 4
o 12
p 3
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00002
q 3
r 13
s 4
t 4
u 6
v 0
w 7
x 3
y 6
z 3
As usual, here is the complete code.


package wordcount;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class MyWordCountJob extends Configured implements Tool {
        Logger log = Logger.getLogger(MyWordCountJob.class);

        public static class MyWordCountMapper extends
                        Mapper<LongWritable, Text, Text, IntWritable> {
                Logger log = Logger.getLogger(MyWordCountJob.class);
                // in-map aggregation: per-task letter counts, flushed in cleanup()
                Map<Character,Integer> map = new HashMap<Character,Integer>();

                Text mapKey = new Text();
                IntWritable mapValue = new IntWritable(1);

                @Override
                protected void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        for(char c : value.toString().toLowerCase().toCharArray()){
                                if(c >= 'a' && c <= 'z'){
                                        map.put(c, map.get(c) + 1);
                                }
                        }
                }

                @Override
                protected void cleanup(Context context) throws IOException,
                                InterruptedException {
                        for(char key : map.keySet()){
                                mapKey.set(String.valueOf(key));
                                mapValue.set(map.get(key));
                                context.write(mapKey, mapValue);
                        }
                }

                @Override
                protected void setup(Context context) throws IOException,
                                InterruptedException {
                        for(char c = 'a'; c <= 'z'; c++){
                                map.put(c, 0);
                        }
                }
        }

        // must be static so the framework can instantiate it by reflection
        public static class MyWordCountPartitioner extends Partitioner<Text,IntWritable>{

                @Override
                public int getPartition(Text key, IntWritable value, int numPartitions) {
                        char c = key.toString().charAt(0);
                        if(c >= 'a' && c < 'i')        // a~h
                                return 0;
                        else if(c >= 'i' && c < 'q')   // i~p ('q' falls through to the third reducer)
                                return 1;
                        else                           // q~z and anything else
                                return 2;
                }
        }

        public static class MyWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                Text rkey = new Text();
                IntWritable rvalue = new IntWritable(1);

                @Override
                protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                                throws IOException, InterruptedException {
                        int n = 0;
                        for(IntWritable value : values){
                                n += value.get();
                        }
                        rvalue.set(n);
                        context.write(key, rvalue);
                }
        }

        @Override
        public int run(String[] args) throws Exception {
                // validate the parameters
                if(args.length != 2){
                        return -1;
                }

                Job job = Job.getInstance(getConf(), "MyWordCountJob");
                job.setJarByClass(MyWordCountJob.class);

                Path inPath = new Path(args[0]);
                Path outPath = new Path(args[1]);

                outPath.getFileSystem(getConf()).delete(outPath, true);
                TextInputFormat.setInputPaths(job, inPath);
                TextOutputFormat.setOutputPath(job, outPath);

                job.setMapperClass(MyWordCountJob.MyWordCountMapper.class);
                job.setReducerClass(MyWordCountJob.MyWordCountReducer.class);
                job.setPartitionerClass(MyWordCountPartitioner.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                job.setNumReduceTasks(3);
                return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args){
                int result = 0;
                try {
                        result = ToolRunner.run(new Configuration(), new MyWordCountJob(), args);
                } catch (Exception e) {
                        e.printStackTrace();
                }
                System.exit(result);
        }
}
A couple of closing remarks: in practice, a custom partitioner is usually written to deal with uneven data distribution across reducers, also known as data skew. And any custom partitioner must guarantee that records with the same key are always sent to the same reducer.
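As a minimal sketch of that idea (not from the original post; the hot key "the" and the class name are made up for illustration), a skew-aware partitioner can give a known hot key a reducer of its own and hash every other key over the remaining reducers, while still keeping the same-key-same-reducer guarantee:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
        // assumed hot key, purely for illustration
        private static final String HOT_KEY = "the";

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
                if (numPartitions == 1) {
                        return 0;                       // nothing to balance with a single reducer
                }
                if (HOT_KEY.equals(key.toString())) {
                        return numPartitions - 1;       // reserve the last reducer for the hot key
                }
                // hash all other keys over the remaining reducers; the mapping is
                // deterministic, so identical keys always reach the same reducer
                return (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
        }
}

It would be registered the same way, with job.setPartitionerClass(SkewAwarePartitioner.class).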

 