1 flume channel事务详解
Flume是一个日志文件收集工具,主要有三个阶段:
1.通过source 把数据从数据源收集过来。
2.通过source把数据传入到channel中
3.再把数据从channel传输到sink里面,sink把数据传给目的地(如:hdfs、hbase、hive、本地磁盘等).
flume在收集数据,进行数据转移的时候,有两个方面会涉及到数据的流动。
第一个是Source收集到数据向Channel中放数据的时候,涉及到一个数据的发送,而source收到的数据发送到channel中时,并不是一条一条的放到channel中,而是一批一批将数据放到channel中。这里涉及到put的事务
第二个是Sink从Channel中拉取数据,同样不是一条一条地拉取,而是一批一批地从channel中拉取数据。这里涉及到take的事务
当涉及到数据的批量操作时,就会通过事务来保证数据的一致性和完整性。
2 自定义source
对于flume来说,官方提供的source类型有很多,已经可以满足我们很多应用场景,但是有时候官方自带的source不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义source来使用了。
官方也提供了自定义source的接口:
官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source
Flume Source 分为PollableSource和EventDrivenSource两种类型
EventDrivenSource是事件驱动型source 比如:netcat和http
PollableSource是属于轮询拉取型source 比如:exec
需求:自定义一个source,实现从mysql读取数据,并将数据放到文件中
Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间
实现对应的方法:
-
configure(Context context)
初始化context
-
process(),该方法被一直调用
从mysql表中获取数据,然后把数据封装成event对象写入到channel
2.1 环境准备
安装mysql
下载地址:https://downloads.mysql.com/archives/community/
# 由于网络原因,本课程采用离线安装方式
# 解压安装包
mkdir -p /opt/tools/mysql
tar -xf mysql-5.7.22-1.el7.x86_64.rpm-bundle.tar -C /opt/tools/mysql
# 安装server时要依赖
yum install -y net-tools
# 离线安装
rpm -vih /opt/tools/mysql/mysql-community-common-5.7.22-1.el7.x86_64.rpm
rpm -vih /opt/tools/mysql/mysql-community-libs-5.7.22-1.el7.x86_64.rpm
rpm -vih /opt/tools/mysql/mysql-community-client-5.7.22-1.el7.x86_64.rpm
yum install -y /opt/tools/mysql/mysql-community-server-5.7.22-1.el7.x86_64.rpm
rpm -ivh /opt/tools/mysql/mysql-community-libs-compat-5.7.22-1.el7.x86_64.rpm
# 启动MySQL
systemctl start mysqld
systemctl status mysqld
# cat /var/log/mysqld.log | grep password 查看初始化密码
# 登录
mysql -uroot -p
# 输入初始化密码
# 设置密码校验策略为LOW(只校验密码长度)
set global validate_password_policy=LOW;
# 修改密码
set password=PASSWORD('12345678');
# 修改my.cnf,默认在/etc/my.cnf,执行:vim /etc/my.cnf,添加如下内容:
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character_set_server=utf8
# 重启生效
systemctl restart mysqld
# 对外开放权限
set global validate_password_policy=LOW;
grant all privileges on *.* to 'root'@'%' identified by '12345678';
flush privileges;
准备数据:
create database hainiu;
CREATE TABLE hainiu.student(
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL,
PRIMARY KEY (id)
);
insert into hainiu.student(id,name) values (1,'bajie'),(2,'tanbgseng'),(3,'shaseng'),(4,'wukong');
2.2 代码实现
创建maven项目
导入相关依赖包
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
编辑jdbc连接的配置文件jdbc.properties
Driver=com.mysql.jdbc.Driver
Url=jdbc:mysql://linux-16590:3306/hainiu?useUnicode=true&characterEncoding=utf-8
User=root
Password=12345678
代码实现:
// 创建mysql连接工具类
/**
 * Small JDBC helper: reads the connection settings from {@code jdbc.properties}
 * on the classpath, registers the driver, and hands out / closes JDBC resources.
 *
 * <p>Expected property keys: {@code Driver}, {@code Url}, {@code User}, {@code Password}.
 */
public class MysqlUtil {
    // JDBC URL, user name and password read from jdbc.properties
    private static String connectionURL, connectionUserName, connectionPassword;

    // Load the settings and register the driver once, when the class is initialised.
    static {
        Properties p = new Properties();
        // Resolve the resource through this class itself and close the stream when done.
        try (java.io.InputStream in =
                 MysqlUtil.class.getClassLoader().getResourceAsStream("jdbc.properties")) {
            if (in == null) {
                throw new IllegalStateException("jdbc.properties not found on the classpath");
            }
            p.load(in);
            Class.forName(p.getProperty("Driver"));
            connectionURL = p.getProperty("Url");
            connectionUserName = p.getProperty("User");
            connectionPassword = p.getProperty("Password");
        } catch (Exception e) {
            // Best effort: report the problem here; getConnection() will then fail
            // with an SQLException because no URL was configured.
            e.printStackTrace();
        }
    }

    /**
     * Opens a new JDBC connection using the settings from {@code jdbc.properties}.
     *
     * @return a freshly opened connection; the caller is responsible for closing it
     * @throws SQLException if the settings are missing or the database cannot be reached
     */
    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(connectionURL, connectionUserName, connectionPassword);
    }

    /**
     * Closes the given JDBC resources, skipping any that are {@code null}.
     *
     * <p>Resources are released in dependency order (result set, statement, connection).
     * A failure while closing one resource is reported and does not prevent the
     * remaining ones from being closed.
     *
     * @param rs   result set to close, may be {@code null}
     * @param stat statement to close, may be {@code null}
     * @param conn connection to close, may be {@code null}
     * @throws SQLException kept in the signature for existing callers; close failures
     *                      are reported rather than propagated
     */
    public static void close(ResultSet rs, Statement stat, Connection conn) throws SQLException {
        closeQuietly(rs);
        closeQuietly(stat);
        closeQuietly(conn);
    }

    // Closes one resource, reporting (not propagating) any failure.
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
自定义source代码实现:
public class AutoSource extends AbstractSource implements Configurable, PollableSource {
private int idz;
private String name;
//--接收agent配置文件传入进来的参数
public void configure(Context context) {
idz = context.getInteger("id");
}
Connection conn=null;
PreparedStatement ps=null;
ResultSet rs=null;
public AutoSource() throws SQLException {
conn = MysqlUtil.getConnection();
}
//--核心方法该方法被一直调用
public Status process() throws EventDeliveryException {
try {
HashMap<String, String> hearderMap = new HashMap<String,String>();
SimpleEvent event = new SimpleEvent();
ps = conn.prepareStatement("select * from student where id >?");
ps.setInt(1,idz);
rs = ps.executeQuery();
while(rs.next()){
int id = rs.getInt("id");
String name = rs.getString("name");
String body=id+":"+name;
event.setHeaders(hearderMap);
event.setBody(body.getBytes());
getChannelProcessor().processEvent(event);
Thread.sleep(3000);
idz=id;
}
} catch (Exception e) {
e.printStackTrace();
try {
MysqlUtil.close(rs,ps,conn);
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
return Status.BACKOFF;
}
return Status.READY;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
}
2.3 定义agent
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#定义source
a1.sources.r1.type=com.hainiu.AutoSource
a1.sources.r1.id=2
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/root/mysqlfile
a1.sinks.k1.sink.rollInterval=60
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
结果:
3 自定义sink
需求:读取文件中的数据,并将数据写入到mysql的表中
3.1 数据准备
CREATE TABLE hainiu.teacher (
id INT(11) NOT NULL AUTO_INCREMENT,
name VARCHAR(64) NOT NULL,
age int NOT NULL,
PRIMARY KEY (id)
) ;
#/root/mysqldata/data.txt
1|zhanglaoshi|50
2|helaoshi|40
3|chenlaoshi|45
4|yanglaoshi|33
5|wanglaoshi
3.2 代码实现
package com.hainiu;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class AutoSink extends AbstractSink implements Configurable {
Connection conn = null;
PreparedStatement ps = null;
String tablename;
public AutoSink() throws SQLException {
conn = MysqlUtil.getConnection();
}
public void configure(Context context) {
tablename = context.getString("tablename");
System.out.println(tablename);
}
public Status process() throws EventDeliveryException {
Status status = null;
Channel channel = getChannel();
System.out.println(channel);
Transaction tran = channel.getTransaction();
Event event;
tran.begin();
while (true) {
event = channel.take();
if (event != null) {
break;
}
}
try {
String body = new String(event.getBody(), "UTF-8");
String[] arrs = body.split("\\|");
System.out.println(arrs);
System.out.println(arrs.length);
if (arrs.length == 3) {
ps = conn.prepareStatement("insert into "+tablename+" values (?,?,?)");
ps.setInt(1, Integer.parseInt(arrs[0]));
ps.setString(2, arrs[1]);
ps.setInt(3, Integer.parseInt(arrs[2]));
ps.executeUpdate();
System.out.println("sucess insert");
status = Status.READY;
} else {
System.out.println("error insert");
status = Status.BACKOFF;
}
tran.commit();
} catch (Exception e) {
tran.rollback();
e.printStackTrace();
status = Status.BACKOFF;
}finally {
tran.close();
}
return status;
}
}
3.3 定义agent
#定义agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = exec
a1.sources.r1.command=tail -f /root/mysqldata/data.txt
#定义sink
a1.sinks.k1.type = com.hainiu.AutoSink
a1.sinks.k1.tablename=hainiu.teacher
# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1