mapreduce

教程 薪牛 ⋅ 于 2023-01-03 23:02:36 ⋅ 1939 阅读

1.1mapreduce诞生背景

file

1)海量数据在单机上处理因为硬件资源限制,无法胜任;

​ 2)将单机版程序扩展到集群来分布式运行,极大增加程序的复杂度和开发难度;

​ 3)引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理;

1.1mapreduce介绍

  1. MapReduce是一种分布式计算模型
  2. Doug Cutting根据《MapReduce: Simplified Data Processing on Large Clusters》设计实现了Hadoop中基于HDFS的MapReduce
  3. MapReduce是由两个阶段组成:Map和Reduce,用户只需要实现map以及reduce两个函数,即可实现分布式计算,这样做的目的是简化分布式程序的开发和调试周期
  4. JobTracker/ResourceManager:任务调度者,管理多个NodeManager。ResourceManager是Hadoop2.0版本之后引入Yarn之后用于替代JobTracke部分功能的机制
  5. TaskTracker/NodeManager:任务执行者

1.3 mapreduce是做什么的?

mapreduce的核心思想:化大为小,分而治之。

​ mapreduce 的主要组成部分:Mapper 和 Reducer。

​ Mapper: 负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。

​ Reducer: 负责对map阶段的结果进行汇总。

下面以单词统计为例,我们来了解一下mapper节点和reducer阶段都干了什么事情?

file

2 配置开发环境

2.1开发环境准备

在启动镜像的同时,打开远程桌面,找到idea

注意:远程桌面只能是在启动镜像的时候使用,没有启动镜像没有办法找到打开远程桌面

file

2.2 创建maven工程

创建一个工程,并单独为mapreduce项目创建个module

file

2.3 导入相关依赖包

找到pom文件,将hadoop依赖包的坐标添加进去。

file

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>

2.4 导入集群的配置文件

在main目录下创建一个resources目录,并设置为资源目录,将集群的core-site.xml和hdfs-site.xml放到resources目录下,目的是为了能识别hadoop集群

#将集群两个文件,下载到远程桌面服务器所在的桌面上
scp /usr/local/hadoop/etc/hadoop/hdfs-site.xml   root@11.99.173.63:/headless/Desktop
scp /usr/local/hadoop/etc/hadoop/core-site.xml   root@11.99.173.63:/headless/Desktop

file

将配置文件放到resources目录下

file

2.5 编写代码

1)自定义的mapreduce类

2)继承Mapper类,实现map函数

3)继承Reducer类,实现reduce函数

4)main()中设置Job相关信息 和 提交Job运行

Mapper代码

package com.hainiu.wordcount;

import org.apache.hadoop.io.*;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/20
 */
/*
* KEYIN:输入到mapper类的key的类型
* VALUEIN:输入到mapper类的value的类型
*  KEYOUT:mapper类输出的key的类型
* VALUEOUT:mapper类输出的value的类型
*
* hadoop要求mapreduce计算过程中,所有的数据必须经过序列化,基本类型是没有序列化的,所以hadoop帮我们将基本类型重新实现了序列化
* hadoop用的序列化框架是avro 比java原生的序列化框架性能更高
* */
public class WordCountMapper extends Mapper <LongWritable, Text, Text, IntWritable> {

    //--没读取一行数据就会调用一次map方法
    //-- key 行首偏移量
    //-- value 一行数据
    //-- context是 mr的上下文对象
   Text keyout=new Text();
   IntWritable valueout=new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("map执行了。。。。。");
        //--value 就是一行数据
        String line = value.toString();
        //--切分 【hello world】
        String[] words = line.split(" ");
        for (String word : words) {
            //--将每个单词进行打标记1输出
            //--hadoop会根据你map输出的key帮我们进行聚合
            keyout.set(word);
            valueout.set(1);
            context.write(keyout,valueout);
        }
    }
}

Reducer代码

package com.hainiu.wordcount;

import org.apache.hadoop.io.*;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/20
 */
/*
* KEYIN:输入到mapper类的key的类型
* VALUEIN:输入到mapper类的value的类型
*  KEYOUT:mapper类输出的key的类型
* VALUEOUT:mapper类输出的value的类型
*
* hadoop要求mapreduce计算过程中,所有的数据必须经过序列化,基本类型是没有序列化的,所以hadoop帮我们将基本类型重新实现了序列化
* hadoop用的序列化框架是avro 比java原生的序列化框架性能更高
* */
public class WordCountMapper extends Mapper <LongWritable, Text, Text, IntWritable> {

    //--没读取一行数据就会调用一次map方法
    //-- key 行首偏移量
    //-- value 一行数据
    //-- context是 mr的上下文对象
   Text keyout=new Text();
   IntWritable valueout=new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("map执行了。。。。。");
        //--value 就是一行数据
        String line = value.toString();
        //--切分 【hello world】
        String[] words = line.split(" ");
        for (String word : words) {
            //--将每个单词进行打标记1输出
            //--hadoop会根据你map输出的key帮我们进行聚合
            keyout.set(word);
            valueout.set(1);
            context.write(keyout,valueout);
        }
    }
}

Driver类代码

创建Job任务,定义输入输出文件目录

package com.hainiu.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/20
 * 怎么通过参数动态传递的方式运行mapreduce
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.out.println(args[0]);
        System.out.println(args[1]);

        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job  job = Job.getInstance(conf, "wc");
        //--定义任务的入口类
        job.setJarByClass(WordCountDriver.class);
        //--定义mapper类
        job.setMapperClass(WordCountMapper.class);
        //--定义mapper类的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //--定义reduce类
        job.setReducerClass(WordCountReduce.class);
        //--定义reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //--处理文件
        FileInputFormat.addInputPath(job,new Path("hdfs://ns1/word/words.txt"));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path(""hdfs://ns1/wcresult""));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

2.6 设置运行参数

设置程序执行参数,输入输出路径不用在代码中写死。

# 输入输出都写hdfs路径

run ---> Edit Configurations. 进入设置参数界面

file

注意: 输入输出目录一定要写指定具体路径。

file

2.6.1 增加自动删除目录方法

由于在mapredue任务执行前需要删除输出目录。否则再次执行MR任务,造成输出目录重复则会报错

file

添加如下方法

file

2.6.2 增加log4j支持

添加log4j所需的jar包

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
</dependency>

拷贝下面的log4j.properties文件到新建的resources资源文件夹里

#设定控制台和指定目录日志都按照info级别来打印输出
log4j.rootLogger=info,console,HFILE
#日志输出到指定目录
log4j.appender.HFILE=org.apache.log4j.RollingFileAppender
#输出路径自己设置
log4j.appender.HFILE.File=/opt/hainiu_hadoop_logs/log.log
log4j.appender.HFILE.MaxFileSize=30mb
log4j.appender.HFILE.MaxBackupIndex=20
log4j.appender.HFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.HFILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %l %t %r  %c: %m%n
#日志输出到控制台
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

file

3 Mapreduce 原理

3.1 map任务的输入文件是怎么分割的

为什么要讲这个知识点?

​ mapreduce中生成多少个MapTask是由输入文件的输入分片决定的,有多少输入分片,就会生成多少MapTask。

输入文件分片方法:

1)首先查看输入文件的压缩格式是否支持split,如果不支持,则一个文件对应一个split;

2)如果支持split,会默认按照一个hdfs块大小对应一个split。

3)如果文件剩余块大小/分块大小>1.1,那会生成两个split;

如果文件剩余大小/splitsize<=1.1,剩余的部分作为一个split。

MapReduce 通过 org.apache.hadoop.mapreduce.InputSplit 类提供数据切片方法(抽象类)。

1)文件个数

2)文件是否支持split

file

3)hdfs块大小

4)1.1 的系数

在 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 这个类中有一个神奇的参数 :

private static final double SPLIT_SLOP = 1.1;   // 10% slop

file

3.2 MapReduce灵魂shuffle 过程

一个Reducer的情况

file

多个Reducer的情况(2个为例)

file

有多少个reducer将来就有多少个输出文件,一般reducer的数量和分区的数量是一致的。

如何确定哪些数据写入哪个Partition?

file

假设:reduce数量是2

partitionId = key的hash值% reduce的数量

partitionA = key的hash值尾数是0

partitionB = key的hash值尾数是1

如果reduce数量是3

partitionA = key的hash值尾数是0

partitionB = key的hash值尾数是1

partitionC = key的hash值尾数是2

4 怎么设置输出压缩

file

在shuffle过程中,reducer端 拉去数据并进行merge数据 占整个reducer运行进度的33%,但可能因为map阶段文件分布不均导致该阶段耗费50-70%的时间,怎么减少reducer从map拉取的数据量,我们可以设置压缩。

压缩分为map阶段输出压缩,以及reduce阶段输出压缩

可以在配置文件中进行配置默认压缩

  <property>
        <name>mapreduce.map.output.compress</name> 
        <value>true</value>
        <description>map是否开启输出压缩</description>
    </property>

    <property>
        <name>mapreduce.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.Bzip2Codec</value>
        <description>map输出默认的算法</description>
    </property>
 <property>
        <name>mapreduce.output.fileoutputformat.compress</name> 
        <value>true</value>
        <description>reduce是否开启输出压缩</description>
    </property>

    <property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>org.apache.hadoop.io.compress.Bzip2Codec</value>
        <description>reduce输出默认的算法</description>
    </property>

1)设置reducer压缩

// ---设置reduce输出压缩---
// 1)开启reduce输出压缩
FileOutputFormat.setCompressOutput(job, true);
// 2)设置输出压缩格式--gzip
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

file

2)设置mapper 压缩


        //方式1:
        // 开启map输出压缩
        conf.set(MRJobConfig.MAP_OUTPUT_COMPRESS, "true");
        // 设置输出压缩格式是 Bzip压缩
        conf.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Bzip2Codec.class.getName());
        // 创建运行mapreduce任务的Job对象
        // 当创建job对象时,会把 conf里面的所有数据 拷贝到 job对象的Configuration里
        Job job = Job.getInstance(conf, "wordcount");

        //方式2:
        // 开启map输出压缩
        job.getConfiguration().set(mapreduce.map.output.compress", "true");
        // 设置输出压缩格式是 Bzip压缩
        job.getConfiguration().set("mapreduce.map.output.compress.codec", Bzip2Codec.class.getName());

file

map输出压缩可以通过日志观察压缩情况

file

5 设置map阶段Combiner

file

提前在map节点进行合并计算,这样输出到reducer的数据量就会变少,reducer的计算压力也会减小,

因为combiner是发生在map输出到缓冲区,在缓冲区完成的。要实现的需求是提前进行累加计算,所以和reduce的功能是一样的,需要实现的方法是reduce方法。

Combiner代码:

public class WordCountCombiner  extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    //--key就是map输出之后,hadoop框架聚合之后的key values就是相同的key聚合后的数据
    //--hadoop框架聚合之后,有多少个key就会调用多少次reduce方法
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum=0;
        for (IntWritable value : values) {
            int i = value.get();
            sum+=i;
        }

        //--输出
        context.write(key,new IntWritable(sum));
    }
}

并在job中进行设置

file

增加combiner和不加的区别:

没有提前combiner的:

file

增加combiner的

file

注意:类似于单词统计这样的需求可以提前对单词在map节点进行统计,适合于提前combiner。但是不适用于所有业务

5.1Combiner源码分析:

file

6 计数器

计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。

Hadoop 内置计数器根据功能进行分组。每个组包括若干个不同的计数器,分别是:MapReduce 任务计数器(Map-Reduce Framework)、文件系统计数器(File System Counters)、作业计数器(Job Counters)、输入文件任务计数器(File Input Format Counters)、输出文件计数器(File Output Format Counters)

file

可以配置mapreduce中计数器的数量

 <property>
        <name>mapreduce.job.counters.limit</name>
        <value>20000</value>
        <description>mr允许使用的计数器的最大上限</description>
    </property>

自定义计数器:

需求:统计words.txt中正常单词出现的次数和非正常单词出现的次数

Mapper代码

 public static  class WcMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        //--准备list用于判断文件中的数据是不是敏感单词
        List<String> list=new ArrayList<>();

        Counter mingan=null;
        Counter normal=null;
        @Override
        //--setup会在线程执行的时候,调用一次
        protected void setup(Context context) throws IOException, InterruptedException {
            list.add("fuck");
            list.add("bitch");
            list.add("wanglihong");
            list.add("wuyifan");

            //--获取敏感计数器
             mingan = context.getCounter("wc", "minganCounter");
            //--获取正常单词计数器
             normal=context.getCounter("wc","normalCounter");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //--wuyifan wanglihong
            String[] words = value.toString().split(" ");
            for (String word : words) {
                if (list.contains(word)){
                    mingan.increment(1);
                    continue;
                }else{
                    normal.increment(1);
                    context.write(new Text(word),new IntWritable(1));
                }
            }
        }
    }

Reducer代码

public static class WcReducer extends Reducer<Text, IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum=0;
            for (IntWritable value : values) {
                sum+=value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

Driver代码

 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf=new Configuration();
        //--map设置压缩方式二:
        conf.set(MRJobConfig.MAP_OUTPUT_COMPRESS, "true");
        conf.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class.getName());

        Job job = Job.getInstance(conf, "wc");
        job.setJarByClass(WcDriver.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);
        //--如果map的输出类型和reduce的输出类型一样,则map的输出类型可以省略不写
        job.setOutputKeyClass(Text.class);
        //--设置combiner类
        job.setCombinerClass(WcReducer.class);

        //--方式一:
        // 开启map输出压缩
        //job.getConfiguration().set("mapreduce.map.output.compress", "true");
        //// 设置输出压缩格式是snappy压缩
        //job.getConfiguration().set("mapreduce.map.output.compress.codec", SnappyCodec.class.getName());

        FileSystem fs =FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //--设置reduce端的压缩开启
        FileOutputFormat.setCompressOutput(job ,true);
        //--采用压缩算法
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);

    }

file

单独获取counter,需要在job执行结束之后

        //--获取counter
        Counters counters = job.getCounters();
        //--通过counter组名获取对应的counter
        CounterGroup group = counters.getGroup("wc");
        //--给组中的counter进行遍历
        for (Counter counter : group) {
            System.out.println(counter.getDisplayName()+":"+counter.getValue());
        }
        //--在组中根据名字找对应的counter使用
        Counter normalCounter = group.findCounter("normalCounter");
        System.out.println(normalCounter.getDisplayName()+"----"+normalCounter.getValue());

file

7 join

7.1 join实现

需求:输出订单明细信息--1 zhangsan iphone13 9999 8 beijing

准备数据:

order订单数据
1 zhangsan 001 8 beijing
2 lisi 002 3 hangzhou
3 dajiao 001 5 xiangyashan
4 guangkun 001 6 heilongjiang
5 liuying 002 2 shengzhen
6 dana 001 90 tieling
goods商品数据
001,iphone13 9999
002,xiaomi 1999

file

Mapper代码

 public static class Join1Mapper extends Mapper<LongWritable, Text,Text,Text>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.contains(",")){
                String bianhao=line.split(",")[0];
                context.write(new Text(bianhao),value);
            }else{
                String bianhao= line.split(" ")[2];
                context.write(new Text(bianhao),value);
            }
        }
    }

Reducer代码

  //--如果输出的时候,不想输出就写NullWritable
    public static class Join2Reducer extends Reducer<Text,Text,Text, NullWritable>{
        @Override
        //-- values[1 zhangsan 001 8 beijing  3 bajie 001 4 shenzhen 4 wukong 001 6 hangzhou 001,ihone13 9999]
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String goodsinfo=null;
            List<String> orderinfos=new ArrayList<>();
            for (Text value : values) {
                if (value.toString().contains(",")){
                    goodsinfo=value.toString();
                }else{
                    orderinfos.add(value.toString());
                }
            }

            for (String orderinfo : orderinfos) {
                String[] goodsdatas = goodsinfo.split(",");
                String allstr = orderinfo.replaceAll(goodsdatas[0], goodsdatas[1]);
                context.write(new Text(allstr),NullWritable.get());
            }

        }
    }

Driver代码

 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "join");
        //--定义任务的入口类
        job.setJarByClass(Join1Driver.class);
        //--定义mapper类
        job.setMapperClass(Join1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //--定义reduce类
        job.setReducerClass(Join1Reducer.class);
        //--定义reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path(args[2]))){
            fs.delete(new Path(args[2]),true);
        }

        //--处理文件
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileInputFormat.addInputPath(job,new Path(args[1]));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path(args[2]));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

如果输入文件格式相同改怎么解决的?

需求:输出每个学生数据分数

准备数据:

user.txt
1 zhangsan 001
2 lisi 002
3 wangwu 003
4 zhaosi 004
5 liuneng 005
6 guangkun 006
score.txt
001 math 100
002 math 98
003 math 94
004 math 95
005 math 90
006 math 85

代码如下

package com.hainiu.join3;

import com.hainiu.join2.Join2Driver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/22
 * 演示如果两个文件格式大致相同,最根本的解决方案就是寻找文件的名称,来判断
 */
public class Join2Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "join3");
        //--定义任务的入口类
        job.setJarByClass(Join3Driver.class);
        //--定义mapper类
        job.setMapperClass(Join3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //--定义reduce类
        job.setReducerClass(Join3Reducer.class);
        //--定义reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path(args[2]))){
            fs.delete(new Path(args[2]),true);
        }

        //--处理文件
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileInputFormat.addInputPath(job,new Path(args[1]));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path(args[2]));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

    public static class Join2Mapper extends Mapper<LongWritable, Text,Text,Text>{
        String filename=null;
        //--准备前缀
        String prefix=null;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //--获取文件切片对象
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            Path path = inputSplit.getPath();
            filename = path.getName();
            if (filename.contains("user")){
               prefix="u_";
            }else{
                prefix="m_";
            }

            System.out.println(filename);

        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (filename.contains("user")){
                String keyout = line.split(" ")[2];
                String  valueout=prefix+value.toString();
                context.write(new Text(keyout),new Text(valueout));
            }else{
                String keyout = line.split(" ")[0];
                String  valueout=prefix+value.toString();
                context.write(new Text(keyout),new Text(valueout));
            }
        }
    }

    public static class Join2Reducer extends Reducer<Text,Text,Text, NullWritable>{
        @Override
        //--values 【m_001 math 100  s_1 zhangsan 001 】
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //--s_1 zhangsan 001
            String userinfo=null;
                //--m_001 math 100
                String scoreinfo=null;
            for (Text value : values) {
                if (value.toString().startsWith("u_")){
                    userinfo=value.toString();
                }else{
                    scoreinfo=value.toString();
                }
            }
            //--[m 001 math 100]
            String[] scoredatas = scoreinfo.split("[_| ]");
            //-- [s 1 zhangsan 001]
            String[] userdatas = userinfo.split("[_| ]");
            String allstr=userdatas[1]+" "+userdatas[2]+" "+scoredatas[2]+" "+scoredatas[3];
            context.write(new Text(allstr),NullWritable.get());
        }
    }
}

7.2 map join

file

我们都知道mapreduce是基于磁盘的,这样产生大量磁盘IO,所以性能低,处理时间长。

mapjoin:不需要进行shuffle流程,也不需要reduce处理

适用于大表join小表,使用DistributedCache机制将小表存储到各个Mapper进程所在机器的磁盘空间上,各个Mapper进程读取不同的大表分片,将分片中的每一条记录与小表中所有记录进行合并
合并后直接输出map结果即可得到最终结果。

package com.hainiu.join4;

import com.hainiu.join3.Join3Driver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

/**
 * @auther chenzhe
 * @date 2022/10/22
 * //--本案例没有redcue,目的是为了减少,频繁落磁盘,从磁盘中拉去数据,由于磁盘的io导致mr计算的性能降低
 //--我们在map阶段进行join的时候,只读取大文件,小文件不会此次盘中读取,为了减少磁盘io
 //--小文件在程序运行之前就已经缓存到了map中,大文件和小文件进行join的时候呀,我们可以直接从内存的map中获取数据进行join
 */
public class Join3Driver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "join4");
        //--定义任务的入口类
        job.setJarByClass(Join3Driver.class);
        //--定义mapper类
        job.setMapperClass(Join3大城市现在Z从从从从从从从从从Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //--不要reduce要将reducetask的数量设置为0
        job.setNumReduceTasks(0);

        //--将小文件缓存到内存
        job.addCacheFile(new URI("file:///D:/Personal/Desktop/goods.txt"));

        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }

        //--处理文件
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

    public static class Join3Mapper extends Mapper<LongWritable,Text,Text,NullWritable> {

        Map<String,String> map=new HashMap<>();

        @Override
        //--将小文件缓存内存,缓存到map中
        protected void setup(Context context) throws IOException, InterruptedException {
            //--获取缓存的文件
            URI[] cacheFiles = context.getCacheFiles();

            //-- xxxx/xxxx/xxxx/goods.txt
            String path = cacheFiles[0].getPath();
            System.out.println(path);
            //--获取goods.txt
            String name = path.substring(path.lastIndexOf("/") + 1);
            //--准备输入流开始读取数据
            FileInputStream in=new FileInputStream(name);
            //--将字节输入流转化成字符流
            BufferedReader bf=new BufferedReader(new InputStreamReader(in));
            //--准备一个变量用来保存读取到的小文件中的数据
            String line=null;
            while ((line=bf.readLine())!=null){
                String[] split = line.split(",");
               //--map {001,iphone13 9999  002 xiaomi 1999}
                map.put(split[0],split[1]);
            }

            bf.close();
            in.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //--订单的一行数据
            //--1 zhangsan 001 8 beijing
            String line = value.toString();
            String[] datas = line.split(" ");
            String id = datas[2];
            String nameAndPrice = map.get(id);
            //nameAndPrice= nameAndPrice==null?"杂牌子":nameAndPrice;
            //String allstr=datas[0]+" "+datas[1]+" "+nameAndPrice+" "+datas[3]+" "+datas[4];
            //context.write(new Text(allstr),NullWritable.get());

            if (nameAndPrice==null){
                //--semijoin 半球连接
                nameAndPrice= "杂牌子";
                String allstr=datas[0]+" "+ datas[1]+" "+nameAndPrice+" "+datas[3]+" "+datas[4];
                context.write(new Text(allstr),NullWritable.get());
            }else{
                return;
            }
        }
    }

}

8 自定义序列化类

  1. 在Hadoop的集群工作过程中,一般是利用RPC来进行集群节点之间的通信和消息的传输,所以要求MapReduce处理的对象必须可以进行序列化/反序列操作。

  2. Hadoop并没有使用Java原生的序列化,而是利用的是Avro实现的序列化和反序列,并且在其基础上进行了更好的封装,提供了便捷的API

  3. 在Hadoop中要求被序列化的对象对应的类必须实现Writable接口

  4. 序列化过程中要求属性值不能为null

需求:统计flow.txt 中每个人花费的总流量。

#数据如下
手机号       地址   姓名 花费的流量
13877779999 beijing xiaozhan 2145
13766668888 shanghai wangyibo 1028
13766668888 shanghai wangyibo 9987
13877779999 beijing xiaozhan 5678
13544445555 shengzhen luhan 10577
13877779999 shanghai xiaozhan 2145
13766668888 shanghai wangyibo 9987  
13544445555 beijing luhan 3345 
13544445555 beijing luhan 4456
13877779999 shengzhen xiaozhan 4444
13766668888 shengzhen wangyibo 1888
13766668888 shengzhen wangyibo 9987

    package cn.tedu.flow;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    //--hadoop要求传输的javabean对象必须要经过序列化
    //--hadoop针对序列化已经做好了封装底层用的是avro
    public class Flow implements Writable{
        private String phone="";
        private String address="";
        private String name="";
        private int flow;
        //--添加构造方法
        public Flow() {
            super();
        }
        public Flow(String phone, String address, String name, int flow) {
            super();
            this.phone = phone;
            this.address = address;
            this.name = name;
            this.flow = flow;
        }

        //--添加get和set方法 alt+shift+s

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }
        public String getAddress() {
            return address;
        }
        public void setAddress(String address) {
            this.address = address;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public int getFlow() {
            return flow;
        }
        public void setFlow(int flow) {
            this.flow = flow;
        }
        //--添加toString方法
        @Override
        public String toString() {
            return "Flow [phone=" + phone + ", address=" + address + ", name=" + name + ", flow=" + flow + "]";
        }

        @Override
        //--反序列化
        public void readFields(DataInput in) throws IOException {
            this.phone=in.readUTF();
            this.address=in.readUTF();
            this.name=in.readUTF();
            this.flow=in.readInt();

        }
        @Override
        //--序列化
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeUTF(address);
            out.writeUTF(name);
            out.writeInt(flow);

        }

    }
    package cn.tedu.flow;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {

        public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
            //--拿到行数据切分后封装成javabean对象13877779999 bj zs 2145
            String[] datas = ivalue.toString().split(" ");
            Flow f=new Flow();
            f.setPhone(datas[0]);
            f.setAddress(datas[1]);
            f.setName(datas[2]);
            f.setFlow(Integer.parseInt(datas[3]));
            context.write(new Text(datas[0]), f);
        }

    }

    package cn.tedu.flow;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Flowreducer extends Reducer<Text, Flow, Text, IntWritable> {

        public void reduce(Text _key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
            //--定义变量统计总流量
            int sum=0;
            Flow f = null;
            for (Flow val : values) {
                sum+=val.getFlow();
                f=val;
            }
            context.write(new Text(f.getName()), new IntWritable(sum));
        }

    }
    package cn.tedu.flow;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FlowDriver {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "JobName");
            job.setJarByClass(cn.tedu.flow.FlowDriver.class);
            // TODO: specify a mapper
            job.setMapperClass(FlowMapper.class);
            // TODO: specify a reducer
            job.setReducerClass(Flowreducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Flow.class);
            // TODO: specify output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //--设置分区规则
            job.setPartitionerClass(AddressPartitioner.class);
            //--设置reducetask的数量 规则:分区的数量等于reducetask的数量
            job.setNumReduceTasks(3);
            // TODO: specify input and output DIRECTORIES (not files)
            FileInputFormat.setInputPaths(job, new Path("hdfs://10.5.24.244:9000/txt/flow.txt"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://10.5.24.244:9000/flowcount"));

            if (!job.waitForCompletion(true))
                return;
        }

    }

9 自定义分区

file

  1. 分区操作是shuffle操作中的一个重要过程,作用就是将map的结果按照规则分发到不同reduce中进行处理,从而按照分区得到多个输出结果。
  2. Partitioner是分区的基类,如果需要定制partitioner也需要继承该类
  3. HashPartitioner是MapReduce的默认partitioner。计算方法是:which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
  4. 默认情况下,reduceTask数量为1
  5. 很多时候MapReduce自带的分区规则并不能满足业务需求,为了实现特定的效果,可以需要自己来定义分区规则
  6. 如果定义了几个分区,则需要定义对应数量的ReduceTask

需求:改造flow案例,按照不同的城市统计每个人花费的总流量

public class FlowPartitioner extends Partitioner<Text, Flow> {

    // 用于进行分区的方法
    @Override
    public int getPartition(Text key, Flow value, int numReduceTasks) {

        String addr = value.getAddr();

        if (addr.equals("bj"))
            return 0;
        else if (addr.equals("sh"))
            return 1;
        else
            return 2;

    }

}
// 在任务调度代码中,增加Partitioner配置
//设置Partitioner类
job.setPartitionerClass(DCPartitioner.class);
//指定Reducer的数量,此处通过main方法参数获取,方便测试
job.setNumReduceTasks(3);

10 排序

  1. Map执行过后,在数据进入reduce操作之前,数据将会按照输出的Key进行排序,利用这个特性可以实现大数据场景下排序的需求
  2. 要排序的对象对应类实现WritableComparable接口,根据返回值的正负决定排序顺序
#测试数据
zhangsan 88
lisi 56
wangwu 73
zhaoliu 98
mayun 9
bajie 78
wukong 96
package com.hainiu.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/22
 */
public class Student  implements WritableComparable<Student> {
    private String name="";
    private int score;

    @Override
    public String toString() {
        return "name: "+name+"-----"+"score: "+score;
    }

    public Student() {
    }

    public Student(String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    @Override
    //--用来对对象中的属性进行比较操作的,按照什么排序就可以重写compareTo方法即可
    public int compareTo(Student o) {

        //--this在前升序,this在后降序

        //return o.score-this.score;
        return this.score-o.score;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(score);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name=in.readUTF();
        this.score=in.readInt();
    }
}

package com.hainiu.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/22
 */
public class SortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "sort");
        //--定义任务的入口类
        job.setJarByClass(SortDriver.class);
        //--定义mapper类
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(Student.class);
        job.setMapOutputValueClass(NullWritable.class);

        //job.setNumReduceTasks(0);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Student.class);
        job.setOutputValueClass(NullWritable.class);
        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path("E://sortresult"))){
            fs.delete(new Path("E://sortresult"),true);
        }

        //--设置reducetask的数量等于分区的数量
        //--reducetask的数量可以大于分区的数量,不能小于分区的数量

        //--处理文件
        FileInputFormat.addInputPath(job,new Path("D:\\Personal\\Desktop\\考试\\txt\\student.txt"));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path("E://sortresult"));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

    public static class SortMapper extends Mapper<LongWritable, Text,Student, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] datas = value.toString().split(" ");
            Student s=new Student();
            s.setName(datas[0]);
            s.setScore(Integer.parseInt(datas[1]));
            context.write(s,NullWritable.get());
        }
    }
    public static class SortReducer extends Reducer<Student, NullWritable,Student, NullWritable>{
        @Override
        protected void reduce(Student key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            super.reduce(key, values, context);
        }
    }
}

11 二次排序

需求:对num文件中的数据进行二次排序,先按第一列数值进行升序,如果第一列数值一样的话,按照第二列数值进行降序排序

#测试数据
22 67
45 76
44 67
22 89
33 12
22 67
36 77
45 9
9 25
88 34
package com.hainiu.num;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/22
 */
public class Num implements WritableComparable<Num> {
    private int num1;
    private int num2;

    public Num() {
    }

    public Num(int num1, int num2) {
        this.num1 = num1;
        this.num2 = num2;
    }

    @Override
    public String toString() {
        return "num1 "+num1+"  num2  "+num2;
    }

    public int getNum1() {
        return num1;
    }

    public void setNum1(int num1) {
        this.num1 = num1;
    }

    public int getNum2() {
        return num2;
    }

    public void setNum2(int num2) {
        this.num2 = num2;
    }

    @Override
    public int compareTo(Num o) {
        int i = o.num1 - this.num1;
        if (i==0){
            return this.num2-o.num2;
        }
        return i;
    }

    @Override
    public void write(DataOutput out) throws IOException {
            out.writeInt(num1);
            out.writeInt(num2);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.num1=in.readInt();
        this.num2=in.readInt();
    }
}

package com.hainiu.num;

import com.hainiu.sort.SortDriver;
import com.hainiu.sort.Student;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/22
 * 演示二次排序
 */
public class NumDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "flow");
        //--定义任务的入口类
        job.setJarByClass(NumDriver.class);
        //--定义mapper类
        job.setMapperClass(NumMapper.class);
        job.setMapOutputKeyClass(Num.class);
        job.setMapOutputValueClass(NullWritable.class);

        //job.setNumReduceTasks(0);

        //job.setReducerClass(SortDriver.SortReducer.class);
        //job.setOutputKeyClass(Student.class);
        //job.setOutputValueClass(NullWritable.class);
        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path("E://numresult"))){
            fs.delete(new Path("E://numresult"),true);
        }

        //--设置reducetask的数量等于分区的数量
        //--reducetask的数量可以大于分区的数量,不能小于分区的数量

        //--处理文件
        FileInputFormat.addInputPath(job,new Path("D:\\Personal\\Desktop\\考试\\txt\\num.txt"));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path("E://numresult"));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

    public static class NumMapper extends Mapper<LongWritable, Text,Num, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] datas = value.toString().split(" ");
            Num num=new Num(Integer.parseInt(datas[0]),Integer.parseInt(datas[1]));
            context.write(num,NullWritable.get());
        }
    }
}

12 自定义Inputformat

#测试数据
mayun
math 90
english 98
leijun
math 78
english 87
jianlin
math 87
english 90
huateng
math 67
english 87
yiming
math 59
english 80
masike
math 79
english 60

file

1. MapReduce开始阶段阶段,InputFormat类用来产生InputSplit,并把基于RecordReader它切分成record(即KEYIN-VALUEIN),形成Mapper的输入。
2. Hadoop本身提供了若干内置的InputFormat,其中如果不明确指定默认使用TextInputFormat
3. InputFormat中主要定义了如下两个方法:getSplits以及createRecordReader 如果数据来源是文件,那么可以继承FileInputFormat。FileInputFormat实现了InputFormat接口,实现了getSplits方法,根据配置去逻辑切割文件,返回FileSplit的集合,并提供了isSplitable()方法,子类可以通过在这个方法中返回boolean类型的值表明是否要对文件进行逻辑切割,如果返回false则无论文件是否超过一个Block大小都不会进行切割,而将这个文件作为一个逻辑块返回。而对createRecordReader方法则没有提供实现,设置为了抽象方法,要求子类实现。

假设输入文件不是一行数据处理一次,那么默认的读取文件的方式就不能使用了,需要自定义Inputformat

file

需求:统计score3中每个学生的总成绩


// 格式类
public class AutoInputFormat extends FileInputFormat<Text, Text>{

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new AutoRecordReader();
    }

}

// 读取类
public class AutoRecordReader extends RecordReader<Text, Text> {
    private LineReader reader;
    private Text key;
    private Text value;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        // 获取文件分片
        FileSplit fs = (FileSplit) split;
        // 通过分片获取文件路径
        Path path = fs.getPath();
        // 获取环境配置
        Configuration conf = context.getConfiguration();
        // 获取文件系统
        FileSystem system = FileSystem.get(URI.create(path.toString()), conf);
        // 获取输入流
        FSDataInputStream in = system.open(path);
        // 将该字节流包装成一个LineReader对象方便按行读取
        reader = new LineReader(in);

    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        // 初始化键值对
        key = new Text();
        value = new Text();

        Text temp = new Text();
        if (reader.readLine(temp) == 0)
            return false;
        key.set(temp.getBytes());
        for (int i = 0; i < 2; i++) {
            if (reader.readLine(temp) == 0)
                return false;
            value.append(temp.getBytes(), 0, temp.getLength());
            value.append(new Text(" ").getBytes(), 0, 1);
        }

        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (reader != null)
            reader.close();
    }

}
// Mapper
public class AutoMapper extends Mapper<Text, Text, Text, Text> {

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        context.write(key, value);

    }

}
// Driver
public class AutoDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(cn.tedu.autoinput.AutoDriver.class);
        job.setMapperClass(AutoMapper.class);

        job.setInputFormatClass(AutoInputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.32.147:9000/txt/score3.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.32.147:9000/result"));

        if (!job.waitForCompletion(true))
            return;

    }

}

13 任务打包上集群运行

1) 增加打包插件

maven的assembly的打包方法在项目中的pom文件中增加assembly插件maven-assembly-plugin 就是用来帮助打包用的,比如说打出一个什么类型的包,包里包括哪些内容等等

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hainiuxy</groupId>
  <artifactId>hainiumr</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>
  <name>hainiu_mr</name>
  <url>http://maven.apache.org</url>
    <properties>
        <!-- 项目编码 -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- 编译及输出的时候应用那个版本的jdk -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- 【修改点】运行的Driver类,是打包运行的主类-->
     <!--<mainClass>com.hainiu.hadoop.mapreduce.mrrun.Driver</mainClass>-->
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
            <!--已提供范围的依赖在编译classpath (不是运行时)可用。它们不是传递性的,也不会被打包 -->
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>

                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>

                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>

                    <descriptors>
                        <descriptor>src/main/resources/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>

                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>

                    <execution>
                        <id>make-assembly</id>

                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

把assembly配置文件放到/src/main/resources目录中,如果没有则创建一个,在assembly.xml中配置好打包时需要包含的文件,和需要排除的文件

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"   
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">  
  <!-- TODO: a jarjar format would be better -->  
  <!-- 添加到生成文件名称的后缀符 -->
  <id>hainiu</id>  
  <!-- 打包类型 -->
  <formats>  
    <format>jar</format>
  </formats>
  <!--  指定是否包含打包层目录 -->
  <includeBaseDirectory>false</includeBaseDirectory>
  <!-- 指定要包含的文件集 -->
  <fileSets>
    <fileSet>
         <!-- 指定目录 -->
         <directory>${project.build.directory}/classes</directory>
         <!-- 指定文件集合的输出目录,该目录是相对于根目录 -->
         <outputDirectory>/</outputDirectory>
         <!-- 排除文件 -->
         <excludes>
            <exclude>*.xml</exclude>
            <exclude>*.properties</exclude>
         </excludes>
     </fileSet>
  </fileSets>
  <!-- 用来定制工程依赖 jar 包的打包方式 -->
  <dependencySets>
    <dependencySet>
      <!-- 指定包依赖目录,该目录是相对于根目录 -->
      <outputDirectory>/</outputDirectory>  
      <!-- 当前项目构件是否包含在这个依赖集合里 -->
      <useProjectArtifact>false</useProjectArtifact>
      <!-- 是否将第三方jar包解压到该依赖中 false 直接引入jar包 true解压引入 -->
      <unpack>true</unpack>
      <!-- 将scope为runtime的依赖包打包到lib目录下。 -->
      <scope>runtime</scope>  
    </dependencySet>    
  </dependencySets> 
</assembly>

提交任务到集群运行

hadoop jar hainiu-hainiu.jar com.hainiu.wordcount.WordCountDriver /wcdata/words /wcresult

file

需要在提交任务的时候指定队列:

hadoop jar hainiu-hainiu.jar com.hainiu.wordcount.WordCountDriver -Dmapreduce.job.queuename=hainiu  /wcdata/words /wcresult

file

相当于:

file

file

怎么解决呢?需要通过ToolRunner提交任务才可以解析-D 参数

代码如下

package com.hainiu.wc;

import com.hainiu.wordcount.WordCountDriver;
import com.hainiu.wordcount.WordCountMapper;
import com.hainiu.wordcount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class WcDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        System.out.println(args[0]);
        System.out.println(args[1]);
        System.out.println(args[2]);
        System.exit(ToolRunner.run(new WcDriver(),args));
    }
    public int run(String[] strings) throws Exception {
        System.out.println(strings[0]);
        System.out.println(strings[1]);
        Configuration conf=getConf();
        //--创建job任务对象
        Job job = Job.getInstance(conf, "wordcount");
        //--设置入口类
        job.setJarByClass(WcDriver.class);
        //--定义mapper类
        job.setMapperClass(WcMapper.class);
        //--定义reducer类
        job.setReducerClass(WcReducer.class);
        //--定义mapper的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //--定义reducer的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
      //--设置reducetask的数量
        job.setNumReduceTasks(2);
        //--在运行之前,先将输出路径删除
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(strings[1]))){
            fs.delete(new Path(strings[1]),true);
        }

        //--定义mapreduce要处理的hdfs文件的路径
        FileInputFormat.addInputPath(job,new Path(strings[0]));

        //--定义mapreduce处理完结果的输出路径
        FileOutputFormat.setOutputPath(job,new Path(strings[1]));

        //--启动job
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static class WcMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        Text keyout=new Text();
        IntWritable valueout=new IntWritable();
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            System.out.println("map方法执行了");
            //--value --》hello word
            String line = value.toString();
            //--进行切分[hello world]
            String[] words = line.split(" ");
            for (String word : words) {
                    valueout.set(1);
                    context.write(keyout,valueout);
                }

            }
        }
        public static class WcReducer extends Reducer<Text, IntWritable,Text, IntWritable>{
            IntWritable valueout=new IntWritable();
            @Override
            //--每输入一个k,v就会调用一次
            protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                System.out.println("reduce 方法执行了");
                //--定义变量记录每个单词的总次数
                int sum=0;
                for (IntWritable value : values) {
                    int count = value.get();
                    sum+=count;
                }
                valueout.set(sum);
                context.write(key,valueout);

            }
        }
    }

2)短命令提交任务运行

file

找到案例包中运行的主类

file

发现主类中wordcount别名对应的是wordcount class类

file

参数传到了main方法

file

按照官方案例,我们也可以这么干!!!

添加要运行的主类

file

编写主类代码定义别名

package com.hainiu.Driver;

import com.hainiu.join.Join1Driver;
import com.hainiu.wordcount.WordCountDriver;
import org.apache.hadoop.util.ProgramDriver;

/**
 * @auther chenzhe
 * @date 2022/10/22
 */
public class RunDriver {
    public RunDriver() {
    }

    public static void main(String[] argv) {
        int exitCode = -1;
        ProgramDriver pgd = new ProgramDriver();

        try {
            pgd.addClass("wc", WordCountDriver.class , "这是一个单词统计的案例.");
            exitCode = pgd.run(argv);
        } catch (Throwable var4) {
            var4.printStackTrace();
        }

        System.exit(exitCode);
    }
}

重新打包上集群通过别名运行

hadoop jar hainiumr-1.0-hainiu.jar wordcount -Dmapreduce.job.queuename=hainiu   /word/words.txt /wcresult

14 mapreduce优化

14.1 推测执行

file

Straggle(掉队者)是指那些跑的很慢但最终会成功完成的任务。一个掉队的Map任务会阻止Reduce任务开始执行。

​ Hadoop不能自动纠正掉队任务,但是可以识别那些跑的比较慢的任务,然后它会产生另一个等效的任务作为备份,并使用首先完成的那个任务的结果,此时另外一个任务则会被要求停止执行。这种技术称为推测执行(speculative execution)。

​ 默认是开启推测执行

    <property>
        <name>mapreduce.map.speculative</name>
        <value>true</value>
        <description>是否对Map Task启用推测执行机制</description>
    </property>

    <property>
        <name>mapreduce.reduce.speculative</name>
        <value>true</value>
        <description>是否对Reduce Task启用推测执行机制</description>
    </property>

​ 有的时候要把 推测执行 关掉,这个一般是在代码中设置。

什么时候不合适开启推测执行呢?

​ 如果你的输出是往MySQL数据库输出的,那么这个时候我们必须要关闭推测执行,因为推测执行是两个任务一起跑如果谁跑完了就kill掉对方,这样的话对我向数据库插入这种任务来说就会造成数据重复插入的问题,所以不能开启推测执行。

14.2 合理设置HDFS文件块的大小

块的大小和文件的数量决定了map任务的数量,根据服务器读取数据的速度进行数据块大小设置 一般常见为128M 或者256M。

14.3 增加map buff缓冲区的大小

file

mapreduce.task.io.sort.mb 100 shuffle 的环形缓冲区大小,默认 100m
mapreduce.map.sort.spill.percent 0.8 环形缓冲区溢出的阈值,默认 80%
   <property>
        <name>mapreduce.task.io.sort.mb</name>
        <value>128</value>
        <description>Map Task缓冲区所占内存大小</description>
    </property>

14.4 数据倾斜

key设计不均衡的表现, reduce2执行完了,reduce1还在拉取数据,并且reduce2得等着reduce1完成,整个任务才算完成。

reduce1: 90w 个 a

reduce2: 10w 个 b

file

如何来解决key不均衡?

90w a 如果key相同的话都会进入一个 reduce 造成reduce压力过大,任务处理时间拉长

所以我们要想办法将key 重新设计

比如key 都是a 我们就可以采用后面加分隔符 加 随机数或者队列的方式进行操作

随机数

a_0.234345364356 --> 默认情况 parititoner --> hashpartitoner

a_0.344354354353

序列

a_1

a_2

a_3

由于key的重新设计 使我们以前分部在一起的数据 现在能够平均分部在多个reduce理, 虽然这样破坏了统计的key 但是做到了 化大为小的原则 假如你再想进行计算 可以在已经分好的数据中再次执行mapreduce,也就是需要多个mapreduce才能将结果计算出来

热点key负载均衡代码:

package com.hainiu.balancewc;

import com.hainiu.wordcount.WordCountDriver;
import com.hainiu.wordcount.WordCountMapper;
import com.hainiu.wordcount.WordCountReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/21
 */
public class WcBalance1Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "wcbalance");
        //--定义任务的入口类
        job.setJarByClass(WcBalance1Driver.class);
        //--定义mapper类
        job.setMapperClass(WcBalanceMapper.class);

        //--设置reducetask的数量
        job.setNumReduceTasks(2);

        //--定义reduce类
        job.setReducerClass(WcBalanceReducer.class);
        //--定义reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }

        //--处理文件
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

   public static class WcBalanceMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
       @Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           String[] words = value.toString().split(" ");
           int seqNUm=1;
           for (String word : words) {
                //--来一个固定的序列
               context.write(new Text(word+"_"+seqNUm++),new IntWritable(1));
               if (seqNUm==10){
                   seqNUm=1;
               }
           }
       }

   }
    public static class WcBalanceReducer extends Reducer<Text, IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum=0;
            for (IntWritable value : values) {
                sum+=value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

}

针对上一次统计完的结果进行二次统计

package com.hainiu.balancewc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @auther chenzhe
 * @date 2022/10/21
 */
public class WcBalance2Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //--创建hadoop的配置对象
        Configuration conf=new Configuration();
        //--定义一个job用来启动一个任务
        Job job = Job.getInstance(conf, "wcbalance");
        //--定义任务的入口类
        job.setJarByClass(WcBalance2Driver.class);
        //--定义mapper类
        job.setMapperClass(WcBalance2Mapper.class);

        //--定义reduce类
        job.setReducerClass(WcBalance2Reducer.class);
        //--定义reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //--判断输出目录是否存在,如果存在就删除
        //--获取hdfs文件系统对象--面向对象
        FileSystem fs=FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }

        //--处理文件
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //--定义结果目录
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //--启动job运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

    public static  class WcBalance2Mapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        @Override
        //--bitch_2 7[bitch 2 7]
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] datas = value.toString().split("[_|\\t]");
            int count=Integer.parseInt(datas[2]);
            context.write(new Text(datas[0]),new IntWritable(count));

        }
    }

    public static class WcBalance2Reducer extends Reducer<Text, IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum=0;
            for (IntWritable value : values) {
                sum+=value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
}

14.5 增加reduce的个数

file

如何设置 reducer 个数?

reducer的个数决定最终输出文件的个数,可以通过以下方式设定

1)job.setNumReduceTasks(2);

2)conf.set()

3)通过在mapred-site.xml 配置参数

4)通过 -Dmapreduce.job.reduces 参数设定

如果四种方式配置冲突了,哪个配置会生效?

如果-D参数和mapred-site.xml 冲突了,-D参数会生效。

如果-D参数和conf.set() 或 job api 设置冲突了, 后两者会生效。

如果conf.set() 和 job api 设置冲突了,哪个后设置,哪个会生效。

记住结论。

14.6 增加或减少reduce copy buff缓冲区的大小,增加copy线程的线程数

file

<!--调整线程数--> 
<property>
        <name>mapreduce.reduce.shuffle.parallelcopies</name>
        <value>24</value>
        <description>作为client端的reduce同时从map端拉取数据的并行度(一次同时从多少个map拉数据),
        每个reduce并行下载map结果的最大线程数</description>
 </property>
    <property>
        <name>mapreduce.tasktracker.http.threads</name>
        <value>40</value>
        <description>作为server端的map用于提供数据传输服务的线程数</description>
    </property>
<!--调整缓冲区大小--> 
<property>
        <name>mapreduce.reduce.shuffle.input.buffer.percent</name>
        <value>0.7</value>
 </property>
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-薪牛,http://hainiubl.com/topics/75967
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter