Hudi
1.数据湖分类
1.Delta Lake
1.Delta Lake:DataBricks公司推出的一种数据湖方案,网址:https://delta.io/
1.流批一体的Data Lake存储层,支持 update/delete/merge
2.由于出自Databricks,Spark的所有数据写入方式,包括基于dataframe的批式、流式,以及SQL的Insert、 Insert Overwrite等都是支持的(开源的SQL写暂不支持,EMR做了支持)。
3.在数据写入方面,Delta 与 Spark 是强绑定的;在查询方面,开源 Delta 目前支持 Spark 与 Presto,但是, Spark 是不可或缺的,因为 delta log 的处理需要用到 Spark
2.Iceberg
2.Apache Iceberg:以类似于SQL的形式高性能的处理大型的开放式表,网址:https://iceberg.apache.org/
3.Hudi
3.Apache Hudi:管理大型分析数据集在HDFS上的存储,网址:https://hudi.apache.org/
主要支持Upserts、Deletes和Incrementa数据处理,支持三种数据写入方式:UPSERT,INSERT 和 BULK_INSERT。
2.Hudi 功能
1. Hudi是在大数据存储上的一个数据集,可以将Change Logs通过upsert的方式合并进Hudi;
2. Hudi 对上可以暴露成一个普通Hive或Spark表,通过API或命令行可以获取到增量修改的信息,继续供下游消费;
3. Hudi 保管修改历史,可以做时间旅行或回退;
4. Hudi 内部有主键到文件级的索引,默认是记录到文件的布隆过滤器;
3.Hudi 特性
1. Apache Hudi使得用户能在Hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,不仅可以批处理,还可 以在数据湖上进行流处理。
(1) Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询 会处理最后一个提交的快照,并基于此输出结果。
(2) 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录 的增量流,并解锁新的查询姿势(类别)。
4.Hudi 基础架构
1、通过DeltaStreammer、Flink、Spark等工具,将数据摄取到数据湖存储,可使用HDFS作为数据湖的数据存储;
2、基于HDFS可以构建Hudi的数据湖;
3、Hudi提供统一的访问Spark数据源和Flink数据源;
4、外部通过不同引擎,如:Spark、Flink、Presto、Hive、Impala、Aliyun DLA、AWS Redshit访问接口;
5.编译源码
5.1下载源码
https://archive.apache.org/dist/hudi/0.9.0/hudi-0.10.0.src.tgz
5.2安装Maven
[root@localhost conf]# vi /etc/profile
export MAVEN_HOME=/root/hudi/apache-maven-3.5.4
export PATH=$PATH:$MAVEN_HOME/bin
[root@localhost conf]# source /etc/profile
settings
<localRepository>/root/hudi/m2</localRepository>
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云spring沛件仓库</name>
<url>https://maven.aliyun.com/repository/spring-plugin</url>
</mirror>
<mirror>
<id>repo2</id>
<name>Mirror from Maven Repo2</name>
<url>https://repo.spring.io/plugins-release/</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>UK</id>
<name>UK Central</name>
<url>http://uk.maven.org/maven2</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>jboss-public-repository-group</id>
<name>3Boss Public Repository Group</name>
<url>http://repository.jboss.org/nexus/content/groups/public</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>CN</id>
<name>OSChina Central</name>
<url>http://maven.oschina.net/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>google-maven-central</id>
<name>GCS Maven Central mirror Asia Pacific</name>
<url>https://maven-central-asia.storage-download.googleapis.com/maven2/</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>confluent</id>
<name>confluent maven</name>
<url>http://packages.confluent.io/maven/</url>
<mirrorOf>confluent</mirrorOf>
</mirror>
5.3安装jdk
#卸载openjdk
[root@localhost hudi]# rpm -qa | grep jdk
[root@localhost hudi]# [root@localhost hudi]# rpm -e --nodeps xxx
#安装jdk
export JAVA_HOME=/usr/java/jdk1.8.0_144
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
[root@localhost hudi]# source /etc/profile
5.4编译
[root@localhost hudi-0.9.0]# mvn clean install -DskipTests -DskipITs -Dscala-2.12 -Dspark3
6.Hudi CLI测试
[root@localhost hudi]# vim /etc/profile
export HUDI_HOME=/root/hudi/
export PATH=$PATH:$HUDI_HOME/bin
[root@localhost hudi]# source /etc/profile
[root@localhost hudi-cli]# sh ./hudi-cli.sh
7.大数据环境准备
7.1hdfs
tar -zxvf hadoop-2.7.3.tar.gz
#配置环境变量
export HADOOP_HOME=/root/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
#配置hadoop-env.sh
[root@localhost hadoop]# vim /root/hadoop/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_144
export HADOOP_HOME=/root/hadoop
#安装 HDFS
[root@localhost hadoop]# vim core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://192.168.92.161:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/export/server/hadoop/datas/tmp</value>
</property>
<property>
<name>hadoop.http.staticuser.user</name>
<value>root</value>
</property>
#创建临时目录
[root@localhost hadoop]# mkdir -p .export.server/hadoop/datas/tmp
#配置hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/export/server/hadoop/datas/dfs/nn</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/export/server/hadoop/datas/dfs/dn</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.data.dir.perm</name>
<value>750</value>
</property>
</configuration>
[root@localhost hadoop]# mkdir -p /export/server/hadoop/datas/dfs/nn
[root@localhost hadoop]# mkdir -p /export/server/hadoop/datas/dfs/dn
#配置slaves
[root@localhost hadoop]# vim slaves
#格式化HDFS
[root@localhost hadoop]# hdfs namenode -format
#启动HDFS集群
[root@localhost hadoop]# hadoop-daemon.sh start namenode
[root@localhost hadoop]# hadoop-daemon.sh start datanode
#关闭防火墙
[root@localhost hadoop]# systemctl stop firewalld
[root@localhost hadoop]# systemctl disable firewalld
#http://192.168.92.161:50070
7.2安装spark3
#1.安装Scala-2.12.10
[root@localhost ~]# tar -zxvf ./scala-2.12.10.tgz -C /export/server/
[root@localhost ~]# ln -s /export/server/scala-2.12.10/ /export/server/scala
[root@localhost ~]# vim /etc/profile
export SCALA_HOME=/export/server/scala
export PATH=$PATH:$SCALA_HOME/bin
[root@localhost ~]# source /etc/profile
[root@localhost ~]# scala -version
#2.安装 Spark 3.x
[root@localhost ~]# cd /export/server/spark/conf/
[root@localhost conf]# mv spark-env.sh.template spark-env.sh
JAVA_HOME=/usr/java/jdk1.8.0_144
SCALA_HOME=/export/server/scala
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
#3.本地模式启动spark-shell
[root@localhost spark]# bin/spark-shell --master local[2]
#4.测试
[root@localhost spark]# hdfs dfs -mkdir -p /datas/
[root@localhost spark]# hdfs dfs -put /export/server/spark/README.md /datas
8.spark-shell
#通过制定jar的方式启动
/export/server/spark/bin/spark-shell \
--master local[2] \
--jars /root/hudi-spark3-bundle_2.12-0.9.0.jar,\
/root/spark-avro_2.12-3.0.1.jar,/root/spark_unused-1.0.0.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
测试
//1.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
//将数据保存到hudi表
val tableName = "hudi₋trips_cow"
val basePath = "hdfs://192.168.92.161:8020/datas/hudi-warehouse/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))
df.printSchema()
df.select("rider","begin_lat","begin_lon", "driver", "fare", "uuid", "ts").show(10,truncate=false)
//数据查询 将模拟产生Trip数据,保存到Hudi表中,由于Hudi诞生时基于Spark框架,所以SparkSQL支持Hudi数据源,直接通 过format指定数据源Source,设置相关属性保存数据即可 可以通过:paste 粘贴模式执行 然后通过ctrl + d 触发
df.write
.mode(Overwrite)
.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY,"partitionpath")
.option(TABLE_NAME, tableName)
.save(basePath)
//参数说明
//1.参数:getQuickstartWriteConfigs,设置写入/更新数据至Hudi时,Shuffle时分区数目
//2.参数:PRECOMBINE_FIELD_OPT_KEY,数据合并时,依据主键字段
//3.参数:RECORDKEY_FIELD_OPT_KEY,每条记录的唯一id,支持多个字段
//4.参数:PARTITIONPATH_FIELD_OPT_KEY,用于存放数据的分区字段
//Hudi表数据存储在HDFS上,以PARQUET列式方式存储的
//读取hudi数据 "/*/*/*/*" 从0.9.0版本之后可以不用写,代表分区
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF.printSchema()
|-- _hoodie_commit_time: string (nullable = true) //数据提交时间
|-- _hoodie_commit_seqno: string (nullable = true) //数据提交序列号
|-- _hoodie_record_key: string (nullable = true) //数据的rowkey
|-- _hoodie_partition_path: string (nullable = true) //数据存储路径
|-- _hoodie_file_name: string (nullable = true) //数据的文件名称
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
//sql查询
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare from hudi_trips_snapshot").show()
9. Hudi 数据管理
9.1.hudi表结构
1..hoodie 文件:由于CRUD的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会严重影响HDFS的 性能,Hudi设计了一套文件合并机制。 .hoodie文件夹中存放了对应的`文件合并操作相关的日志文件`。
2.amricas和asia相关的路径是`实际的数据文件`,按分区存储,分区的路径key是可以指定的。
9.1.1.hoodie文件
Hudi把随着时间流逝,对表的一系列CRUD操作叫做Timeline,Timeline中某一次的操作,叫做Instant
Instant Action,记录本次操作是一次数据提交COMMITS,还是文件合并COMPACTION,或者是文件清理(CLEANS);
Instant Time,本次操作发生的时间;
State,操作的状态,发起(REQUESTED),进行中(INFLIGHT),还是已完成(COMPLETED);
9.1.2.数据文件
Hudi真实的数据文件使用Parquet文件格式存储
其中包含一个metadata元数据文件和数据文件parquet列式存储。
Hudi为了实现数据的CRUD,需要能够唯一标识一条记录,Hudi将把数据集中的`唯一字段(record key ) + 数据所在分区 (partitionPath)` 联合起来当做`数据的唯一键`。
1.在根目录下,每个分区都有唯一的分区路径,每个分区数据存储在多个文件中。
2.每个文件都有惟一的fileId和生成文件的commit所标识。如果发生更新操作时,多个文件共享相同的fileId,但会 有不同的commit
数据存储概述
每个文件都有惟一的fileId和生成文件的commit所标识。如果发生更新操作时,多个文件共享相同的fileId,但会 有不同的commit
Metadata 元数据
以时间轴(timeline)的形式将数据集上的各项操作元数据维护起来,以支持数据集的瞬态视图,这部分元数据存 储于根目录下的元数据目录。一共有三种类型的元数据:
1.Commits:一个单独的commit包含对数据集之上一批数据的一次原子写入操作的相关信息。我们用单调递增的时间戳来标识 commits,标定的是一次写入操作的开始。
2.Cleans:用于清除数据集中不再被查询所用到的旧版本文件的后台活动。
3.Compactions:用于协调Hudi内部的数据结构差异的后台活动。例如,将更新操作由基于行存的日志文件归集到列存数据上
Index 索引
Hudi维护着一个索引,以支持在记录key存在情况下,将新记录的key快速映射到对应的fileId。
Bloom filter:存储于数据文件页脚。默认选项,不依赖外部系统实现。数据和索引始终保持一致。
Apache HBase :可高效查找一小批key。在索引标记期间,此选项可能快几秒钟
Data 数据
Hudi以两种不同的存储格式存储所有摄取的数据,用户可选择满足下列条件的任意数据格式:
`读`优化的列存格式(ROFormat): 缺省值为Apache `Parquet`;
`写`优化的行存格式(WOFormat): 缺省值为Apache `Avro`;
10.IDEA 编程开发
10.1插入Hudi表,采用COW模式
main方法
def main(args: Array[String]): Unit = {
// 创建SparkSession实例对象,设置属性
val spark: SparkSession = {
SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// 设置序列化方式:Kryo
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
// 定义变量:表名称、保存路径
val tableName: String = "tbl_trips_cow"
val tablePath: String = "/hudi-warehouse/tbl_trips_cow"
// 构建数据生成器,模拟产生业务数据
import org.apache.hudi.QuickstartUtils._
// 模拟数据,插入Hudi表,采用COW模式
insertData(spark, tableName, tablePath)
// 应用结束,关闭资源
spark.stop()
}
保存数据
def insertData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// 第1步、模拟乘车数据
import org.apache.hudi.QuickstartUtils._
val dataGen: DataGenerator = new DataGenerator()
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
insertDF.printSchema()
insertDF.show(10, truncate = false)
//TODO 第2步、插入数据到Hudi表
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Hudi 表的属性值设置
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid") //主键
.option(PARTITIONPATH_FIELD.key(), "partitionpath") //分区
.option(TBL_NAME.key(), table) //表名称
.save(path)
}
10.2.1快照方式查询数据
//采用Snapshot Query快照方式查询表的数据
def queryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
tripsDF.printSchema()
tripsDF.show(10, truncate = false)
// 查询费用大于20,小于50的乘车数据
tripsDF
.filter($"fare" >= 20 && $"fare" <= 50)
.select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
.orderBy($"fare".desc, $"_hoodie_commit_time".desc)
.show(20, truncate = false)
}
10.2.2指定字符串,按照日期时间过滤获取数据
def queryDataByTime(spark: SparkSession, path: String): Unit = {
import org.apache.spark.sql.functions._
// 方式一:指定字符串,按照日期时间过滤获取数据
val df1 = spark.read
.format("hudi")
.option("as.of.instant", "20211225152016")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df1.printSchema()
df1.show(numRows = 5, truncate = false)
// 方式二:指定字符串,按照日期时间过滤获取数据
val df2 = spark.read
.format("hudi")
.option("as.of.instant", "2021-12-25 15:20:16")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df2.printSchema()
df2.show(numRows = 5, truncate = false)
}
10.3更新数据
//更新(Update)数据,第1步、模拟产生数据,第2步、模拟产生数据,针对第1步数据字段值更新,第3步、将数据更新到Hudi表中
def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
import spark.implicits._
// 第1步、模拟乘车数据
import org.apache.hudi.QuickstartUtils._
val updates = convertToStringList(dataGen.generateUpdates(100))
import scala.collection.JavaConverters._
val updateDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(updates.asScala, 2).toDS()
)
//TODO: 第2步、插入数据到Hudi表
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
updateDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Hudi 表的属性值设置
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
10.4增量查询
//增量查询Incremental query
//当Hudi中表的类型为:COW时,支持2种方式查询:Snapshot Queries、Incremental Queries;
//默认情况下查询属于:Snapshot Queries快照查询,通过参数:hoodie.datasource.query.type 可以进行设置。
// 1.设置查询数据模式为:incremental,增量读取
// 2.设置增量读取数据时开始时间
def incrementalQueryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
// 第1步、加载Hudi表数据,获取commit time时间,作为增量查询数据阈值
import org.apache.hudi.DataSourceReadOptions._
spark.read
.format("hudi")
.load(path)
.createOrReplaceTempView("view_temp_hudi_trips")
val commits: Array[String] = spark
.sql(
"""
|select
| distinct(_hoodie_commit_time) as commitTime
|from
| view_temp_hudi_trips
|order by
| commitTime DESC
|""".stripMargin
)
.map(row => row.getString(0))
.take(50)
val beginTime = commits(commits.length - 1) // commit time we are interested in
println(s"beginTime = ${beginTime}")
// 第2步、设置Hudi数据CommitTime时间阈值,进行增量数据查询
val tripsIncrementalDF = spark.read
.format("hudi")
// 设置查询数据模式为:incremental,增量读取
.option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
// 设置增量读取数据时开始时间
.option(BEGIN_INSTANTTIME.key(), beginTime)
.load(path)
// 第3步、将增量查询数据注册为临时视图,查询费用大于20数据
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark
.sql(
"""
|select
| `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
|from
| hudi_trips_incremental
|where
| fare > 20.0
|""".stripMargin
)
.show(10, truncate = false)
}
10.5删除数据
使用DataGenerator数据生成器,基于已有数据构建要删除的数据,最终保存到Hudi表中,需要设置属性参数: hoodie.datasource.write.operation 值为:delete。
/**
* 删除Hudi表数据,依据主键UUID进行删除,如果是分区表,指定分区路径
*/
def deleteData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// 第1步、加载Hudi表数据,获取条目数
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Raw Count = ${tripsDF.count()}")
// 第2步、模拟要删除的数据,从Hudi中加载数据,获取几条数据,转换为要删除数据集合
val dataframe = tripsDF.limit(2).select($"uuid", $"partitionpath")
import org.apache.hudi.QuickstartUtils._
val dataGenerator = new DataGenerator()
val deletes = dataGenerator.generateDeletes(dataframe.collectAsList())
import scala.collection.JavaConverters._
val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))
// 第3步、保存数据到Hudi表中,设置操作类型:DELETE
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
deleteDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// 设置数据操作类型为delete,默认值为upsert
.option(OPERATION.key(), "delete")
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
// 第4步、再次加载Hudi表数据,统计条目数,查看是否减少2条数据
val hudiDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Delete After Count = ${hudiDF.count()}")
}
11.基本概念
11.1时间轴Timeline
Hudi 核心:在所有的表中维护了一个包含`在不同的即时(Instant)时间对数据集操作`(比如新增、修改或删除) 的时间轴(Timeline)。
在每一次对Hudi表的数据集操作时都会在`该表的Timeline上生成一个Instant`,从而可以实现在仅查询某个时间点 之后成功提交的数据,或是仅查询某个时间点之前的数据,有效避免了扫描更大时间范围的数据。
同时,可以高效地只查询更改前的文件(如在某个Instant提交了更改操作后,仅query某个时间点之前的数据,则 仍可以query修改前的数据)。
Timeline 是 Hudi 用来管理提交(commit)的抽象,每个 commit 都绑定一个固定时间戳,分散到时间线上。
在 Timeline 上,每个 commit 被抽象为一个 HoodieInstant,一个 instant 记录了一次提交 (commit) 的行为 、时间戳、和状态。
上图中采用时间(小时)作为分区字段,从 10:00 开始陆续产生各种 commits,10:20 来了一条 9:00 的数据, 该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到。
时间轴(Timeline)的实现类(位于hudi-common-xx.jar中),时间轴相关的实现类位于org.apache.hudi.common.table.timeline包下
11.2文件管理
在每个分区内,文件被组织为文件组,由文件id充当唯一标识。每个文件组包含多个文件切片,其中每个切片包含 在某个即时时间的提交/压缩生成的基本列文件(.parquet)以及一组日志文件(.log),该文件包含自生成基本 文件以来对基本文件的插入/更新
Hudi 的 base file (parquet 文件) 在 footer 的 meta 去记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测。
Hudi 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包 含 magic number、size、content、footer 等信息,用于数据读、校验和过滤
11.3索引Index
Hudi通过索引机制提供高效的Upsert操作,该机制会将一个RecordKey+PartitionPath组合的方式作为唯一标识映 射到一个文件ID,而且这个唯一标识和文件组/文件ID之间的映射自记录被写入文件组开始就不会再改变。
全局索引:在全表的所有分区范围下强制要求键保持唯一,即确保对给定的键有且只有一个对应的记录。
非全局索引:仅在表的某一个分区内强制要求键保持唯一,它依靠写入器为同一个记录的更删提供一致的分区路径。
11.4hudi数据管理
12.表的存储类型
12.1数据计算模型
12.1.1批式模型( Batch)
批式模型就是使用 MapReduce、Hive、Spark 等典型的批计算引擎,以小时任务或者天任务的形式来做数据计算。
1、延迟:小时级延迟或者天级别延迟。这里的延迟不单单指的是定时任务的时间,在数据架构里,这里的延迟时间通常是定 时任务间隔时间 + 一系列依赖任务的计算时间 + 数据平台最终可以展示结果的时间。数据量大、逻辑复杂的情况下,小时任 务计算的数据通常真正延迟的时间是 2-3 小时。
2、数据完整度:数据较完整。以处理时间为例,小时级别的任务,通常计算的原始数据已经包含了小时内的所有数据,所以 得到的数据相对较完整。但如果业务需求是事件时间,这里涉及到终端的一些延迟上报机制,在这里,批式计算任务就很难派 上用场。
3、成本:成本很低。只有在做任务计算时,才会占用资源,如果不做任务计算,可以将这部分批式计算资源出让给在线业务 使用。从另一个角度来说成本是挺高的,如原始数据做了一些增删改查,数据晚到的情况,那么批式任务是要全量重新计算。
12.1.2流式模型(Stream)
流式模型,典型的就是使用 Flink 来进行实时的数据计算。
1、延迟:很短,甚至是实时。
2、数据完整度:较差。因为流式引擎不会等到所有数据到齐之后再开始计算,所以有一个 watermark 的概念,当数据的时间 小于 watermark 时,就会被丢弃,这样是无法对数据完整度有一个绝对的报障。在互联网场景中,流式模型主要用于活动时 的数据大盘展示,对数据的完整度要求并不算很高。在大部分场景中,用户需要开发两个程序,一是流式数据生产流式结果, 二是批式计算任务,用于次日修复实时结果。
3、成本:很高。因为流式任务是常驻的,并且对于多流 Join 的场景,通常要借助内存或者数据库来做 state 的存储,不管 是序列化开销,还是和外部组件交互产生的额外 IO,在大数据量下都是不容忽视的。
12.3增量模型(Incremental)
针对批式和流式的优缺点,Uber 提出了增量模型(Incremental Mode),相对批式来讲,更加实时;相对流式而 言,更加经济。
增量模型,简单来讲,是以 mini batch 的形式来跑准实时任务。Hudi 在增量模型中支持了两个最重要的特性:
1、Upsert:这个主要是解决批式模型中,数据不能插入、更新的问题,有了这个特性,可以往 Hive 中写入增量数据,而不 是每次进行完全的覆盖。(Hudi 自身维护了 key->file 的映射,所以当 upsert 时很容易找到 key 对应的文件)
2、Incremental Query:增量查询,减少计算的原始数据量。以 Uber 中司机和乘客的数据流 Join 为例,每次抓取两条数据 流中的增量数据进行批式的 Join 即可,相比流式数据而言,成本要降低几个数量级
12.2增量模型查询方式
12.2.1.Snapshot Queries(快照查询)
查询某个增量提交操作中数据集的最新快照,先进行动态合并最新的基本文件(Parquet)和增量文件(Avro)来提供近实时数据 集(通常会存在几分钟的延迟)。
读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件
12.2.2.Incremental Queries(增量查询)
仅查询新写入数据集的文件,需要`指定一个Commit/Compaction的即时时间`(位于Timeline上的某个Instant)作为条件,来`查询此条件之后的新数据`。
可查看自给定commit/delta commit即时操作以来新写入的数据,有效的提供变更流来启用增量数据管道。
12.2.3.Read Optimized Queries(读优化查询)
1.直接查询基本文件(数据集的最新快照),其实就是列式文件(Parquet)。并保证与非Hudi列式数据集相比,具有相同的列式查询性能。
2.可查看给定的commit/compact即时操作的表的最新快照。
3.读优化查询和快照查询相同仅访问基本文件,提供给定文件片自上次执行压缩操作以来的数据。通常查询数据的最新程度的保证取决于压缩策略
12.2.4.总结
表类型 | 支持的查询类型 |
---|---|
copy onwrite | 快照,增量查询 |
merge on read | 快照,增量查询,读优化查询 |
13.hudi表类型
13.1 cow(适合读)
`Copy On Write 表`
`在写入数据的时候,复制一份原来的拷贝,在其基础上添加新数据`
优点:读取时,只读取对应分区的一个数据文件即可,较为高效;
缺点:数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。
COW表主要使用列式文件格式(Parquet)存储数据,`在写入数据过程中,执行同步合并,更新数据版本并重写数据文件`,类似RDBMS中的B-Tree更新。
1)、更新update:在更新记录时,Hudi会先找到包含更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包含 其他记录的文件保持不变。当突然有大量写操作时会导致重写大量文件,从而导致极大的I/O开销。
2)、读取read:在读取数据时,通过读取最新的数据文件来获取最新的更新,此存储类型适用于少量写入和大量读取的场景
13.2MOR
`Merge On Read`
简称MOR,新插入的数据存储在delta log 中,定期再将delta log合并进行parquet数据文件。
读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。下图演示了MOR的两种数据读写方式。
13.3对比
对于写时复制(COW)和读时合并(MOR)writer来说,Hudi的WriteClient是相同的。
COW表,用户在 snapshot 读取的时候会扫描所有最新的 FileSlice 下的 base file。
MOR表,在 READ OPTIMIZED 模式下,只会读最近的经过 compaction 的 commit。
14.数据写操作流程
在Hudi数据湖框架中支持三种方式写入数据:`UPSERT(插入更新)`、`INSERT(插入)`和`BULK INSERT(写排序)`。
UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小
INSERT:跳过 index,写入效率更高
BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)
14.1UPSERT 写流程
`Copy On Write类型表,UPSERT 写入流程`
1.第一步、先对 records 按照 record key 去重;
2.第二步、首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是update,哪些 records 是 insert(key 第一次写入);
3.第三步、对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice);
4.第四步、对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSlice;
`Merge On Read类型表,UPSERT 写入流程`
1.第一步、先对 records 按照 record key 去重(可选)
2.第二步、首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
3.第三步、如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果 log file 可建索引,尝试 append 小的 log file,如果没有就新写一个 FileGroup + FileSlice + base file
4.第四步、如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小 的小文件,会 merge base file,生成新的 file slice)log file 大小达到阈值会 roll over 一个新的
14.2INSERT 写流程
`Copy On Write类型表,INSERT 写入流程`
1.第一步、先对 records 按照 record key 去重(可选);
2.第二步、不会创建 Index;
3.第三步、如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file;
`Merge On Read类型表,INSERT 写入流程`
1.第一步、先对 records 按照 record key 去重(可选);
2.第二步、不会创建 Index;
3.第三步、如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一 个新的 FileSlice + base file;