Hadoop Phase: Complete Notes


1. Code repository (Gitee): https://gitee.com/alibabaMapengfei/bigdata.git
2. The project uses a hand-built thread pool; creating Executors-based pools on the fly can leak thread pools (a minimal sketch of the idea follows below).
If you need the Markdown version of these notes, leave a comment and I will send it to you.
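
A minimal sketch of the custom-pool idea mentioned above (not taken from the repository; pool sizes are illustrative): build one bounded ThreadPoolExecutor and reuse it, instead of creating a new Executors pool per task, whose non-daemon worker threads are never shut down.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolHolder {
    // One bounded, reusable pool; callers submit to POOL instead of calling Executors.newFixedThreadPool() each time.
    public static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(
            4, 8,                              // core and maximum threads (illustrative values)
            60L, TimeUnit.SECONDS,             // reclaim idle threads above the core size
            new ArrayBlockingQueue<>(100),     // bounded work queue
            new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure instead of unbounded growth
}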

MapReduce (Video 18)

1. Local Hadoop environment (Windows)

#1. Point Hadoop at the JDK
edit etc\hadoop\hadoop-env.cmd and set JAVA_HOME
#2. Configure the Hadoop environment variables
under the system environment variables:
HADOOP_HOME = path of the extracted local Hadoop directory
add %HADOOP_HOME%\bin and %HADOOP_HOME%\sbin to PATH
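
For reference, a minimal sketch of those two settings (paths are placeholders, adjust to your install):

:: etc\hadoop\hadoop-env.cmd
set JAVA_HOME=C:\path\to\jdk1.8

:: System environment variables
HADOOP_HOME=C:\path\to\hadoop-x.y.z
PATH=...;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin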

Hadoop data types

| Java type | Hadoop Writable type |
| --------- | -------------------- |
| boolean   | BooleanWritable      |
| byte      | ByteWritable         |
| int       | IntWritable          |
| float     | FloatWritable        |
| long      | LongWritable         |
| double    | DoubleWritable       |
| String    | Text                 |
| map       | MapWritable          |
| array     | ArrayWritable        |
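
A quick, stand-alone illustration (not part of any job in these notes) of how these wrapper types behave: they are mutable, so the mappers and reducers below create them once as fields and overwrite them with set() for every record.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        Text word = new Text();
        word.set("hadoop");                     // reuse the same Text instance per record
        LongWritable one = new LongWritable(1); // boxed long for MapReduce I/O
        IntWritable count = new IntWritable(42);
        System.out.println(word + "\t" + one.get() + "\t" + count.get());
    }
}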

The first WordCount program

package org.prac.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @Author ALIENWARE
 * @Date 2021/2/22 20:35
 * @Version 1.0
 */
public class MapReduce01 extends Configured implements Tool {

    // field separator of the input data
    private static final String SPRLIT_STR = "\t";

    // Map phase
    // InputFormat hands in (byte offset of the line, text of the line), e.g. "a a b b c" -> (a,1) (a,1) (b,1) (b,1) (c,1)
    private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{

        // variables reused across map() calls
        private Text outkey = new Text();                     // key sent from map to reduce
        private LongWritable outvalue = new LongWritable(1);  // value sent from map to reduce
        private String[] strs = null;                         // fields of the current line

        // override the map method

        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException
        {
            // do not allocate objects inside map(): the framework calls it once per line, so per-call allocation wastes resources
            //1. split the line; value is the text just read
            strs =  value.toString().split(SPRLIT_STR);
            //2. iterate over the words
            for (String s : strs) {
                outkey.set(s);
                outvalue.set(1);
                context.write(outkey, outvalue);
            }
        }
    }

    // Reduce phase

    /**
     * a 1 a 1 b 1 b 1 c 1
     * a 1 a 1 b 1 b 1 c 1
     * map --> reduce: the shuffle groups identical keys into a value list,
     * and reduce sums that list
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {
        // variables reused across reduce() calls
        private LongWritable outval = new LongWritable();
        private Long sum = 0L; // running total of the counts sent by map

        /**
         * @param values the grouped list of values for this key
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void reduce(Text outkey, Iterable<LongWritable> values,
                              Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

            //1. reset the accumulator
            sum = 0L;
            //2. add up the values
            for (LongWritable value : values) {
                sum += value.get();
            }
            //3. store the total
            outval.set(sum);
            //4. emit the result
            context.write(outkey, outval);
        }
    }

    // the driver: configure and run the job

    @Override
    public int run(String[] args)
    {
        try {
        // get the configuration that ToolRunner already loaded
        Configuration conf = this.getConf();
        // create this job
        Job job = Job.getInstance(conf);
        // a job always needs three groups of settings

        //1. class settings: setJarByClass points at the class that holds main()
        job.setJarByClass(MapReduce01.class);
        // input format class
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);// set the mapper
        // if the map and reduce output types are identical, one pair of output settings is enough
        //job.setMapOutputKeyClass(Text.class);          // map output key
        //job.setMapOutputValueClass(LongWritable.class); // map output value
        job.setReducerClass(MyReducer.class);// set the reducer
        job.setOutputKeyClass(Text.class);// reduce output key
        job.setOutputValueClass(LongWritable.class);// reduce output value
        job.setOutputFormatClass(TextOutputFormat.class);// output format
        //2. path settings
        // input path
        //FileInputFormat.addInputPath(job,new Path(args[0]));
        // the output path must not already exist
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(out)){
            fs.delete(out,true );
            System.out.println(job.getJobName() + "'s output dir was deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job,out);
        // 3. run the job
        long start = System.currentTimeMillis();
        boolean cons = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "job status: " + (cons? "SUCCESS!":"FAIL!");
        System.out.println(msg);
        System.out.println(Math.abs(end-start)/1000+" seconds!");

        }catch (Exception e){
            e.printStackTrace();
        }
        return 0;
    }

    // launch the MapReduce job
    /**
     * How the job starts:
     * 1. ToolRunner.run calls tool.getConf() to fetch the Tool's Configuration
     * 2. extending Configured supplies that Configuration object and loads the Hadoop config files
     * 3. ToolRunner.run then drives the MapReduce execution and parses the generic parameters
     */
    public static void main(String[] args)
    {
        try {
            System.exit(ToolRunner.run(new MapReduce01(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//Written in IDEA: set the program arguments under Run -> Edit Configurations
//e.g. E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT

MapReduce optimization

Using counters

Counters are used to measure input data quality (how many lines are well-formed vs. malformed).
package org.prac.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce optimization, part 1
 * @desc using counters to count the records that meet (or fail) the format requirements
 * @Author ALIENWARE
 * @Date 2021/2/23 20:25
 * @Version 1.0
 */
public class MyWordCount02 extends Configured implements Tool {

    private static final String SPRLIT_STR = "\t";// field separator

    protected static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

        // variables reused across map() calls
        private Text outkey = new Text();
        private LongWritable outval = new LongWritable(1);
        private String[] strs = null; // fields of the current line

        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException
        {
            try{
            // split the line into fields
            strs = value.toString().split(SPRLIT_STR);
            // count every line with a counter
            context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
            // decide whether the line is good or bad
            if(null != strs && strs.length == 3){
                // good line
                context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
                outkey.set(strs[2].trim());
                // emit
                context.write(outkey, outval);
            }else{
                // bad line
                context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
            }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

        // variables reused across reduce() calls
        private LongWritable outval = new LongWritable();
        private long sum = 0L;

        @Override
        protected void reduce(Text outkey, Iterable<LongWritable> values,
                              Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

            // reset the accumulator
            sum = 0;
            // add up the values
            for (LongWritable l : values) {
                sum+=l.get();
            }
            // store the total
            outval.set(sum);
            // emit
            context.write(outkey, outval);
        }
    }

    @Override
    public int run(String[] args)
    {
        try {
            // get the configuration that ToolRunner already loaded
            Configuration conf = this.getConf();
            // create this job
            Job job = Job.getInstance(conf);
            // a job always needs three groups of settings

            //1. class settings: setJarByClass points at the class that holds main()
            job.setJarByClass(MyWordCount02.class);
            // input format class
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(MyMapper.class);// set the mapper
            // if the map and reduce output types are identical, one pair of output settings is enough
            //job.setMapOutputKeyClass(Text.class);          // map output key
            //job.setMapOutputValueClass(LongWritable.class); // map output value
            job.setReducerClass(MyReducer.class);// set the reducer
            job.setOutputKeyClass(Text.class);// reduce output key
            job.setOutputValueClass(LongWritable.class);// reduce output value
            job.setOutputFormatClass(TextOutputFormat.class);// output format
            //2. path settings
            // input path
            //FileInputFormat.addInputPath(job,new Path(args[0]));
            // the output path must not already exist
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(out)) {
                fs.delete(out, true);
                System.out.println(job.getJobName() + "'s output dir was deleted!");
            }
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            // 3. run the job
            long start = System.currentTimeMillis();
            boolean cons = job.waitForCompletion(true);
            long end = System.currentTimeMillis();
            String msg = "job status: " + (cons ? "SUCCESS!" : "FAIL!");

            // print the counters here when no log4j output is configured
            if (cons) {
                // each map/reduce task keeps its own counters; after the job finishes the ApplicationMaster aggregates them
                Counters counters = job.getCounters();
                System.out.println(job.getJobName() + "'s counters count : " + counters.countCounters());
                for (CounterGroup counter : counters) {
                    System.out.println("\t"+ counter.getDisplayName());
                    for (Counter counter1 : counter) {
                        System.out.println("\t\t"+counter1.getDisplayName() + "=" + counter1.getValue());
                    }
                }
            }

            System.out.println(msg);
            System.out.println(Math.abs(end-start)/1000+" seconds!");

        }catch (Exception e){
            e.printStackTrace();
        }
        return 0;
    }

    public static void main(String[] args)
    {
        try {
            System.exit(ToolRunner.run(new MyWordCount02(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Combiner implementation

package org.prac.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @Desc combiner implementation
 * @Author ALIENWARE
 * @Date 2021/2/22 20:35
 * @Version 1.0
 */

/**
 * Note: because a combiner aggregates each map task's output locally, it only suits operations that
 *       stay correct under partial aggregation (max, min, sum, count); it must not be used for
 *       averages (averaging per-split averages gives the wrong global average unless counts are carried along).
 * Note: a combiner is map-side aggregation, so the types must line up:
 *          1. combiner input  = map output
 *          2. combiner output = reduce input
 *          3. reduce input    = combiner output
 *          4. combiner input types = combiner output types = map output types
 */

public class MapCombiner03 extends Configured implements Tool {

    // field separator of the input data
    private static final String SPRLIT_STR = "\t";

    // Map phase
    // InputFormat hands in (byte offset of the line, text of the line), e.g. "a a b b c" -> (a,1) (a,1) (b,1) (b,1) (c,1)
    private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{

        // variables reused across map() calls
        private Text outkey = new Text();                     // key sent from map to reduce
        private LongWritable outvalue = new LongWritable(1);  // value sent from map to reduce
        private String[] strs = null;                         // fields of the current line

        // override the map method

        /**
         *
         * @param key
         * @param value  the text of the line read from the input file
         * @param context
         */
        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException
        {
            // do not allocate objects inside map(): the framework calls it once per line, so per-call allocation wastes resources
            //1. split the line; value is the text just read
            strs =  value.toString().split(SPRLIT_STR);
            //2. iterate over the words
            //outkey is the key handed from map to reduce
            for (String s : strs) {
                outkey.set(s);
                outvalue.set(1);// count 1 for each word read
                context.write(outkey, outvalue);
            }
        }
    }

    //combiner
    public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{

        // variables reused across reduce() calls
        private LongWritable outval = new LongWritable();
        private Long sum = 0L;// running total of the counts sent by map

        @Override
        public void reduce(Text outkey, Iterable<LongWritable> values,
                           Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            //1. reset the accumulator
            sum = 0L;
            //2. add up the values
            for (LongWritable value : values) {
                sum += value.get();
            }
            //3. store the partial total
            outval.set(sum);
            //4. emit; the combiner's output becomes the reducer's input
            context.write(outkey, outval);
        }
    }

    // Reduce phase

    /**
     * a 1 a 1 b 1 b 1 c 1
     * a 1 a 1 b 1 b 1 c 1
     * map --> reduce: the shuffle groups identical keys into a value list,
     * and reduce sums that list
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {
        // variables reused across reduce() calls
        private LongWritable outval = new LongWritable();
        private Long sum = 0L;// running total of the counts sent by map

        /**
         * @param values the grouped list of values for this key
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void reduce(Text outkey, Iterable<LongWritable> values,
                              Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

            //1. reset the accumulator
            sum = 0L;
            //2. add up the values
            for (LongWritable value : values) {
                sum += value.get();
            }
            //3. store the total
            outval.set(sum);
            //4. emit the result
            context.write(outkey, outval);
        }
    }

    // the driver: configure and run the job

    @Override
    public int run(String[] args)
    {
        try {
        // get the configuration that ToolRunner already loaded
        Configuration conf = this.getConf();
        // create this job
        Job job = Job.getInstance(conf);
        // a job always needs three groups of settings

        //1. class settings: setJarByClass points at the class that holds main()
        job.setJarByClass(MapCombiner03.class);
        // input format class
        job.setInputFormatClass(TextInputFormat.class);
        job.setCombinerClass(MyCombiner.class);
        job.setMapperClass(MyMapper.class);// set the mapper
        // if the map and reduce output types are identical, one pair of output settings is enough
        //job.setMapOutputKeyClass(Text.class);          // map output key
        //job.setMapOutputValueClass(LongWritable.class); // map output value
        job.setReducerClass(MyReducer.class);// set the reducer
        job.setOutputKeyClass(Text.class);// reduce output key
        job.setOutputValueClass(LongWritable.class);// reduce output value
        job.setOutputFormatClass(TextOutputFormat.class);// output format
        //2. path settings
        // input path
        //FileInputFormat.addInputPath(job,new Path(args[0]));
        // the output path must not already exist
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(out)){
            fs.delete(out,true );
            System.out.println(job.getJobName() + "'s output dir was deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job,out);
        // 3. run the job
        long start = System.currentTimeMillis();
        boolean cons = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "job status: " + (cons? "SUCCESS!":"FAIL!");
        System.out.println(msg);
        System.out.println(Math.abs(end-start)/1000+" seconds!");

        }catch (Exception e){
            e.printStackTrace();
        }
        return 0;
    }

    // launch the MapReduce job
    /**
     * How the job starts:
     * 1. ToolRunner.run calls tool.getConf() to fetch the Tool's Configuration
     * 2. extending Configured supplies that Configuration object and loads the Hadoop config files
     * 3. ToolRunner.run then drives the MapReduce execution and parses the generic parameters
     */
    public static void main(String[] args)
    {
        try {
            System.exit(ToolRunner.run(new MapCombiner03(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//Written in IDEA: set the program arguments under Run -> Edit Configurations
//e.g. E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT

Partitioner application (four partitions)

package org.prac.mapreduce;

import entity.StudentWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Partitioner example: the partitioner returns one of four partition ids, feeding 4 reducers.
 * The job starts 4 reduce tasks; map output goes through the partitioner and is then handled
 * by the matching reducer.
 */

public class MapPartitioner04 extends Configured implements Tool {

    // field separator of the input data
    private static final String SPLIT_STR1 = "\t";

    // Map phase
    // InputFormat hands in (byte offset of the line, text of the line)
    private static class MyMapper extends Mapper<LongWritable, Text, StudentWritable, NullWritable>{

        // variables reused across map() calls
        private StudentWritable outkey = new StudentWritable();
        private String[] strs = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, StudentWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {

            // split the line into fields
            strs = value.toString().split(SPLIT_STR1);
            // counter demo
            context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
            // sample line: 1    盖谦  2001-01-14  11480630    1   东城区第1中学 1   东城区 540
            // decide whether the line is good or bad
            if(null != strs && strs.length == 9){
                // good line
                context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
                outkey.setExamNo(strs[4].trim());
                outkey.setStudentName(strs[1].trim());
                outkey.setBirthday(strs[2].trim());
                outkey.setSchool(strs[5].trim());
                outkey.setAreaName(strs[7].trim());
                outkey.setScore(Integer.parseInt(strs[8].trim()));
                context.write(outkey, NullWritable.get());
            }else{
                // bad line
                context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
            }

        }

    }

    //partitioner
    private static class MyPartitioner extends Partitioner<StudentWritable, NullWritable> {

        // getPartition returns the id of the reducer that will receive this record
        @Override
        public int getPartition(StudentWritable key, NullWritable value, int numPartitions) {

            // tier 1: score 550 and above (inclusive)
            if(key.getScore() >= 550){
                return 0;
            }
            if(key.getScore() >=450 && key.getScore() < 550){
                return 1;
            }
            if(key.getScore() >= 250 && key.getScore() < 450){
                return 2;
            }
            return 3;
        }

    }

    // the driver: configure and run the job

    @Override
    public int run(String[] args)
    {
        try {
        // get the configuration that ToolRunner already loaded
        Configuration conf = this.getConf();
        // create this job
        Job job = Job.getInstance(conf);
        // a job always needs three groups of settings

        //1. class settings: setJarByClass points at the class that holds main()
        job.setJarByClass(MapPartitioner04.class);
        // input format class
        job.setInputFormatClass(TextInputFormat.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);
        job.setMapperClass(MyMapper.class);// set the mapper
        job.setOutputKeyClass(StudentWritable.class);// output key
        job.setOutputValueClass(NullWritable.class);// output value
        job.setOutputFormatClass(TextOutputFormat.class);// output format
        //2. path settings
        // input path
        //FileInputFormat.addInputPath(job,new Path(args[0]));
        // the output path must not already exist
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(out)){
            fs.delete(out,true );
            System.out.println(job.getJobName() + "'s output dir was deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job,out);
        // 3. run the job
        long start = System.currentTimeMillis();
        boolean cons = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "job status: " + (cons? "SUCCESS!":"FAIL!");
        System.out.println(msg);
        System.out.println(Math.abs(end-start)/1000+" seconds!");

        }catch (Exception e){
            e.printStackTrace();
        }
        return 0;
    }

    // launch the MapReduce job
    /**
     * How the job starts:
     * 1. ToolRunner.run calls tool.getConf() to fetch the Tool's Configuration
     * 2. extending Configured supplies that Configuration object and loads the Hadoop config files
     * 3. ToolRunner.run then drives the MapReduce execution and parses the generic parameters
     */
    public static void main(String[] args)
    {
        try {
            System.exit(ToolRunner.run(new MapPartitioner04(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//Enabling compression of the reduce output (three ways to set a parameter)
// -Dmapreduce.output.fileoutputformat.compress=true
// -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec <input path/file> <output path>
/*There are three ways to set a parameter:
        1. -D on the command line: applies only to this job run (middle scope)
        2. conf.set(key, val): applies to this specific MapReduce program (narrowest scope)
        3. the *-site.xml config files (widest scope)
They are loaded in this order:
    the xml files are loaded first,
    -D overrides xml properties of the same name,
    and conf.set overrides -D properties of the same name.*/
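
A small sketch of option 2 (conf.set, plus the equivalent FileOutputFormat helpers); the helper class and method name here are illustrative, and GzipCodec is just one possible codec.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CompressionConfigDemo {
    // Builds a Job whose reduce output is gzip-compressed; call this from a driver's run() method.
    public static Job newCompressedJob(Configuration conf) throws IOException {
        // option 2: set the properties directly on the Configuration (narrowest scope)
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                 "org.apache.hadoop.io.compress.GzipCodec");
        Job job = Job.getInstance(conf);
        // equivalent helpers on the job object:
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
    }
}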

MapReduce de-duplication

package org.prac.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce de-duplication: put the data to de-duplicate on the MapReduce key and emit NullWritable as the value,
 * so the shuffle collapses duplicate keys automatically.
 */

public class MapReduceDistinct05 extends Configured implements Tool {

    // field separator of the input data
    private static final String SPRLIT_STR = "\t";

    private static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

        // variables reused across map() calls
        private Text outkey = new Text();
        private String[] strs = null;
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {

            // split the line into fields
            strs = value.toString().split(SPRLIT_STR);

            // emit each field as a key
            for (String s : strs) {
                outkey.set(s);
                context.write(outkey, NullWritable.get());
            }
        }
    }

    // Reduce phase
    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable>
    {
        // nothing to accumulate: writing each key once with a NullWritable value removes the duplicates

        @Override
        public void reduce(Text outkey, Iterable<NullWritable> values,
                              Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {

            context.write(outkey, NullWritable.get());
        }
    }

    // the driver: configure and run the job
    @Override
    public int run(String[] args)
    {
        try {
        // get the configuration that ToolRunner already loaded
        Configuration conf = this.getConf();
        // create this job
        Job job = Job.getInstance(conf,"distinct");
        // a job always needs three groups of settings

        //1. class settings: setJarByClass points at the class that holds main()
        job.setJarByClass(MapReduceDistinct05.class);
        // input format class
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);// set the mapper
        job.setReducerClass(MyReducer.class);// set the reducer
        job.setOutputKeyClass(Text.class);// reduce output key
        job.setOutputValueClass(NullWritable.class);// reduce output value
        job.setOutputFormatClass(TextOutputFormat.class);// output format
        //2. path settings
        // input path
        //FileInputFormat.addInputPath(job,new Path(args[0]));
        // the output path must not already exist
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(out)){
            fs.delete(out,true );
            System.out.println(job.getJobName() + "'s output dir was deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job,out);
        // 3. run the job
        long start = System.currentTimeMillis();
        boolean cons = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "job status: " + (cons? "SUCCESS!":"FAIL!");
        System.out.println(msg);
        System.out.println(Math.abs(end-start)/1000+" seconds!");

        }catch (Exception e){
            e.printStackTrace();
        }
        return 0;
    }

    // launch the MapReduce job
    /**
     * How the job starts:
     * 1. ToolRunner.run calls tool.getConf() to fetch the Tool's Configuration
     * 2. extending Configured supplies that Configuration object and loads the Hadoop config files
     * 3. ToolRunner.run then drives the MapReduce execution and parses the generic parameters
     */
    public static void main(String[] args)
    {
        try {
            System.exit(ToolRunner.run(new MapReduceDistinct05(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//Written in IDEA: set the program arguments under Run -> Edit Configurations
//e.g. E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT

MapReduce max/min (using a combiner to find the maximum and minimum)

/**
1. map reads the data; the map key is a constant "x" so everything lands in one group, and the data goes into the value
2. the combiner finds the per-map max and min
3. reduce outputs the global max and min
*/

package org.prac.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *  MapReduce max/min implementation
 */

public class MapReduceManMin06 extends Configured implements Tool {

    private static final String SPLIT_STR1 = "\t";
    private static final String SPLIT_STR2 = "\001";

    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // variables reused across map() calls; the constant "x" is the single key map sends to reduce
        private Text outkey = new Text("x");
        private Text outval = new Text();
        private String[] strs = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            // split the line into fields
            strs = value.toString().split(SPLIT_STR1);

            // sample line: 2018-01-01 001616528 236701 强力VC银翘片 6.0 82.8 69.0
            context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
            if (null != strs && strs.length == 7) {
                context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
                // keep fields 3 (drug name) and 6 (amount)
                outval.set(strs[3].trim() + SPLIT_STR2 + strs[6].trim());
                context.write(outkey, outval);

            } else {
                context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
            }
        }

    }

    private static class MyCombiner extends Reducer<Text, Text, Text, Text> {

        private Text outval = new Text();

        // running max/min trackers
        private String current_key = "";
        private Double current_val = 0D;
        private String max_key = "";
        private Double max_val = 0D;
        private String min_key = "";
        private Double min_val = 0D;

        @Override
        protected void reduce(Text outkey, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            // scan the values
            for (Text t : values) {

                current_key = t.toString().split(SPLIT_STR2)[0].trim();
                current_val = Double.parseDouble(t.toString().split(SPLIT_STR2)[1].trim());

                // update the running max/min
                if (current_val >= max_val) {
                    max_val = current_val;
                    max_key = current_key;
                }

                if (current_val <= min_val || min_val == 0) {
                    min_val = current_val;
                    min_key = current_key;
                }
            }

            outval.set(max_key + SPLIT_STR2 + max_val);
            context.write(outkey, outval);
            outval.set(min_key + SPLIT_STR2 + min_val);
            context.write(outkey, outval);

        }

    }

    private static class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> {

        private Text outkey = new Text();
        private DoubleWritable outval = new DoubleWritable();
        // running max/min trackers
        private String current_key = "";
        private Double current_val = 0D;
        private String max_key = "";
        private Double max_val = 0D;
        private String min_key = "";
        private Double min_val = 0D;

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                              Reducer<Text, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {

            // scan the values
            for (Text t : values) {

                current_key = t.toString().split(SPLIT_STR2)[0].trim();
                current_val = Double.parseDouble(t.toString().split(SPLIT_STR2)[1].trim());

                // update the running max/min
                if (current_val >= max_val) {
                    max_val = current_val;
                    max_key = current_key;
                }

                if (current_val <= min_val || min_val == 0) {
                    min_val = current_val;
                    min_key = current_key;
                }
            }

            outkey.set(max_key);
            outval.set(max_val);
            context.write(outkey, outval);
            //each context.write emits one output record
            outkey.set(min_key);
            outval.set(min_val);
            context.write(outkey, outval);
        }

    }

    @Override
    public int run(String[] args) throws Exception {

        // create this job
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "distinct");

        // configure the job

        // step 1: class settings
        job.setJarByClass(MapReduceManMin06.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(MyCombiner.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // step 2: path settings
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s output dir is deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // step 3: run the job
        long start = System.currentTimeMillis();
        boolean con = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
        System.out.println(msg);
        return 0;
    }

    public static void main(String[] args) {

        try {
            System.exit(ToolRunner.run(new MapReduceManMin06(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
//Written in IDEA: set the program arguments under Run -> Edit Configurations
//e.g. E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT

Writing reduce results to multiple output directories

package org.prac.mapreduce;

import java.io.IOException;

import entity.DrugWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Max/min implemented with a custom Writable (serialization),
 * writing the reduce results to multiple output directories
 */

public class MyMRDemo7 extends Configured implements Tool {

    private static final String SPLIT_STR1 = "\t";

    private static class MyMapper extends Mapper<LongWritable, Text, DrugWritable, NullWritable> {

        // variables reused across map() calls
        private DrugWritable outkey = new DrugWritable();
        private String[] strs = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DrugWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {

            // split the line into fields
            strs = value.toString().split(SPLIT_STR1);

            // sample line: 2018-01-01 001616528 236701 强力VC银翘片 6.0 82.8 69.0
            context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
            if (null != strs && strs.length == 7) {
                context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
                // keep fields 3 (drug name) and 6 (amount)
                outkey.setName(strs[3].trim());
                outkey.setPay(Double.parseDouble(strs[6].trim()));
                context.write(outkey, NullWritable.get());

            } else {
                context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
            }
        }

    }
//combiner
private static class MyCombiner extends Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable> {

    private DrugWritable outkey = new DrugWritable();

    // running max/min trackers
    private String current_key = "";
    private Double current_val = 0D;
    private String max_key = "";
    private Double max_val = 0D;
    private String min_key = "";
    private Double min_val = 0D;

    @Override
    protected void reduce(DrugWritable key, Iterable<NullWritable> values, Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
            throws IOException, InterruptedException {

        current_key = key.getName();
        current_val = key.getPay();

        if(current_val >= max_val){
            max_key = current_key;
            max_val = current_val;
        }
        if(current_val <= min_val || min_val == 0){
            min_val = current_val;
            min_key = current_key;
        }
    }

    @Override
    protected void cleanup(Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
            throws IOException, InterruptedException {
        outkey.setName(max_key);
        outkey.setPay(max_val);
        // emit the running max
        context.write(outkey, NullWritable.get());

        outkey.setName(min_key);
        outkey.setPay(min_val);
        // emit the running min
        context.write(outkey, NullWritable.get());

    }

}

    //reduce
    private static class MyReducer extends Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable> {

        private DrugWritable outkey = new DrugWritable();

        // multiple-output writer
        private MultipleOutputs<DrugWritable,NullWritable> outputs = null;

        @Override
        protected void setup(Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // step 1 of multi-directory output: create the MultipleOutputs instance
            outputs = new MultipleOutputs<>(context);
        }

        // running max/min trackers
        private String current_key = "";
        private Double current_val = 0D;
        private String max_key = "";
        private Double max_val = 0D;
        private String min_key = "";
        private Double min_val = 0D;

        @Override
        protected void reduce(DrugWritable key, Iterable<NullWritable> values, Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {

            current_key = key.getName();
            current_val = key.getPay();

            if(current_val >= max_val){
                max_key = current_key;
                max_val = current_val;
            }
            if(current_val <= min_val || min_val == 0){
                min_val = current_val;
                min_key = current_key;
            }
        }

        //Using cleanup() to emit only a filtered/sorted subset of the results

        /**
         * Background
         * map() handles one line at a time and reduce() handles one key group at a time,
         * and reduce normally writes a result for every group. When only part of the results
         * should be emitted -- for example, the top three words by count in WordCount --
         * the filtering can be done across groups and written out once in cleanup()
         * (a top-3 sketch follows after this class).
         */
        @Override
        protected void cleanup(Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {

            outkey.setName(max_key);
            outkey.setPay(max_val);
            //step 2 of multi-directory output: write to a named sub-path
            outputs.write(outkey, NullWritable.get(), "max/maxval");
            //regular output as well
            context.write(outkey, NullWritable.get());
            outkey.setName(min_key);
            outkey.setPay(min_val);
            //regular output
            context.write(outkey, NullWritable.get());
            // multi-directory output for the min
            outputs.write(outkey, NullWritable.get(), "min/minval");
            // close the MultipleOutputs stream when done, otherwise the output is not flushed and the job reports errors
            if(null != outputs){
                outputs.close();
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // create this job
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "distinct");

        // configure the job
        // step 1: class settings
        job.setJarByClass(MyMRDemo7.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        // 3. use a combiner to pre-aggregate the map output before it reaches reduce;
        //    do not register the reducer class as the combiner here, or multiple result records would be produced
        job.setCombinerClass(MyCombiner.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(DrugWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // step 2: path settings
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            // equivalent to: hadoop fs -rm -r -skipTrash (recursive delete)
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s output dir is deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // step 3: run the job

            boolean con = job.waitForCompletion(true);
            String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
            System.out.println(msg);
        return 0;
    }

    public static void main(String[] args) {

        try {
            System.exit(ToolRunner.run(new MyMRDemo7(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
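
As referenced in the cleanup() note above, a minimal sketch (not from these notes, names are illustrative) of a top-3 WordCount reducer: reduce() only collects totals into an in-memory map, and cleanup() sorts and emits the three largest counts at the end of the task. This assumes a single reduce task; with several reducers each one would emit its own top 3.

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Top3Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // word -> total count, filled in reduce() and emitted in cleanup()
    private final Map<String, Long> totals = new HashMap<>();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) {
        long sum = 0L;
        for (LongWritable v : values) {
            sum += v.get();
        }
        totals.put(key.toString(), sum);   // remember the total instead of writing it out
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit only the three words with the highest counts
        Text outkey = new Text();
        LongWritable outval = new LongWritable();
        totals.entrySet().stream()
              .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
              .limit(3)
              .forEach(e -> {
                  outkey.set(e.getKey());
                  outval.set(e.getValue());
                  try {
                      context.write(outkey, outval);
                  } catch (IOException | InterruptedException ex) {
                      throw new RuntimeException(ex);
                  }
              });
    }
}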

Inner join query

Using the multiple-input setting, two mappers read the two data sets and tag their records; reduce then splits the tagged values on the separator and emits the joined result.

package org.prac.mapreduce;

import java.io.IOException;

import entity.AreaWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Inner join query
 * Two mappers read the two inputs via the multiple-input setting and tag their records;
 * reduce splits the tagged values on the separator and emits the joined result.
 */
public class MyMR7 extends Configured implements Tool {

    private static final String SPLIT_STR1 = "\t";
    private static final String SPLIT_STR2 = "\001";

    // handles the area (region) records
    private static class MyMapper1 extends Mapper<LongWritable, Text, IntWritable, Text> {

        // variables reused across map() calls
        private IntWritable outkey = new IntWritable();
        private Text outval = new Text();
        private String[] strs = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {

            // split the record
            strs = value.toString().split(SPLIT_STR1); // e.g. [1, 北京]

            // defensive check
            if(null != strs && strs.length == 2){

                outkey.set(Integer.parseInt(strs[0].trim()));
                outval.set("a" + SPLIT_STR2 + strs[1].trim());
                // emit
                context.write(outkey, outval);
            }

        }

    }

    private static class MyMapper2 extends Mapper<LongWritable, Text, IntWritable, Text> {

        // variables reused across map() calls
        private IntWritable outkey = new IntWritable();
        private Text outval = new Text();
        private String[] strs = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {

            // split the record
            strs = value.toString().split(SPLIT_STR1); // e.g. [1, 2010, 1900]

            // defensive check
            if(null != strs && strs.length == 3){

                outkey.set(Integer.parseInt(strs[0].trim()));
                outval.set("b" + SPLIT_STR2 + strs[1].trim() + SPLIT_STR1  + strs[2].trim());
                // emit
                context.write(outkey, outval);
            }

        }

    }

    private static class MyReducer extends Reducer<IntWritable, Text, AreaWritable, NullWritable> {

        // variables reused across reduce() calls
        private AreaWritable outkey = new AreaWritable();
        private String tmp = "";
        private String aname = "";
        private String[] strs = null;
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values,
                Reducer<IntWritable, Text, AreaWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {

            // reset the buffer
            tmp = "";

            // concatenate all tagged values for this key
            for (Text t : values) {
                tmp+=t.toString() + ",";
            }

            // 1. only join keys that have both an "a" (area) record and a "b" (employment) record
            if(tmp.indexOf("a") > -1 && tmp.indexOf("b") > -1){
                // extract the area name
                aname = tmp.substring(tmp.indexOf("a"));
                aname = aname.substring(2,aname.indexOf(","));
                strs = tmp.substring(tmp.indexOf("b")).split(",");
                // loop over the tagged employment records
                for (String s : strs) {

                    if(s.startsWith("b")){ // b\0012010\t1900

                        // year and employment count
                        outkey.setAid(key.get());
                        outkey.setAname(aname);
                        outkey.setYear(Integer.parseInt(s.split(SPLIT_STR2)[1].trim().split(SPLIT_STR1)[0].trim()));
                        outkey.setCount(Long.parseLong(s.split(SPLIT_STR2)[1].trim().split(SPLIT_STR1)[1].trim()));
                        context.write(outkey, NullWritable.get());
                    }
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // create this job
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "innerjoin");

        // configure the job

        // step 1: class settings
        job.setJarByClass(MyMR7.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(AreaWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // step 2: path settings
        Path out = new Path(args[args.length-1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            // equivalent to: hadoop fs -rm -r -skipTrash (recursive delete)
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s output dir is deleted!");
        }

        // multiple-input setup
        // handles the area records
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MyMapper1.class);
        // handles the employment-count records
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MyMapper2.class);

        FileOutputFormat.setOutputPath(job, out);
        // step 3: run the job
        long start = System.currentTimeMillis();
        boolean con = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
        System.out.println(msg);
        System.out.println("JOB_COST : " + ((end - start) / 1000) + " SECONDS!");

        return 0;
    }

    public static void main(String[] args) {

        try {
            System.exit(ToolRunner.run(new MyMR7(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

MapReduce data skew

//Skew shows up in inner joins of two large tables, or of a large table with a small table.

//Solution for the large/small case: do the join on the map side.
Use the HDFS distributed cache: the small table is placed in the distributed cache, HDFS ships it to every map task that processes a block of the large table, setup() loads the small table into memory, map() matches each large-table record against it, and the data is filtered and joined entirely on the map side.

Semijoin of a large table with a small table

package org.prac.mapreduce;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import entity.AreaWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Semijoin of a large table with a small table
 * 1. run the IDE/terminal as administrator
 * 2. argument syntax:
 *    -Dmapreduce.job.cache.files=<location of the small table> <job input path> <job output path>
 * 3. the mapper's setup() loads the small table from the distributed cache into a Map
 * 4. map() then joins against it and emits the result
 */
public class MyMRDemo8 extends Configured implements Tool {

    private static class MyMapper extends Mapper<LongWritable, Text, AreaWritable, NullWritable> {

        // the small table is loaded via the HDFS distributed cache: it is shipped to the mapper's local working directory and opened by file name

        // parsed small-table data (file1) kept in an in-memory map
        private static Map<Integer,String> areaMap = new HashMap<Integer,String>(16);
        @Override
        protected void setup(Mapper<LongWritable, Text, AreaWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // location of the cached small table
            URI uri =  context.getCacheFiles()[0];
            // extract the file name
            String fileName = uri.getPath().toString().substring(uri.getPath().toString().lastIndexOf("/")+1);
            // parse file1
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(fileName),"UTF-8"));
            String line = null;
            while((line = reader.readLine()) != null){
                areaMap.put(Integer.parseInt(line.split("\t")[0].trim()),line.split("\t")[1].trim());
            }

            // close the reader
            if(null != reader){
                reader.close();
            }

        }

        private String[] strs = null;
        private String aname = null;
        private AreaWritable outkey = new AreaWritable();
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, AreaWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // split the record
            strs = value.toString().split("\t");
            // defensive check
            if(null != strs && strs.length == 3){

                aname = areaMap.get(Integer.parseInt(strs[0].trim()));

                if(null != aname && !"".equals(aname.trim())){
                    // matched: emit the joined record
                    outkey.setAid(Integer.parseInt(strs[0].trim()));
                    outkey.setAname(aname);
                    outkey.setYear(Integer.parseInt(strs[1].trim()));
                    outkey.setCount(Long.parseLong(strs[2].trim()));
                    context.write(outkey, NullWritable.get());
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // get the loaded configuration
        Configuration conf = this.getConf();

        // create this job
        Job job = Job.getInstance(conf, "semijoin");

        // class settings
        job.setJarByClass(MyMRDemo8.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(AreaWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // step 2: path settings
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            // equivalent to: hadoop fs -rm -r -skipTrash (recursive delete)
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s output dir is deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // step 3: run the job
        long start = System.currentTimeMillis();
        boolean con = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
        System.out.println(msg);
        System.out.println("JOB_COST : " + ((end - start) / 1000) + " SECONDS!");

        return 0;
    }

    public static void main(String[] args) {
        try {
            System.exit(ToolRunner.run(new MyMRDemo8(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
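
If passing -Dmapreduce.job.cache.files is inconvenient, the cache file can also be registered in the driver with Job.addCacheFile. A minimal sketch (the helper class name and the HDFS path are placeholders):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheFileSetup {
    // Registers the small table with the distributed cache; equivalent to
    // passing -Dmapreduce.job.cache.files=<path> on the command line.
    public static Job newSemijoinJob(Configuration conf, String smallTablePath) throws Exception {
        Job job = Job.getInstance(conf, "semijoin");
        job.addCacheFile(new URI(smallTablePath)); // e.g. "hdfs:///user/notes/area/file1" (placeholder)
        return job;
    }
}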

Sorted query with a single reducer -- ascending order (simple key type)

package org.prac.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import util.MyIntComparator;

/**
 * Sorted query with a single reducer: with one reduce task, the shuffle's key sort
 * already delivers the keys in ascending order.
 */
public class MyMRDemo9 extends Configured implements Tool {

    private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text>{

        private IntWritable outkey = new IntWritable();
        private Text outval = new Text();
        private String[] strs = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            strs = value.toString().split("\t");
            outkey.set(Integer.parseInt(strs[0].trim()));
            outval.set(strs[1].trim());
            context.write(outkey, outval);
        }
    }

    private static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text>{

        private Text outval = new Text();

        @Override
        protected void reduce(IntWritable key, Iterable<Text> values,
                Reducer<IntWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {

            outval.set(values.iterator().next().toString());
            context.write(key,outval);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // get the loaded configuration
        Configuration conf = this.getConf();
        // create the job
        Job job = Job.getInstance(conf, "asc sort");

        // class settings
        job.setJarByClass(MyMRDemo9.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // step 2: path settings
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            // equivalent to: hadoop fs -rm -r -skipTrash (recursive delete)
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s output dir is deleted!");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // step 3: run the job
        long start = System.currentTimeMillis();
        boolean con = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
        System.out.println(msg);
        System.out.println("JOB_COST : " + ((end - start) / 1000) + " SECONDS!");

        return 0;
    }

    public static void main(String[] args) {

        try {
            System.exit(ToolRunner.run(new MyMRDemo9(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Sorted query with a single reducer -- descending order (simple key type)

package util;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom comparator used when the key needs a non-default sort order
 * @author Su
 *
 */
public class MyIntComparator extends WritableComparator {

    // 1. tell the parent class which key type this comparator compares
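
The notes are cut off at this point; a minimal sketch of how such a comparator is typically completed (descending order for IntWritable keys) and registered with the job -- an assumption about where the original was heading, not the author's exact code:

package util;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyIntComparator extends WritableComparator {

    // 1. tell the parent class which key type is compared and let it create instances
    public MyIntComparator() {
        super(IntWritable.class, true);
    }

    // 2. invert the natural order so the shuffle delivers keys in descending order
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -1 * ((IntWritable) a).compareTo((IntWritable) b);
    }
}

// In the driver (e.g. a copy of MyMRDemo9), register it before submitting the job:
// job.setSortComparatorClass(MyIntComparator.class);
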
Copyright notice: original work. Reposting is permitted provided the source and author are credited with a hyperlink; otherwise legal liability may be pursued. From hainiubl.com (海汼部落) - 123456789987654321, http://hainiubl.com/topics/75397