Spark_Hudi
Duplicate query results when Hudi is integrated with Hive
The setting set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; also affects queries against ordinary Hive tables.
After the Hudi query finishes, switch back with set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
or restore the default: set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
1. Computing DiDi trip metrics
2. Spark utility class
Create the SparkSession object
package cn.itcast.hudi.didi

import org.apache.spark.sql.SparkSession

/**
 * Utility class for SparkSQL data operations (loading/reading and saving/writing),
 * e.g. obtaining a SparkSession instance.
 */
object SparkUtils {

  /**
   * Build a SparkSession instance; by default it runs in local mode.
   */
  def createSpakSession(clazz: Class[_], master: String = "local[8]", partitions: Int = 4): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      // serialize with Kryo
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // number of partitions used when a shuffle occurs
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSpakSession(this.getClass)
    println(spark)
    // keep the application alive so the local Spark UI can be inspected
    Thread.sleep(10000000)
    spark.stop()
  }
}
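If Hudi's Spark SQL DDL/DML (used in the Spark SQL sections later in these notes) is also needed from code, the helper can additionally register Hudi's session extension. A minimal sketch, assuming the hudi-spark bundle is on the classpath; the method name is illustrative:

// Sketch only: variant of the helper above that also registers Hudi's Spark SQL
// extension, which enables Hudi-specific SQL such as MERGE INTO.
def createSparkSessionWithHudi(clazz: Class[_], master: String = "local[8]"): SparkSession = {
  SparkSession.builder()
    .appName(clazz.getSimpleName.stripSuffix("$"))
    .master(master)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
}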
3. Utility for converting a date to the day of the week
package cn.itcast.hudi.test

import java.util.{Calendar, Date}
import org.apache.commons.lang3.time.FastDateFormat

/**
 * Convert a date to the day of the week, e.g. input 2021-10-10 -> 星期日 (Sunday)
 */
object DayWeekTest {
  def main(args: Array[String]): Unit = {
    val dateStr: String = "2021-09-10"
    val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
    val calendar: Calendar = Calendar.getInstance()
    val date: Date = format.parse(dateStr)
    calendar.setTime(date)
    // Calendar.DAY_OF_WEEK: 1 = Sunday ... 7 = Saturday
    val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
      case 1 => "星期日"
      case 2 => "星期一"
      case 3 => "星期二"
      case 4 => "星期三"
      case 5 => "星期四"
      case 6 => "星期五"
      case 7 => "星期六"
    }
    println(dayWeek)
  }
}
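The same conversion can also be done with java.time (JDK 8+), which avoids the mutable Calendar API; a minimal sketch:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DayWeekJavaTime {
  def main(args: Array[String]): Unit = {
    val date = LocalDate.parse("2021-09-10", DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    // DayOfWeek runs MONDAY..SUNDAY, unlike Calendar's 1 = Sunday numbering
    println(date.getDayOfWeek) // FRIDAY
  }
}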
4. Data ETL and save code
4.1 Build the SparkSession instance (integrated with Hudi and HDFS)
val spark: SparkSession = SparkUtils.createSpakSession(this.getClass)
4.2 Load the local DiDi trip data (CSV format)
val didiDF = readCsvFile(spark, datasPath)
didiDF.printSchema()
didiDF.show(10, truncate = false)
/**
 * Read a CSV-format text file into a DataFrame.
 */
def readCsvFile(spark: SparkSession, path: String): DataFrame = {
  spark.read
    // field separator: tab
    .option("sep", "\\t")
    // the first line holds the column names
    .option("header", "true")
    // infer column types from the values
    .option("inferSchema", "true")
    // file path
    .csv(path)
}
4.3 ETL of the DiDi trip data
dataframe.withColumn adds a column
val etlDF: DataFrame = process(didiDF)
etlDF.printSchema()
etlDF.show(10, truncate = false)
/**
 * ETL for the DiDi Haikou data: add the ts and partitionpath columns.
 */
def process(dataframe: DataFrame): DataFrame = {
  dataframe
    // add the Hudi partition path column, built from year/month/day (values like 2017-5-22)
    .withColumn(
      "partitionpath",
      concat_ws("-", col("year"), col("month"), col("day"))
    )
    // drop the original columns
    .drop("year", "month", "day")
    // add the ts column (Hudi's merge/precombine field), based on the departure time
    .withColumn(
      "ts",
      unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")
    )
}
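If the source data had no separate year/month/day columns, the partition path could be derived from departure_time instead; a hedged sketch (note that date_format yields zero-padded values such as 2017-05-22, which differ from the partition values produced above):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Alternative ETL sketch, assuming departure_time is formatted as yyyy-MM-dd HH:mm:ss.
def processByDepartureTime(dataframe: DataFrame): DataFrame = {
  dataframe
    // partition path taken directly from the departure date (zero-padded)
    .withColumn("partitionpath", date_format(col("departure_time"), "yyyy-MM-dd"))
    // same merge field as above
    .withColumn("ts", unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss"))
}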
4.4 Save the transformed data to the Hudi table
saveToHudi(etlDF, hudiTableName, hudiTablePath)
def saveToHudi(dataframe: DataFrame, table: String, path: String): Unit = {
  // imports for the Hudi write options
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig._
  // save the data
  dataframe.write
    .mode(SaveMode.Overwrite)
    .format("hudi")
    // shuffle parallelism for insert/upsert
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    // Hudi table properties
    .option(RECORDKEY_FIELD.key(), "order_id")
    .option(PRECOMBINE_FIELD.key(), "ts")
    // partition field
    .option(PARTITIONPATH_FIELD.key(), "partitionpath")
    .option(TBL_NAME.key(), table)
    .save(path)
}
5. Metric analysis code
// cache the DataFrame
hudiDF.persist(StorageLevel.MEMORY_AND_DISK)
// release the cache once the data is no longer needed
hudiDF.unpersist()
Main class code
def main(args: Array[String]): Unit = {
  // step1: build the SparkSession instance (integrated with Hudi and HDFS)
  val spark: SparkSession = SparkUtils.createSpakSession(this.getClass, partitions = 8)
  // step2: load the Hudi table data, selecting the required fields
  val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
  // hudiDF.printSchema()
  // hudiDF.show(10, truncate = false)
  // the data is used several times, so cache it
  hudiDF.persist(StorageLevel.MEMORY_AND_DISK)
  // step3: compute the business metrics
  // metric 1: order type statistics
  reportProduct(hudiDF)
  // metric 2: order timeliness statistics
  reportType(hudiDF)
  // metric 3: traffic type statistics
  reportTraffic(hudiDF)
  // metric 4: order price statistics
  reportPrice(hudiDF)
  // metric 5: order distance statistics
  reportDistance(hudiDF)
  // metric 6: day-of-week statistics
  reportWeek(hudiDF)
  // release the cache once the data is no longer needed
  hudiDF.unpersist()
  // step4: application finished, release resources
  spark.stop()
}
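A minimal sketch of the readFromHudi helper referenced in step2, assuming the table path written by saveToHudi above and keeping only the fields the reports use:

def readFromHudi(spark: SparkSession, path: String): DataFrame = {
  spark.read
    // snapshot query over the Hudi table
    .format("hudi")
    .load(path)
    // only the fields needed by the reports below
    .select(
      "product_id", "type", "traffic_type",
      "pre_total_fee", "start_dest_distance", "departure_time"
    )
}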
5.1 Order type statistics
/**
 * Order type statistics, field: product_id
 */
def reportProduct(dataframe: DataFrame): Unit = {
  // a. group by product id and count
  val reportDF: DataFrame = dataframe.groupBy("product_id").count()
  // b. UDF converting the product id to a display name
  val to_name = udf(
    (productId: Int) => {
      productId match {
        case 1 => "滴滴专车"
        case 2 => "滴滴企业专车"
        case 3 => "滴滴快车"
        case 4 => "滴滴企业快车"
        case _ => "未知" // default case guards against a MatchError on unexpected ids
      }
    }
  )
  // c. apply the name conversion
  val resultDF: DataFrame = reportDF.select(
    to_name(col("product_id")).as("order_type"),
    col("count").as("total")
  )
  resultDF.printSchema()
  resultDF.show(10, truncate = false)
}
5.2 Order timeliness statistics
/**
 * Order timeliness statistics (real-time vs reserved), field: type
 */
def reportType(dataframe: DataFrame): Unit = {
  // a. group by timeliness id and count
  val reportDF: DataFrame = dataframe.groupBy("type").count()
  // b. UDF converting the id to a display name
  val to_name = udf(
    (realtimeType: Int) => {
      realtimeType match {
        case 0 => "实时"
        case 1 => "预约"
        case _ => "未知" // default case guards against a MatchError on unexpected ids
      }
    }
  )
  // c. apply the name conversion
  val resultDF: DataFrame = reportDF.select(
    to_name(col("type")).as("order_realtime"),
    col("count").as("total")
  )
  resultDF.printSchema()
  resultDF.show(10, truncate = false)
}
5.3 Traffic type statistics
/**
 * Traffic type statistics, field: traffic_type
 */
def reportTraffic(dataframe: DataFrame): Unit = {
  // a. group by traffic type id and count
  val reportDF: DataFrame = dataframe.groupBy("traffic_type").count()
  // b. UDF converting the id to a display name
  val to_name = udf(
    (trafficType: Int) => {
      trafficType match {
        case 0 => "普通散客"
        case 1 => "企业时租"
        case 2 => "企业接机套餐"
        case 3 => "企业送机套餐"
        case 4 => "拼车"
        case 5 => "接机"
        case 6 => "送机"
        case 302 => "跨城拼车"
        case _ => "未知"
      }
    }
  )
  // c. apply the name conversion
  val resultDF: DataFrame = reportDF.select(
    to_name(col("traffic_type")).as("traffic_type"),
    col("count").as("total")
  )
  resultDF.printSchema()
  resultDF.show(10, truncate = false)
}
5.4 Order price statistics
/**
 * Order price statistics: bucket order prices into ranges, then count each range; field: pre_total_fee
 */
def reportPrice(dataframe: DataFrame): Unit = {
  val resultDF: DataFrame = dataframe
    // aggregate with sum(when(col.between(min, max), 1).otherwise(0)): rows matching the range count as 1, others as 0
    .agg(
      // price 0 ~ 15
      sum(
        when(col("pre_total_fee").between(0, 15), 1).otherwise(0)
      ).as("0~15"),
      // price 16 ~ 30
      sum(
        when(col("pre_total_fee").between(16, 30), 1).otherwise(0)
      ).as("16~30"),
      // price 31 ~ 50
      sum(
        when(col("pre_total_fee").between(31, 50), 1).otherwise(0)
      ).as("31~50"),
      // price 51 ~ 100
      sum(
        when(col("pre_total_fee").between(51, 100), 1).otherwise(0)
      ).as("51~100"),
      // price 100+
      sum(
        when(col("pre_total_fee").gt(100), 1).otherwise(0)
      ).as("100+")
    )
  resultDF.printSchema()
  resultDF.show(10, truncate = false)
}
5.5 Order distance statistics
/**
 * Order distance statistics: bucket trip distances into ranges, then count each range; field: start_dest_distance
 */
def reportDistance(dataframe: DataFrame): Unit = {
  val resultDF: DataFrame = dataframe
    .agg(
      // distance: 0 ~ 10km
      sum(
        when(col("start_dest_distance").between(0, 10000), 1).otherwise(0)
      ).as("0~10km"),
      // distance: 10 ~ 20km
      sum(
        when(col("start_dest_distance").between(10001, 20000), 1).otherwise(0)
      ).as("10~20km"),
      // distance: 20 ~ 30km
      sum(
        when(col("start_dest_distance").between(20001, 30000), 1).otherwise(0)
      ).as("20~30km"),
      // distance: 30 ~ 50km
      sum(
        when(col("start_dest_distance").between(30001, 50000), 1).otherwise(0)
      ).as("30~50km"),
      // distance: 50km+
      sum(
        when(col("start_dest_distance").gt(50000), 1).otherwise(0)
      ).as("50+km")
    )
  resultDF.printSchema()
  resultDF.show(10, truncate = false)
}
5.6 Day-of-week statistics
/**
 * Group orders by day of week: convert the departure date to a weekday, then group and count; field: departure_time
 */
def reportWeek(dataframe: DataFrame): Unit = {
  // a. UDF converting a date string to the day of the week
  val to_week: UserDefinedFunction = udf(
    (dateStr: String) => {
      val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
      val calendar: Calendar = Calendar.getInstance()
      val date: Date = format.parse(dateStr)
      calendar.setTime(date)
      val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
        case 1 => "星期日"
        case 2 => "星期一"
        case 3 => "星期二"
        case 4 => "星期三"
        case 5 => "星期四"
        case 6 => "星期五"
        case 7 => "星期六"
      }
      // return the weekday name
      dayWeek
    }
  )
  // b. apply the UDF, then group and count
  val reportDF: DataFrame = dataframe
    .select(
      to_week(col("departure_time")).as("week")
    )
    .groupBy("week").count()
    .select(
      col("week"), col("count").as("total")
    )
  reportDF.printSchema()
  reportDF.show(10, truncate = false)
}
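Spark's built-in dayofweek function (available since 2.3, returning 1 = Sunday .. 7 = Saturday) can replace the UDF entirely; a minimal sketch over the same input DataFrame as reportWeek:

import org.apache.spark.sql.functions._

// Equivalent report without a UDF; departure_time must be castable to a timestamp.
val weekDF = dataframe
  .groupBy(dayofweek(col("departure_time")).as("week"))
  .count()
  .select(col("week"), col("count").as("total"))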
6. Integrating Hudi with Hive
Environment preparation
To create Hive tables mapped to Hudi tables, copy the integration JAR hudi-hadoop-mr-bundle-0.9.0.jar into the $HIVE_HOME/lib directory,
then restart hiveserver2.
6.1 Mapping a Hudi table with a Hive external table
-- a Hive external partitioned table is required
-- 1. create the database
CREATE DATABASE IF NOT EXISTS db_hudi ;
-- 2. use the database
USE db_hudi ;
-- 3. create the table
CREATE EXTERNAL TABLE IF NOT EXISTS tbl_hudi_didi(
order_id bigint ,
product_id int ,
city_id int ,
district int ,
county int ,
type int ,
combo_type int ,
traffic_type int ,
passenger_count int ,
driver_product_id int ,
start_dest_distance int ,
arrive_time string ,
departure_time string ,
pre_total_fee double ,
normal_time string ,
bubble_trace_id string ,
product_1level int ,
dest_lng double ,
dest_lat double ,
starting_lng double ,
starting_lat double ,
partitionpath string ,
ts bigint
)
PARTITIONED BY (date_str string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/tbl_didi_haikou' ;
-- 4. show the partitions of the table
SHOW PARTITIONS db_hudi.tbl_hudi_didi ;
-- 5. manually add the partitions
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-22') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-22' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-23') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-23' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-24') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-24' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-25') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-25' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-26') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-26' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-27') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-27' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-28') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-28' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-29') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-29' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-30') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-30' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-31') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-5-31' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-1') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-1' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-2') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-2' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-3') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-3' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-4') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-4' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-5') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-5' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-6') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-6' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-7') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-7' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-8') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-8' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-9') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-9' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-10') LOCATION '/hudi-warehouse/tbl_didi_haikou/2017-6-10' ;
-- test: query the data
SET hive.mapred.mode = nonstrict ;
SELECT order_id, product_id, type, pre_total_fee, traffic_type, start_dest_distance FROM db_hudi.tbl_hudi_didi LIMIT 20;
6.2 Querying the data in Hive
-- for development and testing, enable local execution mode
set hive.exec.mode.local.auto=true;
set hive.exec.mode.local.auto.tasks.max=10;
set hive.exec.mode.local.auto.inputbytes.max=50000000;
-- metric 1: order type statistics
WITH tmp AS (
SELECT product_id, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY product_id
)
SELECT
CASE product_id
WHEN 1 THEN "滴滴专车"
WHEN 2 THEN "滴滴企业专车"
WHEN 3 THEN "滴滴快车"
WHEN 4 THEN "滴滴企业快车"
END AS order_type,
total
FROM tmp ;
-- metric 2: order timeliness statistics
WITH tmp AS (
SELECT type, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY type
)
SELECT
CASE type
WHEN 0 THEN "实时"
WHEN 1 THEN "预约"
END AS order_type,
total
FROM tmp ;
-- metric 3: traffic type statistics
SELECT traffic_type, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY traffic_type ;
-- metric 4: order price statistics: bucket prices into ranges, then count, using CASE WHEN and SUM
SELECT
  SUM(
    CASE WHEN pre_total_fee BETWEEN 0 AND 15 THEN 1 ELSE 0 END
  ) AS `0_15`,
  SUM(
    CASE WHEN pre_total_fee BETWEEN 16 AND 30 THEN 1 ELSE 0 END
  ) AS `16_30`,
  SUM(
    CASE WHEN pre_total_fee BETWEEN 31 AND 50 THEN 1 ELSE 0 END
  ) AS `31_50`,
  SUM(
    CASE WHEN pre_total_fee BETWEEN 51 AND 100 THEN 1 ELSE 0 END
  ) AS `51_100`,
  SUM(
    CASE WHEN pre_total_fee > 100 THEN 1 ELSE 0 END
  ) AS `100_`
FROM
  db_hudi.tbl_hudi_didi ;
7. Writing to Hudi with Structured Streaming
Option reference: https://hudi.apache.org/docs/writing_data#datasource-writer
package cn.itcast.hudi.stream
import cn.itcast.hudi.didi.SparkUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
/**
 * Consume data from Kafka in real time with Structured Streaming, apply ETL transformations, and store the result in a Hudi table.
 */
object HudiStructuredDemo {
  /**
   * Consume the given Kafka topic in real time.
   */
  def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.itcast.cn:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 100000) // maximum number of records processed per trigger
      .option("failOnDataLoss", "false") // do not fail the query when data is possibly lost
      .load()
  }
  /**
   * Transform the data read from Kafka: extract all field values as strings so they can be saved into the Hudi table.
   */
  def process(streamDF: DataFrame): DataFrame = {
    streamDF
      // select the Kafka fields
      .selectExpr(
        "CAST(key AS STRING) order_id",
        "CAST(value AS STRING) AS message",
        "topic", "partition", "offset", "timestamp"
      )
      // parse the JSON message and extract the field values
      .withColumn("user_id", get_json_object(col("message"), "$.userId"))
      .withColumn("order_time", get_json_object(col("message"), "$.orderTime"))
      .withColumn("ip", get_json_object(col("message"), "$.ip"))
      .withColumn("order_money", get_json_object(col("message"), "$.orderMoney"))
      .withColumn("order_status", get_json_object(col("message"), "$.orderStatus"))
      // drop the raw message column
      .drop(col("message"))
      // convert the order time to a timestamp, used as the Hudi merge (precombine) field
      .withColumn("ts", to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss.SSS"))
      // extract the partition date (yyyy-MM-dd) from the order time
      .withColumn("day", substring(col("order_time"), 0, 10))
  }
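  /**
   * Sketch: a quick local check of the JSON extraction used in process() above.
   * The sample message is illustrative only and simply mirrors the fields
   * referenced by get_json_object.
   */
  def checkJsonExtraction(spark: SparkSession): Unit = {
    import spark.implicits._
    val sample = Seq(
      """{"userId":"u-1001","orderTime":"2021-12-09 10:15:30.000","ip":"36.57.88.1","orderMoney":"99.9","orderStatus":"0"}"""
    ).toDF("message")
    sample.select(
      get_json_object($"message", "$.userId").as("user_id"),
      get_json_object($"message", "$.orderTime").as("order_time"),
      get_json_object($"message", "$.orderMoney").as("order_money")
    ).show(truncate = false)
  }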
  /**
   * Save the streaming DataFrame into the Hudi table.
   */
  def saveToHudi(streamDF: DataFrame): Unit = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-streaming")
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
        println(s"============== BatchId: ${batchId} start ==============")
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
        batchDF.write
          .mode(SaveMode.Append)
          .format("hudi")
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          // Hudi table properties
          .option(RECORDKEY_FIELD.key(), "order_id") // record key
          .option(PRECOMBINE_FIELD.key(), "ts") // merge (precombine) field
          .option(PARTITIONPATH_FIELD.key(), "day") // partition field
          .option(TBL_NAME.key(), "tbl_hudi_order") // table name
          .option(TABLE_TYPE.key(), "MERGE_ON_READ") // table type
          // use Hive-style partition paths so the layout matches Hive's partition convention
          .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
          .save("/hudi-warehouse/tbl_hudi_order")
      })
      .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
      .start()
  }
  def main(args: Array[String]): Unit = {
    // step1: build the SparkSession instance
    val spark: SparkSession = SparkUtils.createSpakSession(this.getClass)
    // step2: consume data from Kafka in real time
    val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")
    // step3: extract the fields and convert the data types
    val streamDF: DataFrame = process(kafkaStreamDF)
    // step4: save the data into the Hudi table (MOR type: reads merge the table's file slices)
    saveToHudi(streamDF)
    // step5: once the streaming query has started, wait for termination
    spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running .........."))
    spark.streams.awaitAnyTermination()
  }
}
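To verify the streaming writes, the MOR table can be read back as a batch snapshot; a minimal sketch, assuming a SparkSession named spark and the table path used above:

// Snapshot read of the MOR table written by the streaming job.
val orderDF = spark.read
  .format("hudi")
  .load("/hudi-warehouse/tbl_hudi_order")
orderDF.printSchema()
orderDF.show(10, truncate = false)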
8. Querying and analyzing with spark-shell
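A minimal spark-shell sketch, assuming the Hudi table path written in section 4 and that the hudi-spark bundle is on the spark-shell classpath:

// spark-shell provides the SparkSession as `spark`.
val didiDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_didi_haikou")
didiDF.createOrReplaceTempView("tmp_didi")
spark.sql("SELECT product_id, COUNT(1) AS total FROM tmp_didi GROUP BY product_id").show()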
9. The DeltaStreamer utility
Exactly-once ingestion of new events from Kafka
Supports incoming data as JSON, Avro, or custom record types
Manages checkpoints, rollback, and recovery
Leverages Avro schemas from DFS or the Confluent schema registry
Supports custom transformations
// Official docs: https://hudi.apache.org/docs/hoodie_deltastreamer#deltastreamer
// The official example consumes Kafka data in Avro format in real time and stores it in a Hudi table.
10. Spark SQL on Hudi
10.1 Startup
# supported in Hudi 0.9
# https://hudi.apache.org/docs/quick-start-guide
-- start the spark-sql interactive shell with the Hudi bundle jar and the required properties, including:
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
-- Hudi's default upsert/insert/delete parallelism is 1500; use a smaller value for this small demo dataset:
set hoodie.upsert.shuffle.parallelism = 1;
set hoodie.insert.shuffle.parallelism = 1;
set hoodie.delete.shuffle.parallelism = 1;
-- do not sync the Hudi table metadata to Hive:
set hoodie.datasource.meta.sync.enable=false;
10.2 Usage
Write DDL statements to create Hudi tables: MOR type, partitioned, with primary key id, partition field dt, and the merge field defaulting to ts.
10.3 Creating tables with Spark SQL DDL
10.3.1 Create a COW-type Hudi table
-- create a managed cow table
create table if not exists hudi_table0 (
id int,
name string,
price double
) using hudi
options (
type = 'cow',
primaryKey = 'id'
);
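A quick way to exercise the table is an insert followed by a query; a sketch with illustrative values, issued here via spark.sql (in the spark-sql shell the inner statements can be typed directly):

// Illustrative only: insert one row into hudi_table0 and read it back.
spark.sql("insert into hudi_table0 select 1 as id, 'a1' as name, 20.0 as price")
spark.sql("select id, name, price from hudi_table0").show()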
10.3.2 Create a table with a specified location (an external table)
-- create an external mor table
create table if not exists hudi_table1 (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
-- specifying a storage location makes the table external; the path here is an example
location '/hudi-warehouse/hudi_table1'
options (
  type = 'mor',
  primaryKey = 'id,name',
  preCombineField = 'ts'
);
10.3.3 Create a partitioned table
create table if not exists hudi_table_p0 (
  id bigint,
  name string,
  -- merge field referenced by preCombineField below
  ts bigint,
  dt string,
  hh string
) using hudi
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (dt, hh);
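Writing into a static partition then looks like the sketch below (values are illustrative; the select list supplies the non-partition columns id, name, ts in order), again issued via spark.sql:

// Illustrative only: write one row into partition dt='2021-12-09', hh='10' and read it back.
spark.sql("insert into hudi_table_p0 partition(dt = '2021-12-09', hh = '10') select 1, 'a1', 1000")
spark.sql("select id, name, ts, dt, hh from hudi_table_p0").show()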
10.3.4 CTAS is also supported: create table as select
create table h3 using hudi
as
select 1 as id, 'a1' as name, 10 as price;
11. The MERGE INTO statement
Merge syntax
-- based on the match condition, decide whether each row should be inserted, updated, or deleted
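A hedged sketch of Hudi's MERGE INTO syntax against the hudi_table0 table created above (the source rows are illustrative), issued via spark.sql:

// Upsert sketch: rows matching on id are updated, new ids are inserted.
spark.sql(
  """
    |merge into hudi_table0 as target
    |using (select 1 as id, 'a1' as name, 12.0 as price) as source
    |on target.id = source.id
    |when matched then update set *
    |when not matched then insert *
    |""".stripMargin)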