1.1 什么是数据湖
https://cloud.fynote.com/share/d/f1jA9ipE
图片在链接里,不贴了
1.1.1 什么是数据湖
数据湖是一个集中式的存储库,允许你以任意规模存储多个来源、所有结构化和非结构化数据,可以按照原样存储数据,无需对数据进行结构化处理,并运行不同类型的分析,对数据进行加工,例如:大数据处理、实时分析、机器学习,以指导做出更好地决策。
1.1.2 大数据为什么需要数据湖
当前基于Hive的离线数据仓库已经非常成熟,在传统的离线数据仓库中对记录级别的数据进行更新是非常麻烦的,需要对待更新的数据所属的整个分区,甚至是整个表进行全面覆盖才行,由于离线数仓多级逐层加工的架构设计,数据更新时也需要从贴源层开始逐层反应到后续的派生表中去。
随着实时计算引擎的不断发展以及业务对于实时报表的产出需求不断膨胀,业界最近几年就一直聚焦并探索于实时数仓建设。根据数仓架构演变过程,在Lambda架构中含有离线处理与实时处理两条链路,其架构图如下:
正是由于两条链路处理数据导致数据不一致等一些列问题所以才有了Kappa架构,Kappa架构如下:
Kappa架构可以称为真正的实时数仓,目前在业界最常用实现就是Flink + Kafka,然而基于Kafka+Flink的实时数仓方案也有几个非常明显的缺陷,所以在目前很多企业中实时数仓构建中经常使用混合架构,没有实现所有业务都采用Kappa架构中实时处理实现。Kappa架构缺陷如下:
- Kafka无法支持海量数据存储。对于海量数据量的业务线来说,Kafka一般只能存储非常短时间的数据,比如最近一周,甚至最近一天。
- Kafka无法支持高效的OLAP查询,大多数业务都希望能在DWD\DWS层支持即席查询的,但是Kafka无法非常友好地支持这样的需求。
- 无法复用目前已经非常成熟的基于离线数仓的数据血缘、数据质量管理体系。需要重新实现一套数据血缘、数据质量管理体系。
- Kafka不支持update/upsert,目前Kafka仅支持append。
为了解决Kappa架构的痛点问题,业界最主流是采用“批流一体”方式,这里批流一体可以理解为批和流使用SQL同一处理,也可以理解为处理框架的统一,例如:Spark、Flink,但这里更重要指的是存储层上的统一,只要存储层面上做到“批流一体”就可以解决以上Kappa遇到的各种问题。数据湖技术可以很好的实现存储层面上的“批流一体”,这就是为什么大数据中需要数据湖的原因。
1.2 Iceberg概念及特点
Apache Iceberg是一种用于大型数据分析场景的开放表格式(Table Format)。Iceberg使用一种类似于SQL表的高性能表格式,Iceberg格式表单表可以存储数十PB数据,适配Spark、Trino、PrestoDB、Flink和Hive等计算引擎提供高性能的读写和元数据管理功能,Iceberg是一种数据湖解决方案。
注意:Trino就是原来的PrestoSQL ,2020年12月27日,PrestoSQL 项目更名为Trino,Presto分成两大分支:PrestoDB、PrestorSQL。
Iceberg非常轻量级,可以作为lib与Spark、Flink进行集成,Iceberg官网:https://iceberg.apache.org/,Iceberg具备以下特点:
- Iceberg支持实时/批量数据写入和读取,支持Spark/Flink计算引擎。
- Iceberg支持事务ACID,支持添加、删除、更新数据。
- 不绑定任何底层存储,支持Parquet、ORC、Avro格式兼容行存储和列存储。
- Iceberg支持隐藏分区和分区变更,方便业务进行数据分区策略。
- Iceberg支持快照数据重复查询,具备版本回滚功能。
- Iceberg扫描计划很快,读取表或者查询文件可以不需要分布式SQL引擎。
- Iceberg通过表元数据来对查询进行高效过滤。
- 基于乐观锁的并发支持,提供多线程并发写入能力并保证数据线性一致。
1.3 Iceberg数据存储格式
1.3.1 Iceberg术语
- data files(数据文件):
数据文件是Apache Iceberg表真实存储数据的文件,一般是在表的数据存储目录的data目录下,如果我们的文件格式选择的是parquet,那么文件是以“.parquet”结尾,例如:
00000-0-root_20211212192602_8036d31b-9598-4e30-8e67-ce6c39f034da-job_1639237002345_0025-00001.parquet 就是一个数据文件。
Iceberg每次更新会产生多个数据文件(data files)。
- Snapshot(表快照):
快照代表一张表在某个时刻的状态。每个快照里面会列出表在某个时刻的所有 data files 列表。data files是存储在不同的manifest files里面,manifest files是存储在一个Manifest list文件里面,而一个Manifest list文件代表一个快照。
- Manifest list(清单列表):
manifest list是一个元数据文件,它列出构建表快照(Snapshot)的清单(Manifest file)。这个元数据文件中存储的是Manifest file列表,每个Manifest file占据一行。每行中存储了Manifest file的路径、其存储的数据文件(data files)的分区范围,增加了几个数文件、删除了几个数据文件等信息,这些信息可以用来在查询时提供过滤,加快速度。
- Manifest file(清单文件):
Manifest file也是一个元数据文件,它列出组成快照(snapshot)的数据文件(data files)的列表信息。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据行数等信息。其中列级别的统计信息可以在扫描表数据时过滤掉不必要的文件。
Manifest file是以avro格式进行存储的,以“.avro”后缀结尾,例如:8138fce4-40f7-41d7-82a5-922274d2abba-m0.avro。
1.3.2 表格式Table Format
Apache Iceberg作为一款数据湖解决方案,是一种用于大型分析数据集的开放表格式(Table Format),表格式可以理解为元数据及数据文件的一种组织方式。Iceberg底层数据存储可以对接HDFS,S3文件系统,并支持多种文件格式,处于计算框架(Spark、Flink)之下,数据文件之上。
下面介绍下Iceberg底层文件组织方式,下图是Iceberg中表格式,s0、s1代表的是表Snapshot信息,每个表示当前操作的一个快照,每次commit都会生成一个快照Snapshot,每个Snapshot快照对应一个manifest list 元数据文件,每个manifest list 中包含多个Manifest元数据文件,manifest中记录了当前操作生成数据所对应的文件地址,也就是data file的地址。
基于snapshot的管理方式,Iceberg能够获取表历史版本数据、对表增量读取操作,data files存储支持不同的文件格式,目前支持parquet、ORC、Avro格式。
关于Iceberg表数据底层组织详细信息参照“Iceberg表数据组织与查询”小节。
1.4 Iceberg特点详述
1.4.1 Iceberg分区与隐藏分区(Hidden Partition)
Iceberg支持分区来加快数据查询。在Iceberg中设置分区后,可以在写入数据时将相似的行分组,在查询时加快查询速度。Iceberg中可以按照年、月、日和小时粒度划分时间戳组织分区。
在Hive中也支持分区,但是要想使分区能加快查询速度,需要在写SQL时指定对应的分区条件过滤数据,在Iceberg中写SQL查询时不需要再SQL中特别指定分区过滤条件,Iceberg会自动分区,过滤掉不需要的数据。
在Iceberg中分区信息可以被隐藏起来,Iceberg的分区字段可以通过一个字段计算出来,在建表或者修改分区策略之后,新的数据会自动计算所属于的分区,在查询的时候同样不用关心表的分区是什么字段,只需要关注业务逻辑,Iceberg会自动过滤不需要的分区数据。
正是由于Iceberg的分区信息和表数据存储目录是独立的,使得Iceberg的表分区可以被修改,而且不会涉及到数据迁移。
1.4.2 Iceberg表演化(Table Evolution)
在Hive分区表中,如果把一个按照天分区的表改成按小时分区,那么没有办法在原有表上进行修改,需要创建一个按照小时分区的表,然后把数据加载到此表中。
Iceberg支持就地表演化,可以通过SQL的方式进行表级别模式演进,例如:更改表分区布局。Iceberg进行以上操作时,代价极低,不存在读出数据重新写入或者迁移数据这种费时费力的操作。
1.4.3 模式演化(Schema Evolution)
Iceberg支持以下几种Schema的演化:
- ADD:向表或者嵌套结构增加新列。
- Drop:从表或嵌套结构中移除列。
- Rename:重命名表中或者嵌套结构中的列。
- Update:将复杂结构(Struct、Map<Key,Value>,list)中的基本类型扩展类型长度,比如:tinyint修改成int。
- Reorder:改变列的顺序,也可以改变嵌套结构中字段的排序顺序。
注意:
Iceberg Schema的改变只是元数据的操作改变,不会涉及到重写数据文件。Map结构类型不支持Add和Drop字段。
Iceberg保证Schema演化是没有副作用的独立操作,不会涉及到重写数据文件,具体如下:
- 增加列时不会从另一个列中读取已存在的数据
- 删除列或者嵌套结构中的字段时,不会改变任何其他列的值。
- 更新列或者嵌套结构中字段时,不会改变任何其他列的值。
- 改变列或者嵌套结构中字段顺序的时候,不会改变相关联的值。
Iceberg实现以上的原因使用唯一的id来追踪表中的每一列,当添加一个列时,会分配新的ID,因此列对应的数据不会被错误使用。
1.4.4 分区演化(partition Evolution)
Iceberg分区可以在现有表中更新,因为Iceberg查询流程并不和分区信息直接关联。
当我们改变一个表的分区策略时, 对应修改分区之前的数据不会改变, 依然会采用老的分区策略, 新的数据会采用新的分区策略, 也就是说同一个表会有两种分区策略, 旧数据采用旧分区策略, 新数据采用新新分区策略, 在元数据里两个分区策略相互独立,不重合.
因此,在我们写SQL进行数据查询时, 如果存在跨分区策略的情况, 则会解析成两个不同执行计划, 如Iceberg官网提供图所示:
图中booking_table表2008年按月分区, 进入2009年后改为按天分区, 这两中分区策略共存于该表中。得益于Iceberg的隐藏分区(Hidden Partition), 针对上图中的SQL查询, 不需要在SQL中特别指定分区过滤条件(是按照月还是按照天), Iceberg会自动分区, 过滤掉不需要的数据。
1.4.5 列顺序演化(Sort Order Evolution)
Iceberg可以在一个已经存在的表上修改排序策略。修改了排序策略之后, 旧数据依旧采用老排序策略不变。往Iceberg里写数据的计算引擎总是会选择最新的排序策略, 但是当排序的代价极其高昂的时候, 就不进行排序了。
1.5 Iceberg数据类型
Iceberg表支持以下数据类型:
类型 | 描述 | 注意点 |
---|---|---|
boolean | 布尔类型,true或者false | |
int | 32位有符号整形 | 可以转换成long类型 |
long | 64位有符号整形 | |
float | 单精度浮点型 | 可以转换成double类型 |
double | 双精度浮点型 | |
decimal(P,S) | decimal(P,S) | P代表精度,决定总位数,S代表规模,决定小数位数。P必须小于等于38。 |
date | 日期,不含时间和时区 | |
time | 时间,不含日期和时区 | 以微秒存储,1000微秒 = 1毫秒 |
timestamp | 不含时区的timestamp | 以微秒存储,1000微秒 = 1毫秒 |
timestamptz | 含时区的timestamp | 以微秒存储,1000微秒 = 1毫秒 |
string | 任意长度的字符串类型 | UTF-8编码 |
fixed(L) | 长度为L的固定长度字节数组 | |
binary | 任意长度的字节数组 | |
struct<...> | 任意数据类型组成的一个结构化字段 | |
list | 任意数据类型组成的List | |
map<K,V> | 任意类型组成的K,V的Map |
1.6 Hive与Iceberg整合
Iceberg就是一种表格式,支持使用Hive对Iceberg进行读写操作,但是对Hive的版本有要求,如下:
操作 | Hive 2.x | Hive 3.1.2 |
---|---|---|
CREATE EXTERNAL TABLE | √ | √ |
CREATE TABLE | √ | √ |
DROP TABLE | √ | √ |
SELECT | √ | √ |
INSERT INTO | √ | √ |
这里基于Hive3.1.2版本进行Hive操作Iceberg表讲解。
1.6.1 开启Hive支持Iceberg
- 下载iceberg-hive-runtime.jar
想要使用Hive支持查询Iceberg表,首先需要下载“iceberg-hive-runtime.jar”,Hive通过该Jar可以加载Hive或者更新Iceberg表元数据信息。下载地址:https://iceberg.apache.org/#releases/:
将以上jar包下载后,上传到Hive服务端和客户端对应的lib目录下。另外在向Hive中Iceberg格式表插入数据时需要到“libfb303-0.9.3.jar”包,将此包也上传到Hive服务端和客户端对应的lib目录下。
- 配置hive-site.xml
在Hive客户端$HIVE_HOME/conf/hive-site.xml中添加如下配置:
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
1.6.2 Hive中操作Iceberg格式表
从Hive引擎的角度来看,在运行环境中有Catalog概念(catalog主要描述了数据集的位置信息,就是元数据),Hive与Iceberg整合时,Iceberg支持多种不同的Catalog类型,例如:Hive、Hadoop、第三方厂商的AWS Glue和自定义Catalog。在实际应用场景中,Hive可能使用上述任意Catalog,甚至跨不同Catalog类型join数据,为此Hive提供了org.apache.iceberg.mr.hive.HiveIcebergStorageHandler(位于包iceberg-hive-runtime.jar)来支持读写Iceberg表,并通过在Hive中设置“iceberg.catalog..type”属性来决定加载Iceberg表的方式,该属性可以配置:hive、hadoop,其中“”是自己随便定义的名称,主要是在hive中创建Iceberg格式表时配置iceberg.catalog属性使用。
在Hive中创建Iceberg格式表时,根据创建Iceberg格式表时是否指定iceberg.catalog属性值,有以下三种方式决定Iceberg格式表如何加载(数据存储在什么位置)。
- 如果没有设置iceberg.catalog属性,默认使用HiveCatalog来加载
这种方式就是说如果在Hive中创建Iceberg格式表时,不指定iceberg.catalog属性,那么数据存储在对应的hive warehouse路径下。
在Hive客户端node3节点进入Hive,操作如下:
#在Hive中创建iceberg格式表
create table test_iceberg_tbl1(
id int ,
name string,
age int)
partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
#在Hive中加载如下两个包,在向Hive中插入数据时执行MR程序时需要使用到
hive> add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
hive> add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
#向表中插入数据
hive> insert into test_iceberg_tbl1 values (1,"zs",18,"20211212");
#查询表中的数据
hive> select * from test_iceberg_tbl1;
OK
1 zs 18 20211212
在Hive默认的warehouse目录下可以看到创建的表目录:
- 如果设置了iceberg.catalog对应的catalog名字,就用对应类型的catalog加载
这种情况就是说在Hive中创建Iceberg格式表时,如果指定了iceberg.catalog属性值,那么数据存储在指定的catalog名称对应配置的目录下。
在Hive客户端node3节点进入Hive,操作如下:
#注册一个HiveCatalog叫another_hive
hive> set iceberg.catalog.another_hive.type=hive;
#在Hive中创建iceberg格式表
create table test_iceberg_tbl2(
id int,
name string,
age int
)
partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
tblproperties ('iceberg.catalog'='another_hive');
#在Hive中加载如下两个包,在向Hive中插入数据时执行MR程序时需要使用到
hive> add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
hive> add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
#插入数据,并查询
hive> insert into test_iceberg_tbl2 values (2,"ls",20,"20211212");
hive> select * from test_iceberg_tbl2;
OK
2 ls 20 20211212
以上方式指定“iceberg.catalog.another_hive .type=hive”后,实际上就是使用的hive的catalog,这种方式与第一种方式不设置效果一样,创建后的表存储在hive默认的warehouse目录下。也可以在建表时指定location 写上路径,将数据存储在自定义对应路径上。
除了可以将catalog类型指定成hive之外,还可以指定成hadoop,在Hive中创建对应的iceberg格式表时需要指定location来指定iceberg数据存储的具体位置,这个位置是具有一定格式规范 的自定义路径。在Hive客户端node3节点进入Hive,操作如下:
#注册一个HadoopCatalog叫hadoop
hive> set iceberg.catalog.hadoop.type=hadoop;
#使用HadoopCatalog时,必须设置“iceberg.catalog.<catalog_name>.warehouse”指定warehouse路径
hive> set iceberg.catalog.hadoop.warehouse=hdfs://mycluster/iceberg_data;
#在Hive中创建iceberg格式表,这里创建成外表
create external table test_iceberg_tbl3(
id int,
name string,
age int
)
partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location 'hdfs://mycluster/iceberg_data/default/test_iceberg_tbl3'
tblproperties ('iceberg.catalog'='hadoop');
注意:以上location指定的路径必须是“iceberg.catalog.hadoop.warehouse”指定路径的子路径,格式必须是${iceberg.catalog.hadoop.warehouse}/${当前建表使用的hive库}/${创建的当前iceberg表名}
#在Hive中加载如下两个包,在向Hive中插入数据时执行MR程序时需要使用到
hive> add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
hive> add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
#插入数据,并查询
hive> insert into test_iceberg_tbl3 values (3,"ww",20,"20211213");
hive> select * from test_iceberg_tbl3;
OK
3 ww 20 20211213
在指定的“iceberg.catalog.hadoop .warehouse”路径下可以看到创建的表目录:
- 如果iceberg.catalog属性设置为“location_based_table”,可以从指定的根路径下加载Iceberg 表
这种情况就是说如果HDFS中已经存在iceberg格式表,我们可以通过在Hive中创建Icerberg格式表指定对应的location路径映射数据。,在Hive客户端中操作如下:
CREATE TABLE test_iceberg_tbl4 (
id int,
name string,
age int
)
PARTITIONED BY (
dt string
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/spark/person'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
注意:指定的location路径下必须是iceberg格式表数据,并且需要有元数据目录才可以。不能将其他数据映射到Hive iceberg格式表。
注意:由于Hive建表语句分区语法“Partitioned by”的限制,如果使用Hive创建Iceberg格式表,目前只能按照Hive语法来写,底层转换成Iceberg标识分区,这种情况下不能使用Iceberge的分区转换,例如:days(timestamp),如果想要使用Iceberg格式表的分区转换标识分区,需要使用Spark或者Flink引擎创建表。
1.7 Iceberg表数据组织与查询
1.7.1 下载avro-tools jar包
由于后期需要查看avro文件内容,我们可以通过avro-tool.jar来查看avro数据内容。可以在以下网站中下载avro-tools对应的jar包,下载之后上传到node5节点上:
“https://mvnrepository.com/artifact/org.apache.avro/avro-tools”。
查看avro文件信息可以直接执行如下命令,可以将avro中的数据转换成对应的json数据。
[root@node5 ~]# java -jar /software/avro-tools-1.8.1.jar tojson snap-*-wqer.avro
1.7.2 在Hive中创建Iceberg表并插入数据
在Hive中创建Iceberg格式表,并插入如下数据:
#在Hive中创建iceberg格式表
create table test_iceberg_tbl1(
id int ,
name string,
age int)
partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
#插入如下数据
insert into test_iceberg_tbl1 values (1,"zs",21,"20211212");
insert into test_iceberg_tbl1 values (2,"ls",22,"20211212");
insert into test_iceberg_tbl1 values (3,"ww",23,"20211213");
insert into test_iceberg_tbl1 values (4,"ml",24,"20211213");
insert into test_iceberg_tbl1 values (5,"tq",25,"20211213");
1.7.3 查看Iceberg底层数据存储
下图为Iceberg表“test_iceberg_tbl1”在HDFS中存储的数据组织图:
通过上图我们可以看到有5个Snapshot快照,以上5个Snapshot实际上就是对应了5个Manifest list清单列表。
- 查询最新快照数据
为了了解Iceberg如何查询最新数据,可以参照下面这张图来详细了解底层实现。
查询Iceberg表数据时,首先获取最新的metadata信息,这里先获取到“00000-ec504.metadata.json”元数据信息,解析当前元数据文件可以拿到当前表的快照id:“949358624197301886”以及这张表的所有快照信息,也就是json信息中snapshots数组对应的值。根据当前表的快照id值可以获取对应的snapshot对应的avro文件信息:“snap--32800.avro”,我们可以找到当前快照对应的路径,看到其包含的Manifest 清单文件有5个:"32800-m0.avro"、"2abba-m0.avro"、"d33de-m0.avro"、"748bf-m0.avro"、"*b946e-m0.avro",读取该Iceberg格式表最新数据就是读取这几个文件中描述对应的parquet数据文件即可。
我们可以看到“snap-*-32800.avro”快照文件中不仅有包含的manifest路径信息,还有“added_data_files_count”、“existing_data_files_count”、“deleted_data_files_count”三个属性,Iceberg 根据 deleted_data_files_count 大于 0 来判断对应的manifest清单文件里面是不是被删除的数据,如果一个manifest清单文件该值大于0代表数据删除,读数据时就无需读这个manifest清单文件对应的数据文件。
根据Manifest list找到了各个对应的manifest 清单文件,每个文件中描述了对应parquet文件存储的位置信息,可以看到在对应的avro文件中有“status”属性,该属性为1代表对应的parquet文件为新增文件,需要读取,为2代表parquet文件被删除。
- 查询某个快照的数据
Apache Iceberg支持查询历史上任何时刻的快照,在查询时需要指定snapshot-id属性即可,这个只能通过Spark/Flink来查询实现,例如在Spark中查询某个快照数据如下:
spark.read.option("snapshot-id",6155408340798912701L).format("iceberg").load("path")
查询某个快照数据的原理如下图所示(以查询快照id为“6155408340798912701”的数据为例):
通过上图可以看出,实际上读取历史快照数据和读取最新数据不同之处就是找到的snapshot-id不同而已,原理都是一样。
- 根据时间戳查看某个快照的数据
Apache iceberg还支持通过as-of-timestamp参数执行时间戳来读取某个快照的数据,同样也是通过Spark/Flink来读取,Spark读取代码如下:
spark.read.option("as-of-timestamp","时间戳").format("iceberg").load("path")
实际上通过时间戳找到对应数据文件的原理与通过snapshot-id找到数据文件原理一样,在*.metadata.json文件中,除了有“current-snapshot-id”、“snapshots”属性外还有“snapshot-log”属性,该属性对应的值如下:
我们可以看到其中有个 timestamp-ms 属性和 snapshot-id 属性,并且是按照 timestamp-ms 升序的。在 Iceberg 内部实现中,它会将 as-of-timestamp 指定的时间和 snapshot-log 数组里面每个元素的 timestamp-ms 进行比较,找出最后一个满足 timestamp-ms <= as-of-timestamp 对应的 snapshot-id,原理同上,通过snapshot-id再找到要读取的数据文件。
1.8 Spark3.1.2与Iceberg0.12.1整合
Spark可以操作Iceberg数据湖,这里使用的Iceberg的版本为0.12.1,此版本与Spark2.4版本之上兼容。由于在Spark2.4版本中在操作Iceberg时不支持DDL、增加分区及增加分区转换、Iceberg元数据查询、insert into/overwrite等操作,建议使用Spark3.x版本来整合Iceberg0.12.1版本,这里我们使用的Spark版本是3.1.2版本。
1.8.1 向pom文件导入依赖
在Idea中创建Maven项目,在pom文件中导入以下关键依赖:
<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark与Iceberg整合的依赖包-->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3</artifactId>
<version>0.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.12.1</version>
</dependency>
<!-- avro格式 依赖包 -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<!-- parquet格式 依赖包 -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.0</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkSQL ON Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--<!–mysql依赖的jar包–>-->
<!--<dependency>-->
<!--<groupId>mysql</groupId>-->
<!--<artifactId>mysql-connector-java</artifactId>-->
<!--<version>5.1.47</version>-->
<!--</dependency>-->
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--<!– 向kafka 生产数据需要包 –>-->
<!--<dependency>-->
<!--<groupId>org.apache.kafka</groupId>-->
<!--<artifactId>kafka-clients</artifactId>-->
<!--<version>0.10.0.0</version>-->
<!--<!– 编译和测试使用jar包,没有传递性 –>-->
<!--<!–<scope>provided</scope>–>-->
<!--</dependency>-->
<!-- StructStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Scala 包-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
1.8.2 SparkSQL设置catalog配置
以下操作主要是SparkSQL操作Iceberg,同样Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。
在SparkSQL代码中通过以下方式来指定使用的Catalog:
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hive catalog, catalog名称为hive_prod
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
.config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()
1.8.3 使用Hive Catalog管理Iceberg表
使用Hive Catalog管理Iceberg表默认数据存储在Hive对应的Warehouse目录下,在Hive中会自动创建对应的Iceberg表,SparkSQL 相当于是Hive客户端,需要额外设置“iceberg.engine.hive.enabled”属性为true,否则在Hive对应的Iceberg格式表中查询不到数据。
- 创建表
//创建表 ,hive_pord:指定catalog名称。default:指定Hive中存在的库。test:创建的iceberg表名。
spark.sql(
"""
| create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
注意:
1)创建表时,表名称为:${catalog名称}.${Hive中库名}.${创建的Iceberg格式表名}
2)表创建之后,可以在Hive中查询到对应的test表,创建的是Hive外表,在对应的Hive warehouse 目录下可以看到对应的数据目录。
- 插入数据
//插入数据
spark.sql(
"""
|insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
- 查询数据
//查询数据
spark.sql(
"""
|select * from hive_prod.default.test
""".stripMargin).show()
结果如下:
在Hive对应的test表中也能查询到数据:
- 删除表
//删除表,删除表对应的数据不会被删除
spark.sql(
"""
|drop table hive_prod.default.test
""".stripMargin)
注意:删除表后,数据会被删除,但是表目录还是存在,如果彻底删除数据,需要把对应的表目录删除。
1.8.4 使用Hadoop Catalog管理Iceberg表
使用Hadoop Catalog管理表,需要指定对应Iceberg存储数据的目录。
- 创建表
//创建表 ,hadoop_prod:指定Hadoop catalog名称。default:指定库名称。test:创建的iceberg表名。
spark.sql(
"""
| create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
注意:
1)创建表名称为:${Hadoop Catalog名称}.${随意定义的库名}.${Iceberg格式表名}
2)创建表后,会在hadoop_prod名称对应的目录下创建该表
- 插入数据
//插入数据
spark.sql(
"""
|insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
- 查询数据
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
- 创建对应的Hive表映射数据
在Hive表中执行如下建表语句:
CREATE TABLE hdfs_iceberg (
id int,
name string,
age int
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
在Hive中查询“hdfs_iceberg”表数据如下:
- 删除表
spark.sql(
"""
|drop table hadoop_prod.default.test
""".stripMargin)
注意:删除iceberg表后,数据被删除,对应的库目录存在。
1.8.5 Spark与Iceberg整合DDL操作
这里使用Hadoop Catalog 来演示Spark 与Iceberg的DDL操作。
1.8.5.1 CREATE TABLE 创建表
Create table 创建Iceberg表,创建表不仅可以创建普通表还可以创建分区表,再向分区表中插入一批数据时,必须对数据中分区列进行排序,否则会出现文件关闭错误,代码如下:
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()
//创建普通表
spark.sql(
"""
| create table if not exists hadoop_prod.default.normal_tbl(id int,name string,age int) using iceberg
""".stripMargin)
//创建分区表,以 loc 列为分区字段
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl(id int,name string,age int,loc string) using iceberg partitioned by (loc)
""".stripMargin)
//向分区表中插入数据时,必须对分区列排序,否则报错:java.lang.IllegalStateException: Already closed files for partition:xxx
spark.sql(
"""
|insert into table hadoop_prod.default.partition_tbl values (1,"zs",18,"beijing"),(3,"ww",20,"beijing"),(2,"ls",19,"shanghai"),(4,"ml",21,"shagnhai")
""".stripMargin)
spark.sql("select * from hadoop_prod.default.partition_tbl").show()
查询结果如下:
创建Iceberg分区时,还可以通过一些转换表达式对timestamp列来进行转换,创建隐藏分区 ,常用的转换表达式有如下几种:
- years(ts):按照年分区
//创建分区表 partition_tbl1 ,指定分区为year
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl1(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (years(regist_ts))
""".stripMargin)
//向表中插入数据,注意,插入的数据需要提前排序,必须排序,只要是相同日期数据写在一起就可以
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl1 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
//查询结果
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl1
""".stripMargin).show()
数据结果如下:
在HDFS中是按照年进行分区 :
- months(ts):按照“年-月”月级别分区
//创建分区表 partition_tbl2 ,指定分区为months,会按照“年-月”分区
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl2(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (months(regist_ts))
""".stripMargin)
//向表中插入数据,注意,插入的数据需要提前排序,必须排序,只要是相同日期数据写在一起就可以
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl2 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
//查询结果
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl2
""".stripMargin).show()
数据结果如下:
在HDFS中是按照 “年-月” 进行分区 :
- days(ts)或者date(ts):按照“年-月-日”天级别分区
//创建分区表 partition_tbl3 ,指定分区为 days,会按照“年-月-日”分区
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl3(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (days(regist_ts))
""".stripMargin)
//向表中插入数据,注意,插入的数据需要提前排序,必须排序,只要是相同日期数据写在一起就可以
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl3 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
//查询结果
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl3
""".stripMargin).show()
数据结果如下:
在HDFS中是按照 “年-月-日” 进行分区 :
- hours(ts)或者date_hour(ts):按照“年-月-日-时”小时级别分区
//创建分区表 partition_tbl4 ,指定分区为 hours,会按照“年-月-日-时”分区
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl4(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (hours(regist_ts))
""".stripMargin)
//向表中插入数据,注意,插入的数据需要提前排序,必须排序,只要是相同日期数据写在一起就可以
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl4 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
//查询结果
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl4
""".stripMargin).show()
数据结果如下:
在HDFS中是按照 “年-月-日-时” 进行分区 :
Iceberg支持的时间分区目前和将来只支持UTC,UTC是国际时,UTC+8就是国际时加八小时,是东八区时间,也就是北京时间,所以我们看到上面分区时间与数据时间不一致。
除了以上常用的时间隐藏分区外,Iceberg还支持bucket(N,col)分区,这种分区方式可以按照某列的hash值与N取余决定数据去往的分区。truncate(L,col),这种隐藏分区可以将字符串列截取L长度,相同的数据会被分到相同分区中。
1.8.5.2 CREATE TAEBL ... AS SELECT
Iceberg支持“create table .... as select ”语法,可以从查询语句中创建一张表,并插入对应的数据,操作如下:
- 创建表hadoop_prod.default.mytbl,并插入数据
avaval spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()
//创建普通表
spark.sql(
"""
| create table hadoop_prod.default.mytbl(id int,name string,age int) using iceberg
""".stripMargin)
//向表中插入数据
spark.sql(
"""
|insert into table hadoop_prod.default.mytbl values (1,"zs",18),(3,"ww",20),(2,"ls",19),(4,"ml",21)
""".stripMargin)
//查询数据
spark.sql("select * from hadoop_prod.default.mytbl").show()
- 使用“create table ... as select”语法创建表mytal2并查询
spark.sql(
"""
|create table hadoop_prod.default.mytbl2 using iceberg as select id,name,age from hadoop_prod.default.mytbl
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.mytbl2
""".stripMargin).show()
结果如下:
1.8.5.3 REPLACE TABLE ... AS SELECT
Iceberg支持“replace table .... as select ”语法,可以从查询语句中重建一张表,并插入对应的数据,操作如下:
- 创建表“hadoop_prod.default.mytbl3”,并插入数据、展示
spark.sql(
"""
|create table hadoop_prod.default.mytbl3 (id int,name string,loc string,score int) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into table hadoop_prod.default.mytbl3 values (1,"zs","beijing",100),(2,"ls","shanghai",200)
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.mytbl3
""".stripMargin).show
- 重建表“hadoop_prod.default.mytbl3”,并插入对应数据
spark.sql(
"""
|replace table hadoop_prod.default.mytbl2 using iceberg as select * from hadoop_prod.default.mytbl3
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.mytbl2
""".stripMargin).show()
|
1.8.5.4 DROP TABLE
删除iceberg表时直接执行:“drop table xxx”语句即可,删除表时,表数据会被删除,但是库目录存在。
//删除表
spark.sql(
"""
|drop table hadoop_prod.default.mytbl
""".stripMargin)
1.8.5.5 ALTER TABLE
Iceberg的alter操作在Spark3.x版本中支持,alter一般包含以下操作:
- 添加、删除列
添加列操作:ALTER TABLE ... ADD COLUMN
删除列操作:ALTER TABLE ... DROP COLUMN
//1.创建表test,并插入数据、查询
spark.sql(
"""
|create table hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into table hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
spark.sql(
"""
| select * from hadoop_prod.default.test
""".stripMargin).show()
//2.添加字段,给 test表增加 gender 列、loc列
spark.sql(
"""
|alter table hadoop_prod.default.test add column gender string,loc string
""".stripMargin)
//3.删除字段,给test 表删除age 列
spark.sql(
"""
|alter table hadoop_prod.default.test drop column age
""".stripMargin)
//4.查看表test数据
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
最终表展示的列少了age列,多了gender、loc列:
- 重命名列
重命名列语法:ALTER TABLE ... RENAME COLUMN,操作如下:
//5.重命名列
spark.sql(
"""
|alter table hadoop_prod.default.test rename column gender to xxx
|
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
最终表展示的列 gender列变成了xxx列:
1.8.5.6 ALTER TABLE 分区操作
alter 分区操作包括增加分区和删除分区操作,这种分区操作在Spark3.x之后被支持,spark2.4版本不支持,并且使用时,必须在spark配置中加入spark.sql.extensions属性,其值为:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,在添加分区时还支持分区转换,语法如下:
- 添加分区语法:ALTER TABLE ... ADD PARTITION FIELD
- 删除分区语法:ALTER TABLE ... DROP PARTITION FIELD
具体操作如下:
- 创建表mytbl,并插入数据
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
//1.创建普通表
spark.sql(
"""
| create table hadoop_prod.default.mytbl(id int,name string,loc string,ts timestamp) using iceberg
""".stripMargin)
//2.向表中插入数据,并查询
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(1,'zs',"beijing",cast(1608469830 as timestamp)),
|(3,'ww',"shanghai",cast(1603096230 as timestamp))
""".stripMargin)
spark.sql("select * from hadoop_prod.default.mytbl").show()
在HDFS中数据存储和结果如下:
- 将表loc列添加为分区列,并插入数据,查询
//3.将 loc 列添加成分区,必须添加 config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") 配置
spark.sql(
"""
|alter table hadoop_prod.default.mytbl add partition field loc
""".stripMargin)
//4.向表 mytbl中继续插入数据,之前数据没有分区,之后数据有分区
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
|(2,'ls',"shandong",cast(1634559630 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
在HDFS中数据存储和结果如下:
注意:添加分区字段是元数据操作,不会改变现有的表数据,新数据将使用新分区写入数据,现有数据将继续保留在原有的布局中。
- 将ts列进行转换作为分区列,插入数据并查询
//5.将 ts 列通过分区转换添加为分区列
spark.sql(
"""
|alter table hadoop_prod.default.mytbl add partition field years(ts)
""".stripMargin)
//6.向表 mytbl中继续插入数据,之前数据没有分区,之后数据有分区
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(4,'ml',"beijing",cast(1639920630 as timestamp)),
|(6,'gb',"tianjin",cast(1576843830 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
在HDFS中数据存储和结果如下:
- 删除分区loc
//7.删除表 mytbl 中的loc分区
spark.sql(
"""
|alter table hadoop_prod.default.mytbl drop partition field loc
""".stripMargin)
//8.继续向表 mytbl 中插入数据,并查询
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(4,'ml',"beijing",cast(1639920630 as timestamp)),
|(6,'gb',"tianjin",cast(1576843830 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
在HDFS中数据存储和结果如下:
注意:由于表中还有ts分区转换之后对应的分区,所以继续插入的数据loc分区为null
- 删除分区years(ts)
//9.删除表 mytbl 中的years(ts) 分区
spark.sql(
"""
|alter table hadoop_prod.default.mytbl drop partition field years(ts)
""".stripMargin)
//10.继续向表 mytbl 中插入数据,并查询
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
|(2,'ls',"shandong",cast(1634559630 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
在HDFS中数据存储和结果如下:
1.8.6 Spark与Iceberg整合查询操作
1.8.6.1 DataFrame API加载Iceberg中的数据
Spark操作Iceberg不仅可以使用SQL方式查询Iceberg中的数据,还可以使用DataFrame方式加载Iceberg表中的数据,可以通过spark.table(Iceberg表名)或者spark.read.format("iceberg").load("iceberg data path")来加载对应Iceberg表中的数据,操作如下:
val spark: SparkSession = SparkSession.builder().master("local").appName("test")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
//1.创建Iceberg表,并插入数据
spark.sql(
"""
|create table hadoop_prod.mydb.mytest (id int,name string,age int) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
//1.SQL 方式读取Iceberg中的数据
spark.sql("select * from hadoop_prod.mydb.mytest").show()
/**
* 2.使用Spark查询Iceberg中的表除了使用sql 方式之外,还可以使用DataFrame方式,建议使用SQL方式
*/
//第一种方式使用DataFrame方式查询Iceberg表数据
val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest")
frame1.show()
//第二种方式使用DataFrame加载 Iceberg表数据
val frame2: DataFrame = spark.read.format("iceberg").load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
frame2.show()
1.8.6.2 查询表快照
每次向Iceberg表中commit数据都会生成对应的一个快照,我们可以通过查询“catalog名称.{库名}.${Iceberg表}.snapshots ”来查询对应Iceberg表中拥有的所有快照,操作如下:
//向表 hadoop_prod.mydb.mytest 中再次插入以下数据
spark.sql(
"""
|insert into hadoop_prod.mydb.mytest values (4,"ml",18),(5,"tq",19),(6,"gb",20)
""".stripMargin)
//3.查看Iceberg表快照信息
spark.sql(
"""
|select * from hadoop_prod.mydb.mytest.snapshots
""".stripMargin).show(false)
结果如下:
1.8.6.3 查询表历史
对Iceberg表查询表历史就是查询Iceberg表快照信息内容,与查询表快照类似,通过“catalog名称.{库名}.${Iceberg表}.history ”命令进行查询,操作如下:
//4.查询表历史,实际上就是表快照的部分内容
spark.sql(
"""
|select * from hadoop_prod.mydb.mytest.history
""".stripMargin).show(false)
结果如下:
1.8.6.4 查询表data files
我们可以通过”catalog名称.{库名}.${Iceberg表}.files ”命令来查询Iceberg表对应的data files 信息,操作如下:
//5.查看表对应的data files
spark.sql(
"""
|select * from hadoop_prod.mydb.mytest.files
""".stripMargin).show(false)
结果如下:
1.8.6.5 查询Manifests
我们可以通过“catalog名称.{库名}.${Iceberg表}.manifests ”来查询表对应的manifests信息,具体操作如下:
//6.查看表对应的 Manifests
spark.sql(
"""
|select * from hadoop_prod.mydb.mytest.manifests
""".stripMargin).show(false)
结果如下:
1.8.6.6 查询指定快照数据
查询Iceberg表数据还可以指定snapshot-id 来查询指定快照的数据,这种方式可以使用DataFrame Api方式来查询,Spark3.x版本之后也可以通过SQL 方式来查询,操作如下:
//7.查询指定快照数据,快照ID可以通过读取json元数据文件获取
spark.read
.option("snapshot-id",3368002881426159310L)
.format("iceberg")
.load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
.show()
结果如下:
Spark3.x 版本之后,SQL指定快照语法为:
CALL ${Catalog 名称}.system.set_current_snapshot("${库名.表名}",快照ID)
操作如下:
//SQL 方式指定查询快照ID 数据
spark.sql(
"""
|call hadoop_prod.system.set_current_snapshot('mydb.mytest',3368002881426159310)
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.mydb.mytest
""".stripMargin).show()
结果如下:
1.8.6.7 根据时间戳查询数据
Spark读取Iceberg表可以指定“as-of-timestamp ”参数,通过指定一个毫秒时间参数查询Iceberg表中数据,iceberg会根据元数据找出timestamp-ms <= as-of-timestamp 对应的 snapshot-id ,也只能通过DataFrame Api把数据查询出来,Spark3.x版本之后支持SQL指定时间戳查询数据。具体操作如下:
//8.根据时间戳查询数据,时间戳指定成毫秒,iceberg会根据元数据找出timestamp-ms <= as-of-timestamp 对应的 snapshot-id ,把数据查询出来
spark.read.option("as-of-timestamp","1640066148000")
.format("iceberg")
.load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
.show()
结果如下:
Spark3.x 版本之后,SQL根据时间戳查询最近快照语法为:
CALL ${Catalog 名称}.system.rollback_to_timestamp("${库名.表名}",TIMESTAMP '日期数据')
操作如下:
//省略重新创建表mytest,两次插入数据
//SQL 方式查询指定 时间戳 快照数据
spark.sql(
"""
|CALL hadoop_prod.sy