5 flume 扩展

教程 薪牛 ⋅ 于 2023-02-04 17:55:06 ⋅ 1383 阅读

1 flume channl事务详解

Flume是一个日志文件收集工具,主要有三个阶段:

1.通过source 把数据从数据源收集过来。
2.通过source把数据传入到channel中
3.再把数据从channel传输到sink里面,sink把数据传给目的地(如:hdfs、hbase、hive、本地磁盘等).

flume在收集数据,进行数据转移的时候,有两个方面会设计到数据的流动。

第一个是Source收集到数据向Channel中放数据的时候,涉及到一个数据的发送,而source收到的数据发送到channel中时,并不是一条一条的放到channel中,而是一批一批将数据放到channel中。这里涉及到put的事务

第二个是Sink从Channel中拉取数据,这里是sink从channel中数据。也是一批一批拉取数据。

当涉及到数据的批量操作时,就会通过事务来保证数据的一致性和完整性。

file

2 自定义source

对于flume来说,官方提供的source类型有很多,以及可以满足我们很多应用场景,但是有时候官方自带的source不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义source来使用了。

官方也提供了自定义source的接口:

官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source

file

FLume Source 有PollableSource和EventDrivenSource

EventDrivenSource是事件驱动型source 比如:netcat和http

PollableSource是属于轮询拉取型source 比如:exec

需求:自定义一个source,实现从mysql读取数据,并将数据放到文件中

file

Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间

实现对应的方法:

  1. configure(Context context)

    初始化context

  2. process(),该方法被一直调用

    从mysql表中获取数据,然后把数据封装成event对象写入到channel

2.1 环境准备

安装mysql

下载地址:https://downloads.mysql.com/archives/community/

# 由于网络原因,本课程采用离线安装方式
# 解压安装包

mkdir -p /opt/tools/mysql
tar -xf mysql-5.7.22-1.el7.x86_64.rpm-bundle.tar -C /opt/tools/mysql

# 安装server时要依赖
yum install -y net-tools

# 离线安装
rpm -vih /opt/tools/mysql/mysql-community-common-5.7.22-1.el7.x86_64.rpm
rpm -vih /opt/tools/mysql/mysql-community-libs-5.7.22-1.el7.x86_64.rpm
rpm -vih /opt/tools/mysql/mysql-community-client-5.7.22-1.el7.x86_64.rpm
yum install -y /opt/tools/mysql/mysql-community-server-5.7.22-1.el7.x86_64.rpm
rpm -ivh /opt/tools/mysql/mysql-community-libs-compat-5.7.22-1.el7.x86_64.rpm

# 启动MySQL
systemctl start mysqld
systemctl status mysqld

# cat /var/log/mysqld.log | grep password 查看初始化密码

# 登录
mysql -uroot -p
# 输入初始化密码

# 设置校验密码的长度
set global validate_password_policy=LOW; 
# 修改密码
set password=PASSWORD('12345678');

# 修改my.cnf,默认在/etc/my.cnf,执行:vim /etc/my.cnf,添加如下内容:
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character_set_server=utf8

# 重启生效
systemctl restart mysqld
# 对外开放权限
set global validate_password_policy=LOW; 
grant all privileges on *.* to 'root'@'%' identified by '12345678';
flush privileges; 

准备数据:

create database hainiu;
CREATE TABLE hainiu.student(
  id int(11) NOT NULL AUTO_INCREMENT,
  name varchar(255) NOT NULL,
  PRIMARY KEY (id)
);

insert  into hainiu.student(id,name) values (1,'bajie'),(2,'tanbgseng'),(3,'shaseng'),(4,'wukong');

2.2 代码实现

创建maven项目

file

file

导入相关依赖包

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.35</version>
    </dependency>
      <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.0</version>
</dependency>
</dependencies>

编辑jdbc连接的配置文件jdbc.properties

Driver=com.mysql.jdbc.Driver
Url=jdbc:mysql://linux-16590:3306/hainiu?useUnicode=true&characterEncoding=utf-8
User=root
Password=12345678

代码实现:

// 创建mysql连接工具类
public class MysqlUtil {

    //连接mysql的url、用户名、密码
    private static String connectionURL, connectionUserName, connectionPassword;

    //加载静态资源
    static {
        Properties p = new Properties();
        try {
            p.load(QueryMysql.class.getClassLoader().getResourceAsStream("jdbc.properties"));
            Class.forName(p.getProperty("Driver"));
            connectionURL = p.getProperty("Url");
            connectionUserName = p.getProperty("User");
            connectionPassword = p.getProperty("Password");

        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    //获取JDBC连接Connection
    public static Connection getConnection() {

            Connection conn = DriverManager.getConnection(connectionURL, connectionUserName, connectionPassword);
            return conn;
    }

    public static void close(ResultSet rs, Statement stat, Connection conn) throws SQLException {
       //--关闭连接            
            if(conn!=null) {                
                try {                    
                    conn.close();                
                } catch (SQLException e) {                                 
                        e.printStackTrace();                
                }finally {                    
                    conn=null;                
                }            
            }            
            if(rs!=null) {                
                try {                    
                    rs.close();             
                } catch (SQLException e) {                                       
                        e.printStackTrace();                
                }finally {                    
                    rs=null;                
                }            
            }            
            if(stat!=null) {                
                try {                    
                    stat.close();                
                } catch (SQLException e) {                                       
                        e.printStackTrace();                
                }finally {                    
                    stat=null;                
                }            
            }      
    }

}

自定义source代码实现:


 public class AutoSource extends AbstractSource implements Configurable, PollableSource {

    private int idz;
    private String name;
    //--接收agent配置文件传入进来的参数
    public void configure(Context context) {
        idz = context.getInteger("id");
    }
    Connection conn=null;
    PreparedStatement ps=null;
    ResultSet rs=null;
    public AutoSource() throws SQLException {

        conn = MysqlUtil.getConnection();
    }
     //--核心方法该方法被一直调用
    public Status process() throws EventDeliveryException {

        try {

            HashMap<String, String> hearderMap = new HashMap<String,String>();
            SimpleEvent event = new SimpleEvent();
              ps = conn.prepareStatement("select * from student where id >?");
             ps.setInt(1,idz);
              rs = ps.executeQuery();
             while(rs.next()){
                 int id = rs.getInt("id");
                 String name = rs.getString("name");
                 String body=id+":"+name;
                event.setHeaders(hearderMap);
                event.setBody(body.getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(3000);
                 idz=id;
             }

        } catch (Exception e) {
            e.printStackTrace();
            try {
                MysqlUtil.close(rs,ps,conn);
            } catch (SQLException ex) {
                throw new RuntimeException(ex);
            }
            return Status.BACKOFF;
        }
        return Status.READY;

    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

}

2.3 定义agent

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=com.hainiu.AutoSource
a1.sources.r1.id=2

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.catacity=10000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/root/mysqlfile
a1.sinks.k1.sink.rollInterval=60
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

结果:

file

3 自定义sink

需求:读取文件中的数据,并将数据写入到mysql的表中

file

3.1 数据准备

CREATE TABLE hainiu.teacher (
  id INT(11) NOT NULL AUTO_INCREMENT,
  name VARCHAR(64) NOT NULL,
  age int NOT NULL,
  PRIMARY KEY (id)
) ;
#/root/mysqldata/data.txt
1|zhanglaoshi|50
2|helaoshi|40
3|chenlaoshi|45
4|yanglaoshi|33
5|wanglaoshi

3.2 代码实现


package com.hainiu;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class AutoSink extends AbstractSink implements Configurable {

    Connection conn = null;
    PreparedStatement ps = null;

    String tablename;

    public AutoSink() throws SQLException {
        conn = MysqlUtil.getConnection();
    }

    public void configure(Context context) {
        tablename = context.getString("tablename");
        System.out.println(tablename);
    }

    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        System.out.println(channel);
        Transaction tran = channel.getTransaction();
        Event event;
        tran.begin();

        while (true) {
            event = channel.take();
            if (event != null) {
                break;
            }
        }
        try {

            String body = new String(event.getBody(), "UTF-8");
            String[] arrs = body.split("\\|");
            System.out.println(arrs);
            System.out.println(arrs.length);
            if (arrs.length == 3) {
                ps = conn.prepareStatement("insert into "+tablename+" values (?,?,?)");
                ps.setInt(1, Integer.parseInt(arrs[0]));
                ps.setString(2, arrs[1]);
                ps.setInt(3, Integer.parseInt(arrs[2]));
                ps.executeUpdate();
                System.out.println("sucess insert");
                status = Status.READY;
            } else {
                System.out.println("error insert");
                status = Status.BACKOFF;
            }
            tran.commit();
        } catch (Exception e) {
            tran.rollback();
            e.printStackTrace();
            status = Status.BACKOFF;
        }finally {
        tran.close();
        }
        return  status;
    }
}

3.3 定义agent

#定义agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = exec
a1.sources.r1.command=tail -f /root/mysqldata/data.txt
#定义channel
a1.sinks.k1.type = com.hainiu.AutoSink
a1.sinks.k1.tablename=hainiu.teacher
# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-薪牛,http://hainiubl.com/topics/76183
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter