Hadoop中Partition解析

转载 xwdreamer   2013-11-19 16:45  

Map的结果,会通过partition分发到 Reducer 上, Reducer 做完Reduce操作后,通过 OutputFormat ,进行输出,下面我们就来分析参与这个过程的类。

Mapper 的结果,可能送到 Combiner 做合并, Combiner 在系统中并没有自己的基类,而是用 Reducer 作为 Combiner 的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。 Mapper 最终处理的键值对<key, value>,是需要送到 Reducer 去合并的,合并的时候,有相同key的键/值对会送到同一个 Reducer 那。哪个key到哪个 Reducer 的分配过程,是由 Partitioner 规定的。它只有一个方法,

1 getPartition(Text key, Text value, int numPartitions)

输入是Map的结果对<key, value>和 Reducer 的数目,输出则是分配的 Reducer (整数编号)。就是指定 Mapper 输出的键值对到哪一个 Reducer 上去。系统缺省的 PartitionerHashPartitioner ,它以key的Hash值对 Reducer 的数目取模,得到对应的 Reducer 。这样保证如果有相同的key值,肯定被分配到同一个 Reducer 上。如果有N个 Reducer ,编号就为0,1,2,3……(N-1)。

Reducer 是所有用户定制 Reducer 类的基类,和 Mapper 类似,它也有 setupreducecleanuprun 方法,其中 setupcleanup 含义和 Mapper 相同, reduce 是真正合并 Mapper 结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括 Reducer 的上下文。系统中定义了两个非常简单的 ReducerIntSumReducerLongSumReducer ,分别用于对整形/长整型的value求和。

Reduce的结果,通过 Reducer.Context 的方法 collect 输出到文件中,和输入类似,Hadoop引入了 OutputFormatOutputFormat 依赖两个辅助接口: RecordWriterOutputCommitter 来处理输出。 RecordWriter 提供了 write 方法,用于输出<key, value>,和 close 方法,用于关闭对应的输出。 OutputCommitter 提供了一系列方法,用户通过实现这些方法,可以定制 OutputFormat 生存期某些阶段需要的特殊操作。我们在 TaskInputOutputContext 中讨论过这些方法(明显, TaskInputOutputContextOutputFormatReducer 间的桥梁)。 OutputFormatRecordWriter 分别对应着 InputFormatRecordReader ,系统提供了空输出 NullOutputFormat (什么结果都不输出, NullOutputFormat.RecordWriter 只是示例,系统中没有定义), LazyOutputFormat (没在类图中出现,不分析), FilterOutputFormat (不分析)和基于文件 FileOutputFormatSequenceFileOutputFormatTextOutputFormat 输出。

基于文件的输出 FileOutputFormat 利用了一些配置项配合工作,包括:

  • mapred.output.compress :是否压缩;
  • mapred.output.compression.codec :压缩方法;
  • mapred.output.dir :输出路径;
  • mapred.work.output.dir :输出工作路径。

FileOutputFormat 还依赖于 FileOutputCommitter ,通过 FileOutputCommitter 提供一些和 JobTask 相关的临时文件管理功能。如 FileOutputCommittersetupJob ,会在输出路径下创建一个名为_temporary的临时目录, cleanupJob 则会删除这个目录。

SequenceFileOutputFormat 输出和 TextOutputFormat 输出分别对应输入的 SequenceFileInputFormatTextInputFormat

  1 package org.apache.hadoop.examples;
  2 
  3 import java.io.IOException;
  4 import java.util.*;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.conf.*;
  7 import org.apache.hadoop.io.*;
  8 import org.apache.hadoop.mapred.*;
  9 import org.apache.hadoop.util.*;
 10 
 11 /**
 12  * 输入文本,以tab间隔
 13  * kaka    1       28
 14  * hua     0       26
 15  * chao    1
 16  * tao     1       22
 17  * mao     0       29      22
 18  * */
 19 
 20 //Partitioner函数的使用
 21 
 22 public class MyPartitioner {
 23     // Map函数
 24     public static class MyMap extends MapReduceBase implements
 25             Mapper<LongWritable, Text, Text, Text> {
 26         public void map(LongWritable key, Text value,
 27                 OutputCollector<Text, Text> output, Reporter reporter)
 28                 throws IOException {
 29             String[] arr_value = value.toString().split("\t");
 30             //测试输出
 31 //          for(int i=0;i<arr_value.length;i++)
 32 //          {
 33 //              System.out.print(arr_value[i]+"\t");
 34 //          }
 35 //          System.out.print(arr_value.length);
 36 //          System.out.println();
 37             Text word1 = new Text();
 38             Text word2 = new Text();
 39             if (arr_value.length > 3) {
 40                 word1.set("long");
 41                 word2.set(value);
 42             } else if (arr_value.length < 3) {
 43                 word1.set("short");
 44                 word2.set(value);
 45             } else {
 46                 word1.set("right");
 47                 word2.set(value);
 48             }
 49             output.collect(word1, word2);
 50         }
 51     }
 52 
 53     public static class MyReduce extends MapReduceBase implements
 54             Reducer<Text, Text, Text, Text> {
 55         public void reduce(Text key, Iterator<Text> values,
 56                 OutputCollector<Text, Text> output, Reporter reporter)
 57                 throws IOException {
 58             int sum = 0;
 59             System.out.println(key);
 60             while (values.hasNext()) {
 61                 output.collect(key, new Text(values.next().getBytes()));
 62             }
 63         }
 64     }
 65 
 66     // 接口Partitioner继承JobConfigurable,所以这里有两个override方法
 67     public static class MyPartitionerPar implements Partitioner<Text, Text> {
 68         /**
 69          * getPartition()方法的
 70          * 输入参数:键/值对<key,value>与reducer数量numPartitions
 71          * 输出参数:分配的Reducer编号,这里是result
 72          * */
 73         @Override
 74         public int getPartition(Text key, Text value, int numPartitions) {
 75             // TODO Auto-generated method stub
 76             int result = 0;
 77             System.out.println("numPartitions--" + numPartitions);
 78             if (key.toString().equals("long")) {
 79                 result = 0 % numPartitions;
 80             } else if (key.toString().equals("short")) {
 81                 result = 1 % numPartitions;
 82             } else if (key.toString().equals("right")) {
 83                 result = 2 % numPartitions;
 84             }
 85             System.out.println("result--" + result);
 86             return result;
 87         }
 88 
 89         @Override
 90         public void configure(JobConf arg0)
 91         {
 92             // TODO Auto-generated method stub
 93         }
 94     }
 95 
 96     //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner
 97     public static void main(String[] args) throws Exception {
 98         JobConf conf = new JobConf(MyPartitioner.class);
 99         conf.setJobName("MyPartitioner");
100 
101         //控制reducer数量,因为要分3个区,所以这里设定了3个reducer
102         conf.setNumReduceTasks(3);
103 
104         conf.setMapOutputKeyClass(Text.class);
105         conf.setMapOutputValueClass(Text.class);
106 
107         //设定分区类
108         conf.setPartitionerClass(MyPartitionerPar.class);
109 
110         conf.setOutputKeyClass(Text.class);
111         conf.setOutputValueClass(Text.class);
112 
113         //设定mapper和reducer类
114         conf.setMapperClass(MyMap.class);
115         conf.setReducerClass(MyReduce.class);
116 
117         conf.setInputFormat(TextInputFormat.class);
118         conf.setOutputFormat(TextOutputFormat.class);
119 
120         FileInputFormat.setInputPaths(conf, new Path(args[0]));
121         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
122 
123         JobClient.runJob(conf);
124     }
125 }

http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html

http://www.yeolar.com/note/2013/11/19/hadoop-partition/

http://www.yeolar.com/note/2013/11/19/hadoop-partition/