4 Flume Case Studies


1 Collecting access.log into HDFS with Flume


1.1 Install nginx to simulate click-log generation

Extract nginx (Tengine) and build and install it from source:

# Extract the source tarball
tar -xzf ./tengine-2.2.0.tar.gz -C /usr/local
# Check the build environment and generate the Makefile
cd /usr/local/tengine-2.2.0
./configure
# Compile and install
make && make install
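
If ./configure complains about missing headers, install the usual build dependencies first. A minimal sketch, assuming a yum-based system (package names differ on other distributions):

# compiler plus the pcre/zlib/openssl development headers Tengine needs
yum install -y gcc make pcre-devel zlib-devel openssl-devel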

After installation completes, an nginx directory is created under /usr/local/.


Go into the conf directory under the nginx installation directory and edit nginx.conf so that access logs are produced.


Specify the format of the generated logs and where they are written; a sketch of the relevant directives follows.

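A minimal sketch of the nginx.conf directives, assuming the log is written to /data/access/access.log (the path the Flume agent tails below); the field list in log_format is illustrative:

# inside the http { ... } block of /usr/local/nginx/conf/nginx.conf
# hypothetical log format; adjust the fields to your needs
log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                  '$status $body_bytes_sent "$http_referer" "$http_user_agent"';
# write access logs where the Flume TAILDIR source expects them
access_log  /data/access/access.log  main;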

1.2 Start nginx

/usr/local/nginx/sbin/nginx


From the remote desktop, open nginx in a browser on its default port 80.


Check whether log entries are being produced, for example:

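A quick way to confirm that requests are landing in the log; the log path is the one assumed in the nginx sketch above:

# generate one request, then look at the tail of the access log
curl -s http://localhost/ > /dev/null
tail -n 5 /data/access/access.log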

1.3 Collect the log data into HDFS with Flume

Use Flume to collect the log and write it to HDFS for offline processing.

sources:taildir

channel:file

sink:hdfs

HDFS collection: roll a new file every 5 minutes, with snappy compression.

1) Configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source type
a1.sources.r1.type = TAILDIR
# file that records the last read position of each tailed file
a1.sources.r1.positionFile = /data/flume/taildir_position_access.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/access/access.log
# hdfs sink-k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///data/xinniu/access_log/%Y/%m%d
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = access_%Y%m%d
a1.sinks.k1.hdfs.fileSuffix = .log.snappy
# write compressed output
a1.sinks.k1.hdfs.fileType = CompressedStream
# use the snappy codec
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0

# channels: file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/flume/data/checkpoint_access
a1.channels.c1.dataDirs = /data/flume/data/data_access
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
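
Because the sink writes a CompressedStream with the snappy codec, the Hadoop native snappy library must be loadable on the node that runs the agent. A quick check with the standard Hadoop CLI:

# verify that the native snappy codec is available
hadoop checknative -a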

Start the agent:

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./access.agent -Dflume.root.logger=INFO,console
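
The command above keeps the agent in the foreground. To leave it running after the shell exits, a common plain-shell pattern is:

# run the agent in the background and keep its console output in a local file
nohup flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./access.agent > flume-access.out 2>&1 &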

Test:

The data is collected to the configured location in HDFS.

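To confirm on HDFS, list the rolled files under the path configured in hdfs.path (%Y/%m%d expands to the current year and month-day); hdfs dfs -text decompresses known codecs such as snappy. The file name below is a placeholder:

hdfs dfs -ls -R /data/xinniu/access_log/
hdfs dfs -text /data/xinniu/access_log/<year>/<monthday>/access_<date>*.log.snappy | head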

2 Collecting access.log into Kafka with Flume


Use Flume to collect the log and write it to Kafka for real-time processing.

sources:taildir

channel:file

sink:kafka

1) Configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source type
a1.sources.r1.type = TAILDIR
# file that records the last read position of each tailed file
a1.sources.r1.positionFile = /data/flume/data/taildir_position_access.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/access/access.log
# kafka sink-k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = access

# channels: file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/xinniu/flume/data/checkpoint_access
a1.channels.c1.dataDirs = /data/xinniu/flume/data/data_access
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
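
The Kafka sink only needs the bootstrap servers and topic, but two properties that are often tuned are the batch size and the producer acknowledgements. Optional additions, with values shown only as examples (property names per the Flume Kafka sink documentation):

# optional: events per batch sent to Kafka
a1.sinks.k1.kafka.flumeBatchSize = 100
# optional: producer acknowledgement level (1 = leader only, -1/all = all replicas)
a1.sinks.k1.kafka.producer.acks = 1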

Delete the previously generated log and reload nginx:

rm -rf access.log
/usr/local/nginx/sbin/nginx -s reload

Start a Kafka console consumer to consume data from the access topic:

kafka-console-consumer.sh --bootstrap-server 11.99.173.7:9092 --topic access
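
If the access topic does not exist yet and the broker does not auto-create topics, create it first. A sketch for Kafka versions that accept --bootstrap-server (partition and replication counts are illustrative):

kafka-topics.sh --bootstrap-server 11.99.173.7:9092 --create --topic access --partitions 1 --replication-factor 1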

Start the agent:

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./access.agent -Dflume.root.logger=INFO,console

Test:

The Kafka consumer receives the data.


3 Collecting access.log into both HDFS and Kafka with Flume


Use Flume to collect the log and write it to HDFS for offline processing while also delivering it to Kafka for real-time processing.

sources:taildir

channel:file

sink:kafka,hdfs

1) Configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# source type
a1.sources.r1.type = TAILDIR
# file that records the last read position of each tailed file
a1.sources.r1.positionFile = /data/flume/data/taildir_position_access.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/access/access.log
# kafka sink-k2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers = localhost:9092
a1.sinks.k2.kafka.topic = access
# hdfs sink-k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///data/xinniu/access_log/%Y/%m%d
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = access_%Y%m%d
a1.sinks.k1.hdfs.fileSuffix = .log.snappy
# write compressed output
a1.sinks.k1.hdfs.fileType = CompressedStream
# use the snappy codec
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0

# channels: file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/xinniu/flume/data/checkpoint_access
a1.channels.c1.dataDirs = /data/xinniu/flume/data/data_access

a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data/xinniu/flume/data/checkpoint_access1
a1.channels.c2.dataDirs = /data/xinniu/flume/data/data_access1
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
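
With two channels attached to one source, Flume applies the replicating channel selector by default, so every event is copied to both c1 (HDFS) and c2 (Kafka). Making that explicit is optional:

# optional: the default selector already replicates events to every listed channel
a1.sources.r1.selector.type = replicating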

Delete the previously generated log and reload nginx:

rm -rf access.log
/usr/local/nginx/sbin/nginx -s reload

Start a Kafka console consumer to consume data from the access topic:

kafka-console-consumer.sh --bootstrap-server 11.99.173.7:9092 --topic access

Start the agent:

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./access_kafka_hdfs.agent -Dflume.root.logger=INFO,console

Test:

The Kafka consumer receives the data.


Newly imported data appears in HDFS.


Copyright notice: original work; reposting is permitted provided the source and author are credited with a hyperlink, otherwise legal liability may be pursued. From hainiubl.com - 薪牛, http://hainiubl.com/topics/76174