使用 MapReuce 实现二次排序

分享 青牛 ⋅ 于 2016-12-18 17:41:52 ⋅ 最后回复由 青牛 2016-12-25 02:20:12 ⋅ 4195 阅读

二次排序基本原理

二次排序的需求是将全量数据分组,然后在组内进行排序,比如我们要实现全国每个省各个城市的雾霾指数由高到低的排序。这就是一个二次排序的需求,在省内对各城市排序,所以排序划分数据的时候要按照省去划分,同组数据再按照城市的雾霾指数排序,这样就实现了二次排序。

用mapreduce去实现的话就需要自定义一个排序key,有两个两个属性 省 和城市的雾霾指数。正常排序按照省和雾霾指数两个字段排序,但是分发数据的时候定制一个partitioner,按照省字段分发数据,而不是使用默认的划分策略(key的hashcode对reducer个数取余)。

代码实现

自定义排序Key类型:SortKeyWritable.java

package com.hainiubl.hadoop.demo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class SortKeyWritable implements WritableComparable<SortKeyWritable> {
    private Text first;  //存放省标识
    private LongWritable sencond; //存放省城市雾霾指数

    public SortKeyWritable(){
        this(new Text(),new LongWritable());
    }
    public SortKeyWritable(Text first, LongWritable sencond) {
        this.first = first;
        this.sencond = sencond;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        sencond.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        sencond.readFields(in);     
    }

  //倒序
    @Override
    public int compareTo(SortKeyWritable o) {
        int cmp = this.first.compareTo(o.first);
        if(cmp == 0){
            return -this.sencond.compareTo(o.sencond);
        }
        return -cmp;
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + ((sencond == null) ? 0 : sencond.hashCode());
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (!(obj instanceof SortKeyWritable)) {
            return false;
        }
        SortKeyWritable other = (SortKeyWritable) obj;
        if (first == null) {
            if (other.first != null) {
                return false;
            }
        } else if (!first.equals(other.first)) {
            return false;
        }
        if (sencond == null) {
            if (other.sencond != null) {
                return false;
            }
        } else if (!sencond.equals(other.sencond)) {
            return false;
        }
        return true;
    }
    public Text getFirst() {
        return first;
    }
    public void setFirst(Text first) {
        this.first = first;
    }
    public LongWritable getSencond() {
        return sencond;
    }
    public void setSencond(LongWritable sencond) {
        this.sencond = sencond;
    }

}

二次排序mapreuce job类:

package com.hainiubl.hadoop.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortJob extends Configured implements Tool {
    public static class SortMapper extends Mapper<LongWritable,Text,SortKeyWritable,Text>{

        private Text province = new Text();
        private LongWritable num = new LongWritable();
        private SortKeyWritable sortKey = new SortKeyWritable();

        //数据格式
        // 省\t城市\t雾霾指数
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, SortKeyWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String[] fields = StringUtils.split(value.toString(),'\t');
            if(fields.length < 3){
                return;
            }
            province.set(fields[0]);
            num.set(Long.parseLong(fields[2]));
            sortKey.setFirst(province);
            sortKey.setSencond(num);

            context.write(sortKey, value);
        }

    }

    public static class SortReducer extends Reducer<SortKeyWritable,Text,Text,NullWritable>{

        @Override
        protected void reduce(SortKeyWritable arg0, Iterable<Text> values,
                Reducer<SortKeyWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for(Text v : values){
                context.write(v, NullWritable.get());
            }
        }

    }
    public static class SecondaryPartitioner extends Partitioner<SortKeyWritable, Text>{

        @Override
        public int getPartition(SortKeyWritable key, Text value, int numPartitions) {
            // TODO Auto-generated method stub
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }

    }

    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        if(args.length < 2){
            System.out.println("<input dir> <output dir>");
            return 2;
        }
        Job job = Job.getInstance(getConf(), "SecondarySortJob");
        job.setJarByClass(SecondarySortJob.class);
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortKeyWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(SecondaryPartitioner.class);
        job.setNumReduceTasks(2);//两个reducer验证

        Path outPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outPath);

        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(outPath)){
            fs.delete(outPath, true);
            System.out.println("delete : " + outPath.getName());
        }

        return job.waitForCompletion(true) ? 0 :1 ;
    }

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        int res = ToolRunner.run(new SecondarySortJob(), args);
        System.exit(res);
        System.out.println("done");

    }

}
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-青牛,http://hainiubl.com/topics/21
本帖由 青牛 于 6年前 解除加精
回复数量: 1
  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2016-12-25 02:20:12

    使用好二次排序,可以少执行一个mr步骤。会节省很多时间的

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter