排序、分组 的问题看不懂

问答 漂泊 ⋅ 于 2017-12-18 18:05:18 ⋅ 最后回复由 青牛 2017-12-18 21:39:40 ⋅ 4135 阅读
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
    static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
    public static void main(String[] args) throws Exception{
        final Configuration configuration = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if(fileSystem.exists(new Path(OUT_PATH))){
            fileSystem.delete(new Path(OUT_PATH), true);
        }
        final Job job = new Job(configuration, SortApp.class.getSimpleName());
        //1.1 指定输入文件路径
        FileInputFormat.setInputPaths(job, INPUT_PATH);        
        job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件

        //1.2指定自定义的Mapper类
        job.setMapperClass(MyMapper.class);        
        job.setMapOutputKeyClass(NewK2.class);//指定输出<k2,v2>的类型
        job.setMapOutputValueClass(LongWritable.class);

        //1.3 指定分区类
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        //2.2 指定自定义的reduce类
        job.setReducerClass(MyReducer.class);        
        job.setOutputKeyClass(LongWritable.class);//指定输出<k3,v3>的类型
        job.setOutputValueClass(LongWritable.class);

        //2.3 指定输出到哪里
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
        job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类
        job.waitForCompletion(true);//把代码提交给JobTracker执行
    }

    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
        protected void map(LongWritable key, Text value,Context context) throws IOException ,InterruptedException {
            final String[] splited = value.toString().split("\t");
            final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
            final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
            context.write(k2, v2);
        };
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
        protected void reduce(NewK2 k2,Iterable<LongWritable> v2s,Context context) throws IOException ,InterruptedException {
            context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        };
    }

    static class  NewK2 implements WritableComparable<NewK2>{
        Long first;
        Long second;

        public NewK2(){}

        public NewK2(long first, long second){
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        /**
         * 当k2进行排序时,会调用该方法.
         * 当第一列不同时,升序;当第一列相同时,第二列升序
         */
        @Override
        public int compareTo(NewK2 o) {
            final long minus = this.first - o.first;
            if(minus !=0){
                return (int)minus;
            }
            return (int)(this.second - o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode()+this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if(!(obj instanceof NewK2)){
                return false;
            }
            NewK2 oK2 = (NewK2)obj;
            return (this.first==oK2.first)&&(this.second==oK2.second);
        }
    }

}

~~
~~
首先按照第一列升序排列,当第一列相同时,第二列升序排列
3 3
3 2
3 1
2 2
2 1
1 1

结果

1 1
2 1
2 2
3 1
3 2
3 3
~~
CompareTo 这个方法看不懂,为啥返回minus 就升序了 ?

回复数量: 2
  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2017-12-18 20:54:35

    这个CompareTo不是决定是否是升序的,它是用于比较对象的,你这里是用于二次排序的比较,至于为什么升序,shuffle到reducer过程默认就是升序的。那问题来了升降序是由什么决定的呢?可以自己实现一个比较器来决定(不实现也会默认有一个,就是默认升序那个)到reducer中的key是升序还是降序。

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2017-12-18 21:39:40

    你的二次排序少点东西,你可以参考这里
    http://hainiubl.com/topics/97?

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter