1.flume

教程 阿布都的都 ⋅ 于 2023-01-07 14:35:26 ⋅ 1074 阅读

1 flume概述

​ Flume是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据,同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力,Flume 在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG,0.9.x的称为Flume OG。

​ flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到HDFS、Hbase,简单来说flume就是收集日志的。 flume 作为 cloudera(CDH版本的hadoop) 开发的实时日志收集系统,受到了业界的认可与广泛应用。

Flume两个版本区别:

1)Flume-og

​ Flume 0.9X版本的统称。

​ Flume-og采用了多Master的方式,为了保证配置数据的一致性,Flume引入了ZooKeeper用于保存配置文件,在配置文件发生变化时,ZooKeeper可以通知Flume Master节点,Flume Master间可同步数据。

2)Flume-ng

​ Flume1.X版本的统称。

​ Flume-ng 取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。

1.1 flume的结构模型

​ Flume 运行的核心是 Agent,Flume以agent为最小的独立运行单位,一个agent就是一个JVM,含有三个核心组件,分别是source、 channel、 sink,通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示。

file

Source:

​ 从Client上收集数据并对数据进行格式化,以Event(事件)的形式传递给单个或多个Channel。

Channel:

​ 短暂的存储容器,将从Source接收到的Event进行缓存直到被Sink消费掉,Channel是Source和Sink之间的桥梁,Channal是一个完整的事务,能保证了数据在收发时的一致性。

Sink:

​ 从Channel中消费数据(Event)并传递到存储容器(Hbase、HDFS)或其他的Source中。

工作流程:

​ 把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。

​ 为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。

什么是Event?

​ 1)event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录。

​ 2)event也是事务的基本单位。

​ 3)event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。

​ 4)event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

file

Agent:

​ Flume以 Agent 为最小的独立运行单元,Agent 依赖于 JVM ,一个 Agent 的运行就伴随一个 JVM 实例的产生。

​ 一台机器可以运行多个Agent,一个Agent中可以包含多个Source、Sink。

1.2 flume各组件介绍

​ Flume提供了大量内置的Source、Channel和Sink类型,不同类型的Source,Channel和Sink可以自由组合.组合方式基于用户设置的配置文件。

1.2.1 Source组件

​ Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中,Flume提供了各种source的实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果内置的Source无法满足需要, Flume还支持自定义Source。

file

file

1.2.2 Channel组件

​ Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件,Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。

​ MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。

​ MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。

​ FileChannel保证数据的完整性与一致性。

file

1.2.3 Sink组件

​ Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

​ Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据,在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

file

2 flume 安装

集群 → 添加服务, 选择flume

file

选择依赖的服务

file

添加主机

file

启动服务即可使用。

3 flume应用

flume应用就是学组价的搭配应用,根据各组件的不同,配置内容也不同

可参考官方网站:http://flume.apache.org/FlumeUserGuide.html

3.1 netcat -- memory -- logger

监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。

其中:

Source:netcat

Sink:logger

Channel:memory

file

1)从整体上描述代理agent中sources、sinks、channels所涉及到的组件

# 配置Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

2)分别配置三个组件的具体实现

# 配置Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
# 配置Sink
a1.sinks.k1.type = logger
# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

3)将三个组件进行连接

# 将三者连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4)启动flume agent a1 服务端

# 每个人用自己的,注意给自己用户权限 /data/xxx/flume
flume-ng  agent -n a1 -f /data/xinniu/flume/conf/netcat-conf.properties   -Dflume.root.logger=DEBUG,console

参数说明:

-n :指定agent名称(与配置文件中代理的名字相同)
-c :指定flume中配置文件的目录
-f :指定配置文件
-Dflume.root.logger=DEBUG,console :设置日志等级

5)使用telnet发送数据

在虚拟机里发送命令

telnet  192.168.142.160  22222
aa bb cc

6)在控制台上查看flume收集到的日志数据

file

3.2 exec -- memory -- hdfs

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 ;

常用的是tail -F file指令监控一个文件,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。

file

可用此方式进行实时抽取。

配置如下:

Source:exec

Sink:hdfs

Channel:memory

# 配置Agent
a1.sources = f1
a1.channels = c1
a1.sinks = t1

# 配置Source
a1.sources.f1.type = exec
# 配置需要监控的日志输出目录
a1.sources.f1.command = tail -f /data/xinniu/flume/exec/exec.log

# 配置Sink
# 指定目标类型
a1.sinks.t1.type = hdfs
# 使用linux的本地时间
a1.sinks.t1.hdfs.useLocalTimeStamp = true
# 设定HDFS的目录结构
a1.sinks.t1.hdfs.path = hdfs:///user/xinniu/flume-exec/%Y-%m-%d
# 文件的前缀
a1.sinks.t1.hdfs.filePrefix = %Y-%m-%d-%H
# 文件的后缀
a1.sinks.t1.hdfs.fileSuffix = .log
# 设置副本为1
a1.sinks.t1.hdfs.minBlockReplicas = 1
# 不压缩的文本
a1.sinks.t1.hdfs.fileType = DataStream
# 文本格式
a1.sinks.t1.hdfs.writeFormat = Text
# 可以设置一个时间来关闭一个hdfs文件 
a1.sinks.t1.hdfs.rollInterval = 0
# 可以设置一个字节数来关闭一个文件
a1.sinks.t1.hdfs.rollSize = 0
# 设置当写入了多少个event的时候就关闭这个文件
a1.sinks.t1.hdfs.rollCount = 5
# 禁止因为超时原因而关闭文件
a1.sinks.t1.hdfs.idleTimeout = 0

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将三者连接
a1.sources.f1.channels = c1
a1.sinks.t1.channel = c1

其中:

sink 输出到hdfs中,每5个event 生成一个hdfs文件,hdfs文件目录会根据hdfs.path 的配置自动创建。

sink hdfs 配置参数描述:

名称 描述
hdfs.path hdfs目录路径
hdfs.filePrefix 文件前缀。默认值FlumeData
hdfs.fileSuffix 文件后缀
hdfs.rollInterval 多久时间后close hdfs文件。单位是秒,默认30秒。设置为0的话表示不根据时间close hdfs文件
hdfs.rollSize 文件大小超过一定值后,close文件。默认值1024,单位是字节。设置为0的话表示不基于文件大小
hdfs.rollCount 写入了多少个事件后close文件。默认值是10个。设置为0的话表示不基于事件个数
hdfs.fileType 文件格式, 有3种格式可选择:SequenceFile(默认), DataStream(不压缩) or CompressedStream(可压缩)
hdfs.batchSize 批次数,HDFS Sink每次从Channel中拿的事件个数。默认值100
hdfs.minBlockReplicas HDFS每个块最小的replicas数字,不设置的话会取hadoop中的配置
hdfs.maxOpenFiles 允许最多打开的文件数,默认是5000。如果超过了这个值,越早的文件会被关闭
serializer HDFS Sink写文件的时候会进行序列化操作。会调用对应的Serializer接口,可以自定义符合需求的Serializer
hdfs.retryInterval 关闭HDFS文件失败后重新尝试关闭的延迟数,单位是秒
hdfs.callTimeout HDFS操作允许的时间,比如hdfs文件的open,write,flush,close操作。单位是毫秒,默认值是10000
hdfs.writeFormat 序列文件记录的格式。一个文本或writable
hdfs.codeC 压缩编解码器。以下之一:gzip,bzip2,lzo,lzop,snappy
hdfs.idleTimeout 超时之后,非活动文件关闭(0 =禁止自动关闭空闲文件)

创建监控文件


mkdir /data/xinniu/flume/exec && cd /data/xinniu/flume/exec && touch exec.log && echo 'OK'

启动flume agent a1 服务端

flume-ng agent -n a1 -f /data/xinniu/flume/conf/exec-conf.properties  -Dflume.root.logger=INFO,Console 

监控/data/xinniu/flume/exec/exec.log

往exec.log 追加数据,追加5个之后,会生成新文件

cd /data/xinniu/flume/exec 
echo `date` >> exec.log

flume就把数据归集到hdfs上, hdfs每5个事件关闭文件,并产生新文件

file

webui查看:

file

弊端:

​ ExecSource tail -f 适合固定文件日志的读取,但是这会有一个弊端,就是当你的服务器宕机重启后,此时数据读取还是从头开始,既不支持文件断点续传的功能。

​ 有重复归集数据的风险

在重启后,不追加任何数据情况下,查看发现比之前多,证明重复归集了:

file

3.3 Taildir Source

Taildir Source:监听一个指定的目录下,指定正则格式的文件的内容,作为它的数据源,并支持断点续传功能 ;

如何支持断点续传的?

有个文件,存储断点续传的位置。

用于实时抽取指定目录下的多个文件。

创建配置文件

vim /data/xinniu/flume/conf/taildir-conf.properties

填写如下配置到文件中

配置方式:

Source:Taildir

Sink:hdfs

Channel:file

# Name the components on this agent
a2.sources = f1
a2.channels = c1
a2.sinks = t1
# sources类型
a2.sources.f1.type  =  taildir
#存储读取文件数据最后位置
a2.sources.f1.positionFile  =  /data/xinniu/flume/data
a2.sources.f1.filegroups  =  g1
a2.sources.f1.filegroups.g1  =  /data/xinniu/flume/taildir/.*.log

# Describe the sink
a2.sinks.t1.type = hdfs
a2.sinks.t1.hdfs.useLocalTimeStamp = true
a2.sinks.t1.hdfs.path = hdfs:///user/xinniu/flume-taildir/%Y-%m-%d
a2.sinks.t1.hdfs.filePrefix = %Y-%m-%d
a2.sinks.t1.hdfs.writeFormat = Text
a2.sinks.t1.hdfs.fileType = DataStream
a2.sinks.t1.hdfs.rollInterval = 0
a2.sinks.t1.hdfs.rollSize = 0
a2.sinks.t1.hdfs.rollCount = 5

# channals file
a2.channels.c1.type = file
a2.channels.c1.checkpointDir = /data/xinniu/flume/data/checkpoint1
a2.channels.c1.dataDirs = /data/xinniu/flume/data/data1
# Bind the source and sink to the channel
a2.sources.f1.channels = c1
a2.sinks.t1.channel = c1

创建监控目录

mkdir -p /data/xinniu/flume/taildir

启动flume agent a2 服务端

flume-ng  agent -n a2  -f /data/xinniu/flume/conf/taildir-conf.properties -Dflume.root.logger=DEBUG,Console

追加N条数据到 t1.log中

echo `date` >> /data/xinniu/flume/taildir/t1.log

webui:

file

查看/data/xinniu/flume/data/taildir_position.json 文件内容

内部记录了归集到哪个文件的哪个位置

file

重新启动后,先查看这个文件内容,再进行归集,不会重复归集

file

file

3.4 Spooling Directory Source

Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。

file

​ 此种方式不是实时抽取,是定时抽取。

flume官网中Spooling Directory Source描述:

Property Name      Default      Description
channels              –  
type                  –          The component type name, needs to be spooldir.
spoolDir              –          Spooling Directory Source监听的目录
fileSuffix        .COMPLETED    文件内容写入到channel之后,标记该文件
deletePolicy      never        文件内容写入到channel之后的删除策略: never or immediate
fileHeader        false        Whether to add a header storing the absolute path filename.
ignorePattern      ^$          Regular expression specifying which files to ignore (skip)
interceptors          –          指定传输中event的head(头信息),常用timestamp

a1.sources.r1.ignorePattern = ^(.)*\.tmp$ # 跳过.tmp结尾的文件

两个注意事项:

# 1) 拷贝到spool目录下的文件不可以再打开编辑

# 2) 不能将具有相同文件名字的文件拷贝到这个目录下

配置如下:

Source:Spooling Directory

Sink:hdfs

Channel:file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 指定source源
a1.sources.r1.type = spooldir
# 指定监控的目录
a1.sources.r1.spoolDir = /data/xinniu/flume/spooling
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#如果想传完删掉,可以配置
#如果打开这个配置,cp mv 同样的文件到监控的目录下,也可以归集
#如果关闭这个配置,cp mv 同样的文件到监控的目录下,抛异常
#a1.sources.r1.deletePolicy = immediate
# 跳过.tmp结尾的文件
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$    
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///user/xinniu/flume-spooling/%Y-%m
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 100
a1.sinks.k1.hdfs.callTimeout = 40000

# channals file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/xinniu/flume/data/checkpoint2
a1.channels.c1.dataDirs = /data/xinniu/flume/data/data2
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

其中:

​ Spooling Directory Source 监听/data/xinniu/flume/spooling 下的是否有新文件,如果有,则读到channel。输出到hdfs中,每100个event 生成一个hdfs文件,hdfs文件目录会根据hdfs.path 的配置自动创建。

创建监控目录

mkdir -p /data/xinniu/flume/spooling

启动flume agent a1 服务端

flume-ng  agent -n a1  -f /data/xinniu/flume/conf/spooling-conf.properties -Dflume.root.logger=DEBUG,Console

监控/data/xinniu/flume/spooling 目录,把文件cp到目录下,flume就开始归集,归集完,把文件重命名为xxx.COMPLETED

cp文件到目标目录(文件不重名)

file

已经被归集的文件,被重命名

file

归集后的文件

file

4 access日志归集与处理

4.1 flume归集access.log日志到HDFS

通过flume归集实现将日志写入hdfs用于离线处理

sources:taildir

channel:file

sink:hdfs

hdfs归集: 每5分钟生成一个文件,并且带有snappy压缩;

1)配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources类型
a1.sources.r1.type  =  TAILDIR
#存储读取文件数据最后位置
a1.sources.r1.positionFile  =  /data/xinniu/flume/data/taildir_position_access.json
a1.sources.r1.filegroups  =  f1
a1.sources.r1.filegroups.f1  = /data/xinniu/flume/access/access.log
# hdfs sink-k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///data/xinniu/access_log/%Y/%m%d
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = access_%Y%m%d
a1.sinks.k1.hdfs.fileSuffix = .log.snappy
# 设置输出压缩
a1.sinks.k1.hdfs.fileType = CompressedStream
# 设置snappy压缩
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0

# channals file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/xinniu/flume/data/checkpoint_access
a1.channels.c1.dataDirs = /data/xinniu/flume/data/data_access
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2)配置文件位置

/data/xinniu/flume/conf/access-taildir-conf.properties

3)启动flume

启动命令:

flume-ng agent -n a1 -f /data/xinniu/flume/conf/access-taildir-conf.properties  -Dflume.root.logger=DEBUG,console

后台启动命令:

nohup flume-ng agent -n a1 -f /data/xinniu/flume/conf/access-taildir-conf.properties  -Dflume.root.logger=DEBUG,console >> /dev/null 2>&1 &

4)生成hdfs目录和文件

4.2 编写mapreduce对归集的access日志做数据清洗

业务字段说明:

#ip地址:
192.168.142.1
#请求用户,没用
-
# 请求时间: yyyyMMddHHmmss
14/Sep/2020:14:43:38 +0800
# 请求url:
GET /getBrandDetailByIdForPage.action?id=13 HTTP/1.1
# 状态码:
200
# 发送给客户端的字节数:
9111
# post请求boby数据:
-
# refer:
http://www.hainiushop.com/index.jsp
# useragent: 
Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108
# -
# 用户id
a6a597b4f3b24e16ba87cec919a4162c
# 登录用户名
pan123

编写mapreduce程序:

/**
 * Access2Text.java
 * com.hainiu.mr
 * Copyright (c) 2021, 海牛版权所有.
 * @author   潘牛                      
*/
package com.hainiu.mr;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 读取access日志数据转结构化数据
 * @author   潘牛                      
 * @Date     2021年8月10日      
 */
public class Access2Text extends Configured implements Tool{

    public static class Access2TextMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

        SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");

        Text keyOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split("\001");

            context.getCounter("hainiu", "total line nume").increment(1L);

            if(arr.length != 12){
                context.getCounter("hainiu", "bad line num").increment(1L);
                return;
            }

//          0   192.168.142.1
//          1   -
//          2   14/Sep/2020:14:43:38 +0800
//          3   GET /getBrandDetailByIdForPage.action?id=13 HTTP/1.1
//          4   200
//          5   9111
//          6   -
//          7   http://www.hainiushop.com/index.jsp
//          8   Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36
//          9   -
//          10  a6a597b4f3b24e16ba87cec919a4162c
//          11  -
            String ip = arr[0];
            // 14/Sep/2020:14:43:38 +0800
            String timestr = arr[2];
            String str = timestr.substring(0, timestr.indexOf(" "));
//          timestr.split(" ")[0]

            // yyyyMMddHHmmss格式的数据
            String format = "";
            try {
                Date date = sdf1.parse(str);
                format = sdf2.format(date);
            } catch (ParseException e) {
                e.printStackTrace();

            }

            // GET /getBrandDetailByIdForPage.action?id=13 HTTP/1.1
            String reqstr = arr[3];
            String[] arr2 = reqstr.split(" ");
            String reqType = arr2[0];
            String reqUrl = arr2[1];

            String status = arr[4];
            String body = arr[6];
            String refer = arr[7];
            String userAgent = arr[8];
            String uid = arr[10];
            String logonName = arr[11];

            StringBuilder sb = new StringBuilder();
            sb.append(ip).append("\t")
                .append(reqType).append("\t")
                .append(format).append("\t")
                .append(reqUrl).append("\t")
                .append(status).append("\t")
                .append(body).append("\t")
                .append(refer).append("\t")
                .append(userAgent).append("\t")
                .append(uid).append("\t")
                .append(logonName);
            keyOut.set(sb.toString());

            context.write(keyOut, NullWritable.get());

        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "access2text");

        // 设置运行的类(linux 运行用)
        job.setJarByClass(Access2Text.class);

        // 设置mapperclass
        job.setMapperClass(Access2TextMapper.class);

        job.setNumReduceTasks(0);

        // 设置mapper输出keyclass
        job.setMapOutputKeyClass(Text.class);
        // 设置mapper输出valueclass
        job.setMapOutputValueClass(NullWritable.class);
        // 设置reducer输出keyclass
        job.setOutputKeyClass(Text.class);
        // 设置reducer输出的valueclass
        job.setOutputValueClass(NullWritable.class);

        // 设置读取的输入文件的inputformatclass,默认是文本,可以不设置
        job.setInputFormatClass(TextInputFormat.class);
        // 设置写入文件的outputformatclass,默认是文本,可以不设置
        job.setOutputFormatClass(TextOutputFormat.class);

        // 设置输入目录
        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path outputPath = new Path(args[1]);
        // 设置输出目录
        FileOutputFormat.setOutputPath(job, outputPath);

        // 自动删除输出目录
        FileSystem fs = FileSystem.get(conf);
        // 如果输出目录存在,就递归删除输出目录
        if(fs.exists(outputPath)){
            // 递归删除输出目录
            fs.delete(outputPath, true);
            System.out.println("delete outputPath==> 【" + outputPath.toString() + "】 success!");
        }

        // 提交job
        boolean status = job.waitForCompletion(false);
        return status ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {
        // /tmp/access/input_text /tmp/access/output
        System.exit(ToolRunner.run(new Access2Text(), args));
    }
}

运行参数: /tmp/access/input_text /tmp/access/output

运行结果:

file

处理好的一行的数据

file

后续可以基于给数据来建表,用hive统计分析查询

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-阿布都的都,http://hainiubl.com/topics/76083
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter