Data Lake HUDI-FLINK 02


Operating Hudi with Flink

Hudi Integration with Flink

Flink write support has been available since Hudi 0.7.0;
Hudi 0.8.0 refactored the API and further improved the Flink integration;
Hudi 0.9.0 added support for ingesting Flink CDC data, which requires Flink 1.12+.

1. Batch Query

1. Create a table and map it to a Hudi table

CREATE TABLE t1(
  uuid VARCHAR(20), 
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.itcast.cn:8020/hudi-warehouse/hudi-t1',
  'write.tasks' = '1',
  'compaction.tasks' = '1', 
  'table.type' = 'MERGE_ON_READ'
);

2. Insert data

INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');

INSERT INTO t1 VALUES
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

3. Update data

-- If the row key already exists the data is updated, otherwise it is inserted (upsert)
insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
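
A quick batch read (a minimal check, assuming the statements above were run in the Flink SQL client) should now return the upserted value, i.e. id1 with age 27:

SELECT * FROM t1;
-- id1 is returned with age = 27; the remaining rows are unchanged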

2. Streaming Query

read.streaming.enabled set to true enables reading the table data in streaming mode;
read.streaming.check-interval sets the interval at which the source checks for new commits, here 4 seconds;
table.type sets the table type to MERGE_ON_READ.

Create the table

CREATE TABLE t2(
      uuid VARCHAR(20), 
      name VARCHAR(10),
      age INT,
      ts TIMESTAMP(3),
      `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.itcast.cn:8020/hudi-warehouse/hudi-t1',
  'table.type' = 'MERGE_ON_READ',
  'read.tasks' = '1', 
  'read.streaming.enabled' = 'true',
  'read.streaming.start-commit' = '20210316134557',
  'read.streaming.check-interval' = '4' 
);
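
Once the table is defined, a plain SELECT runs as a continuous streaming query. A minimal sketch, assuming t2 was created as above (note it points to the same path as t1): the query keeps running, and rows from new commits appear roughly every 4 seconds.

SELECT * FROM t2;

-- In another session, any new write against t1 becomes visible in the running query above,
-- e.g. (the row values below are illustrative):
INSERT INTO t1 VALUES ('id9','Kate',27,TIMESTAMP '1970-01-01 00:00:09','par5');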

3.Flink SQL Writer

3.1 Flink SQL Integration with Kafka

Flink SQL Connector Kafka

SET execution.result-mode=table;   -- table mode
SET execution.result-mode=tableau; -- tableau mode, closer to the output of a traditional database client
CREATE TABLE tbl_kafka (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-topic',
  'properties.bootstrap.servers' = 'node1.itcast.cn:9092',
  'properties.group.id' = 'test-group-10001',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
-- Send data to the Kafka topic (run the producer below in a shell)
kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic flink-topic
/*
1001,90001,click
1001,90001,browser
1001,90001,click
1002,90002,click
1002,90003,click
1003,90001,order
1004,90001,order
*/
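
To verify the source, query tbl_kafka in the SQL client while the producer is sending rows. A minimal sketch; the aggregation is just one possible example over the same CSV records:

SELECT * FROM tbl_kafka;

-- or a simple continuous aggregation:
SELECT user_id, behavior, COUNT(*) AS cnt
FROM tbl_kafka
GROUP BY user_id, behavior;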

3.2 Writing to Hudi Programmatically with Flink SQL

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hudi-learning</artifactId>
        <groupId>cn.itcast.hudi</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hudi-flink</artifactId>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
        <repository>
            <id>central_maven</id>
            <name>central maven</name>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.12.2</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
    </properties>

    <dependencies>
        <!-- Flink Client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <!-- MySQL/FastJson/lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- slf4j and log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Shade plugin: builds a fat jar containing all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- <mainClass>com.itcast.flink.batch.FlinkBatchWordCount</mainClass> -->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3.2.1 FlinkSQLKafkaDemo
package cn.test.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.* ;

public class FlinkSQLKafkaDemo {

    public static void main(String[] args) {
        // 1. Obtain the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode() // streaming mode
            .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 2. Create the source table: consume data from Kafka
        tableEnv.executeSql(
            "CREATE TABLE order_kafka_source (\n" +
                "  orderId STRING,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-topic',\n" +
                "  'properties.bootstrap.servers' = 'node1.itcast.cn:9092',\n" +
                "  'properties.group.id' = 'gid-1001',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")"
        );

        // 3. Transform the data: either SQL or the Table API can be used
        Table etlTable = tableEnv
            .from("order_kafka_source")
            // Add column: Hudi partition field, "orderTime": "2021-11-22 10:34:34.136" -> 2021-11-22
            .addColumns(
                $("orderTime").substring(0, 10).as("partition_day")
            )
            // Add column: Hudi precombine (merge) field, a timestamp, "orderId": "20211122103434136000001" -> 20211122103434136
            .addColumns(
                $("orderId").substring(0, 17).as("ts")
            );
        tableEnv.createTemporaryView("view_order", etlTable);

        // 4. Output the result: print the query result to the console
        tableEnv.executeSql("SELECT * FROM view_order").print();
    }
}
3.2.2 FlinkSQLHudiDemo

Because data is written to the Hudi table incrementally, Flink checkpointing must be enabled.

package cn.test.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Based on the Flink SQL connectors: consume data from a Kafka topic in real time, transform it, and store it into a Hudi table in real time
 */
public class FlinkSQLHudiDemo {

    public static void main(String[] args) {
        // 1. Obtain the table execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled because data is written to the Hudi table incrementally
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode() // streaming mode
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 2. Create the source table: consume data from Kafka
        tableEnv.executeSql(
            "CREATE TABLE order_kafka_source (\n" +
                "  orderId STRING,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-topic',\n" +
                "  'properties.bootstrap.servers' = 'node1.itcast.cn:9092',\n" +
                "  'properties.group.id' = 'gid-1002',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")"
        );

        // 3. Transform the data: either SQL or the Table API can be used
        Table etlTable = tableEnv
            .from("order_kafka_source")
            // Add column: Hudi precombine (merge) field, a timestamp, "orderId": "20211122103434136000001" -> 20211122103434136
            .addColumns(
                $("orderId").substring(0, 17).as("ts")
            )
            // Add column: Hudi partition field, "orderTime": "2021-11-22 10:34:34.136" -> 2021-11-22
            .addColumns(
                $("orderTime").substring(0, 10).as("partition_day")
            );
        tableEnv.createTemporaryView("view_order", etlTable);

        // 4. Create the sink table: map it to the Hudi table, specifying the storage path, fields, and other options
        tableEnv.executeSql(
            "CREATE TABLE order_hudi_sink (\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day)\n" +
                "WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'path' = 'file:///D:/flink_hudi_order',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'write.operation' = 'upsert',\n" +
                "    'hoodie.datasource.write.recordkey.field'= 'orderId',\n" +
                "    'write.precombine.field' = 'ts',\n" +
                "    'write.tasks'= '1'\n" +
                ")"
        );

        // 5. Write the data into the sink table via INSERT ... SELECT
        tableEnv.executeSql(
            "INSERT INTO order_hudi_sink " +
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM view_order"
        );

    }

}
3.2.3 Loading Data from the Hudi Table
package cn.test.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Based on the Flink SQL connector: load data from the Hudi table and query it with SQL
 */
public class FlinkSQLReadDemo {

    public static void main(String[] args) {
        // 1. Obtain the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings) ;

        // 2. Create the source table: load data from the Hudi table
        tableEnv.executeSql(
            "CREATE TABLE order_hudi(\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day)\n" +
                "WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'path' = 'file:///D:/flink_hudi_order',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'read.streaming.enabled' = 'true',\n" +
                "    'read.streaming.check-interval' = '4'\n" +
                ")"
        );

        // 3. Execute the query, streaming-reading the Hudi table data
        tableEnv.executeSql(
            "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
        ).print() ;
    }
}

3.3 Writing to Hudi with the Flink SQL Client

set execution.result-mode=tableau; 
set execution.checkpointing.interval=3sec;
--1. Create the Kafka table
--2. Create the Hudi table
--3. SELECT from the Kafka table and INSERT the rows INTO the Hudi table (see the sketch below)
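
A sketch of the three steps in the SQL client, reusing the order schema from section 3.2; the topic, group id, and HDFS path below are illustrative:

-- 1. Kafka source table
CREATE TABLE order_kafka_source (
  orderId STRING,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-topic',
  'properties.bootstrap.servers' = 'node1.itcast.cn:9092',
  'properties.group.id' = 'gid-sql-client',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- 2. Hudi sink table
CREATE TABLE order_hudi_sink (
  orderId STRING PRIMARY KEY NOT ENFORCED,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.itcast.cn:8020/hudi-warehouse/order_hudi_sink',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'orderTime',
  'write.tasks' = '1'
);

-- 3. Continuous INSERT: select from the Kafka table and write into the Hudi table
INSERT INTO order_hudi_sink
SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus
FROM order_kafka_source;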

3.4 Flink CDC to Hudi

Environment preparation

Adjust the Hive version used when building the Hudi Flink bundle.
    Reason: the current Hudi version bundles flink-sql-connector-hive by default when compiled, which conflicts with the flink-sql-connector-hive jar under Flink's lib directory. Therefore, only the Hive version is changed during compilation.

hudi-0.9.0/packaging/hudi-flink-bundle/pom.xml  
 # Build Hudi from source
 mvn clean install -DskipTests -Drat.skip=true -Dscala-2.12 -Dspark3 -Pflink-bundle-shade-hive2
1. Create the MySQL database and table
create database test ;
create table test.tbl_users(
   id bigint auto_increment primary key,
   name varchar(20) null,
   birthday timestamp default CURRENT_TIMESTAMP not null,
   ts timestamp default CURRENT_TIMESTAMP not null
);

INSERT INTO test.tbl_users(name) VALUES('itheima');

Insert data
insert into test.tbl_users (name) values ('zhangsan');
insert into test.tbl_users (name) values ('lisi');
insert into test.tbl_users (name) values ('wangwu');
insert into test.tbl_users (name) values ('laoda');
insert into test.tbl_users (name) values ('laoer');

2. Create the CDC source table
CREATE TABLE users_source_mysql (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'node1.itcast.cn',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'tbl_users'
);

3. Create a view
    It selects data from the source table; the columns match those of the sink table
create view view_users_cdc 
AS 
SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM users_source_mysql;

4. Create the sink table, mapped to a Hudi table and automatically synced to a Hive table
CREATE TABLE users_sink_hudi_hive(
id bigint ,
name string,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
part VARCHAR(20),
primary key(id) not enforced
)
PARTITIONED BY (part)
with(
'connector'='hudi',
'path'= 'hdfs://node1.itcast.cn:8020/users_sink_hudi_hive', 
'table.type'= 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field'= 'id', 
'write.precombine.field'= 'ts',
'write.tasks'= '1',
'write.rate.limit'= '2000', 
'compaction.tasks'= '1', 
'compaction.async.enabled'= 'true',
'compaction.trigger.strategy'= 'num_commits',
'compaction.delta_commits'= '1',
'changelog.enabled'= 'true',
'read.streaming.enabled'= 'true',
'read.streaming.check-interval'= '3',
'hive_sync.enable'= 'true',
'hive_sync.mode'= 'hms',
'hive_sync.metastore.uris'= 'thrift://node1.itcast.cn:9083',
'hive_sync.jdbc_url'= 'jdbc:hive2://node1.itcast.cn:10000',
'hive_sync.table'= 'users_sink_hudi_hive',
'hive_sync.db'= 'default',
'hive_sync.username'= 'root',
'hive_sync.password'= '123456',
'hive_sync.support_timestamp'= 'true'
);

5. Query the view and insert the data into the sink table
INSERT INTO users_sink_hudi_hive SELECT id, name, birthday, ts, part FROM view_users_cdc ;
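
To verify the pipeline end to end, the same Hudi table can be queried directly in the SQL client. A minimal check: because 'read.streaming.enabled' is 'true' in its definition, the query keeps running and emits new rows as changes arrive from MySQL via CDC.

SELECT * FROM users_sink_hudi_hive;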

4. Querying Hive Tables

The hudi-hadoop-mr-bundle-0.9.0.jar needs to be placed under $HIVE_HOME/lib.
# Start the client
beeline -u jdbc:hive2://node1.itcast.cn:10000 -n root -p 123456
Two Hive tables are generated automatically for the Hudi MOR table:
    1. `users_sink_hudi_hive_ro`: "ro" stands for read optimized table. For an MOR table, the synced xxx_ro table only exposes compacted Parquet files; it is queried the same way as a COW table, and after setting hiveInputFormat it can be queried like an ordinary Hive table.
    2. `users_sink_hudi_hive_rt`: "rt" denotes the real-time/incremental view, used mainly for incremental queries.
`The ro table can only read Parquet file data, while the rt table can read both Parquet and log file data.`

Query the Hive partitioned table data

set hive.exec.mode.local.auto=true;
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hive.mapred.mode=nonstrict;

select xx from table where xxx;
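
For example, assuming the two tables synced above (the partition value '20211122' is illustrative):

-- read-optimized view: only compacted Parquet data
select id, name, birthday, ts, part from users_sink_hudi_hive_ro where part = '20211122';

-- real-time view: Parquet data plus log file data
select count(*) from users_sink_hudi_hive_rt;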

5. Managing Hudi Tables

hudi-0.9.0/hudi-cli/hudi-cli.sh
# Connect to the Hudi table and view table info
connect --path hdfs://node1.itcast.cn:8020/users_sink_hudi_hive

# Show Hudi commit info
commits show --sortBy "CommitTime"

# Show Hudi compaction plans
compactions show all