Data Lake HUDI-Spark 03


Spark_Hudi

Duplicate data in queries when Hudi is integrated with Hive

Setting set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; also affects queries on ordinary Hive tables,
so after the query finishes it should be switched back with set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
or restored to the default: set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

1. Didi trip metrics


2. Spark utility class

Create the SparkSession object

package cn.itcast.hudi.didi

import org.apache.spark.sql.SparkSession

/**
 * Utility class for SparkSQL data operations (loading/reading and saving/writing), e.g. obtaining a SparkSession instance
 */
object SparkUtils {

    /**
     * Build a SparkSession instance; it runs in local mode by default
     */
    def createSpakSession(clazz: Class[_], master: String = "local[8]", partitions: Int = 4): SparkSession = {
        SparkSession.builder()
            .appName(clazz.getSimpleName.stripSuffix("$"))
            .master(master)
            //set the serializer
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //number of partitions used when a shuffle occurs
            .config("spark.sql.shuffle.partitions", partitions)
            .getOrCreate()
    }

    def main(args: Array[String]): Unit = {
        val spark = createSpakSession(this.getClass)
        println(spark)

        Thread.sleep(10000000)

        spark.stop()
    }
}

3. Utility for converting a date to the day of the week

package cn.itcast.hudi.test

import java.util.{Calendar, Date}
import org.apache.commons.lang3.time.FastDateFormat

/**
 * Convert a date to the day of the week, e.g. input 2021-10-10 -> 星期日 (Sunday)
 */
object DayWeekTest {

    def main(args: Array[String]): Unit = {

        val dateStr: String = "2021-09-10"

        val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
        val calendar: Calendar = Calendar.getInstance();

        val date: Date = format.parse(dateStr)
        calendar.setTime(date)

        val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
            case 1 => "星期日"
            case 2 => "星期一"
            case 3 => "星期二"
            case 4 => "星期三"
            case 5 => "星期四"
            case 6 => "星期五"
            case 7 => "星期六"
        }
        println(dayWeek)
    }
}

4. Data ETL and save code

4.1 Build the SparkSession instance (integrated with Hudi and HDFS)

val spark: SparkSession = SparkUtils.createSpakSession(this.getClass)

4.2 Load the local Didi trip data in CSV format

val didiDF = readCsvFile(spark, datasPath)
didiDF.printSchema()
didiDF.show(10, truncate = false)

/**
 * Read CSV text file data into a DataFrame
 */
def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
        // the field separator is a tab
        .option("sep", "\t")
        // the first line contains the column names
        .option("header", "true")
        // infer data types from the values
        .option("inferSchema", "true")
        // file path
        .csv(path)
}   

4.3 ETL processing of the Didi trip data

dataframe.withColumn adds a column

val etlDF: DataFrame = process(didiDF)
etlDF.printSchema()
etlDF.show(10, truncate = false)    

/**
 * ETL transformation of the Didi Haikou trip data: add the ts and partitionpath columns
 */
def process(dataframe: DataFrame): DataFrame = {
    dataframe
        // add the Hudi table partition column, built from year/month/day -> yyyy-MM-dd
        .withColumn(
            "partitionpath",
            concat_ws("-", col("year"), col("month"), col("day"))
        )
        // drop the original columns
        .drop("year", "month", "day")
        // add a ts column, used by Hudi when merging records, based on the departure time
        .withColumn(
            "ts",
            unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")
        )
}    

4.4 Save the transformed data to the Hudi table

saveToHudi(etlDF, hudiTableName, hudiTablePath)

def saveToHudi(dataframe: DataFrame, table: String, path: String): Unit = {
        // imports
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._

        // save the data
        dataframe.write
            .mode(SaveMode.Overwrite)
            .format("hudi")
            //shuffle parallelism for insert/upsert
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            // Hudi table properties
            .option(RECORDKEY_FIELD.key(), "order_id")
            .option(PRECOMBINE_FIELD.key(), "ts")
            //partition field
            .option(PARTITIONPATH_FIELD.key(), "partitionpath")
            .option(TBL_NAME.key(), table)
            .save(path)
    }    
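
For completeness, a hedged sketch of a driver main method wiring the four steps above together; the local CSV input path is an assumed placeholder, while the table name and storage path follow the Hive mapping shown later in section 6:

def main(args: Array[String]): Unit = {
    // assumed placeholder for the local Didi Haikou CSV file, adjust to your environment
    val datasPath: String = "file:///datas/didi-haikou.csv"
    // table name and storage path matching the Hive mapping in section 6
    val hudiTableName: String = "tbl_didi_haikou"
    val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

    // step1: build the SparkSession instance (integrated with Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSpakSession(this.getClass)
    // step2: load the local CSV trip data
    val didiDF: DataFrame = readCsvFile(spark, datasPath)
    // step3: ETL transformation (add the partitionpath and ts columns)
    val etlDF: DataFrame = process(didiDF)
    // step4: save the transformed data to the Hudi table
    saveToHudi(etlDF, hudiTableName, hudiTablePath)

    spark.stop()
}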

5. Metric query and analysis code

//cache
hudiDF.persist(StorageLevel.MEMORY_AND_DISK)
// release resources once the data is no longer used
hudiDF.unpersist()

Main class code

def main(args: Array[String]): Unit = {
        // step1: build the SparkSession instance (integrated with Hudi and HDFS)
        val spark: SparkSession = SparkUtils.createSpakSession(this.getClass, partitions = 8)

        // step2: load the Hudi table data, selecting the required fields
        val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
//      hudiDF.printSchema()
//      hudiDF.show(10, truncate = false)

        // the data is used several times, so caching it is recommended
        hudiDF.persist(StorageLevel.MEMORY_AND_DISK)

        // step3: statistical analysis for each business metric
        // Metric 1: order type statistics
        reportProduct(hudiDF)

        // Metric 2: order timeliness statistics
        reportType(hudiDF)

        // Metric 3: traffic type statistics
        reportTraffic(hudiDF)

        // Metric 4: order price statistics
        reportPrice(hudiDF)

        // Metric 5: order distance statistics
        reportDistance(hudiDF)

        // Metric 6: statistics by day of the week
        reportWeek(hudiDF)

        // release resources once the data is no longer used
        hudiDF.unpersist()

        // step4: the application is finished, close resources
        spark.stop()
    }
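
The main method above calls readFromHudi, which the original post does not show. A minimal sketch, assuming the plain spark.read.format("hudi") reader, org.apache.spark.sql.functions._ imported (the report code below needs it as well), and selecting only the fields used by the six reports:

def readFromHudi(spark: SparkSession, path: String): DataFrame = {
    // load the Hudi table from its base path
    spark.read
        .format("hudi")
        .load(path)
        // keep only the columns needed by the report functions
        .select(
            col("product_id"), col("type"), col("traffic_type"),
            col("pre_total_fee"), col("start_dest_distance"), col("departure_time")
        )
}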

5.1 Order type statistics


    /**
     * Order type statistics, field: product_id
     */
    def reportProduct(dataframe: DataFrame): Unit = {
        // a. group by product line ID and count
        val reportDF: DataFrame = dataframe.groupBy("product_id").count()

        // b. custom UDF to convert IDs to names
        val to_name = udf(
            (productId: Int) => {
                productId match {
                    case 1 => "滴滴专车"
                    case 2 => "滴滴企业专车"
                    case 3 => "滴滴快车"
                    case 4 => "滴滴企业快车"
                }
            }
        )

        // c. convert the names
        val resultDF: DataFrame = reportDF.select(
            to_name(col("product_id")).as("order_type"),
            col("count").as("total")
        )
        resultDF.printSchema()
        resultDF.show(10, truncate = false)
    }   

5.2 Order timeliness statistics

/**
     * Order timeliness statistics, field: type
     */
    def reportType(dataframe: DataFrame): Unit = {
        // a. group by the timeliness id and count
        val reportDF: DataFrame = dataframe.groupBy("type").count()

        // b. custom UDF to convert IDs to names
        val to_name = udf(
            (realtimeType: Int) => {
                realtimeType match {
                    case 0 => "实时"
                    case 1 => "预约"
                }
            }
        )

        // c. convert the names
        val resultDF: DataFrame = reportDF.select(
            to_name(col("type")).as("order_realtime"),
            col("count").as("total")
        )
        resultDF.printSchema()
        resultDF.show(10, truncate = false)
    }

5.3 Traffic type statistics

    /**
     * Traffic type statistics, field: traffic_type
     */
    def reportTraffic(dataframe: DataFrame): Unit = {
        // a. group by the traffic type id and count
        val reportDF: DataFrame = dataframe.groupBy("traffic_type").count()

        // b. custom UDF to convert IDs to names
        val to_name = udf(
            (trafficType: Int) => {
                trafficType match {
                    case 0 =>  "普通散客"
                    case 1 =>  "企业时租"
                    case 2 =>  "企业接机套餐"
                    case 3 =>  "企业送机套餐"
                    case 4 =>  "拼车"
                    case 5 =>  "接机"
                    case 6 =>  "送机"
                    case 302 =>  "跨城拼车"
                    case _ => "未知"
                }
            }
        )

        // c. convert the names
        val resultDF: DataFrame = reportDF.select(
            to_name(col("traffic_type")).as("traffic_type"),
            col("count").as("total")
        )
        resultDF.printSchema()
        resultDF.show(10, truncate = false)
    }

5.4 Order price statistics

    /**
     * Order price statistics: bucket the order price into ranges first, then count each range; field: pre_total_fee
     */
    def reportPrice(dataframe: DataFrame): Unit = {
        val resultDF: DataFrame = dataframe
            //aggregate first: sum(when(col(field).between(min, max), 1).otherwise(0)) counts 1 when the condition holds, otherwise 0
            .agg(
                // price 0 ~ 15
                sum(
                    when(col("pre_total_fee").between(0, 15), 1).otherwise(0)
                ).as("0~15"),
                // price 16 ~ 30
                sum(
                    when(col("pre_total_fee").between(16, 30), 1).otherwise(0)
                ).as("16~30"),
                // price 31 ~ 50
                sum(
                    when(col("pre_total_fee").between(31, 50), 1).otherwise(0)
                ).as("31~50"),
                // price 51 ~ 100
                sum(
                    when(col("pre_total_fee").between(51, 100), 1).otherwise(0)
                ).as("51~100"),
                // price 100+
                sum(
                    when(col("pre_total_fee").gt(100), 1).otherwise(0)
                ).as("100+")
            )
        resultDF.printSchema()
        resultDF.show(10, truncate = false)
    }

5.5 Order distance statistics

/**
     * Order distance statistics: bucket the distance into ranges first, then count each range; field: start_dest_distance
     */
    def reportDistance(dataframe: DataFrame): Unit = {
        val resultDF: DataFrame = dataframe
            .agg(
                // distance: 0 ~ 10km
                sum(
                    when(col("start_dest_distance").between(0, 10000), 1).otherwise(0)
                ).as("0~10km"),
                // distance: 10 ~ 20km
                sum(
                    when(col("start_dest_distance").between(10001, 20000), 1).otherwise(0)
                ).as("10~20km"),
                // distance: 20 ~ 30km
                sum(
                    when(col("start_dest_distance").between(20001, 30000), 1).otherwise(0)
                ).as("20~30"),
                // distance: 30 ~ 50km
                sum(
                    when(col("start_dest_distance").between(30001, 50000), 1).otherwise(0)
                ).as("30~50km"),
                // distance: 50km+
                sum(
                    when(col("start_dest_distance").gt(50001), 1).otherwise(0)
                ).as("50+km")
            )
        resultDF.printSchema()
        resultDF.show(10, truncate = false)
    }

5.6 Statistics by day of the week

    /**
     * Order statistics grouped by day of the week: convert the date to a day of the week first, then group and count; field: departure_time
     */
    def reportWeek(dataframe: DataFrame): Unit = {
        // a. custom UDF to convert a date into a day of the week
        val to_week: UserDefinedFunction = udf(
            (dateStr: String) => {
                val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
                val calendar: Calendar = Calendar.getInstance();

                val date: Date = format.parse(dateStr)
                calendar.setTime(date)

                val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
                    case 1 => "星期日"
                    case 2 => "星期一"
                    case 3 => "星期二"
                    case 4 => "星期三"
                    case 5 => "星期四"
                    case 6 => "星期五"
                    case 7 => "星期六"
                }
                // return the day of the week
                dayWeek
            }
        )

        // b. apply the UDF to the data
        val reportDF: DataFrame = dataframe
            .select(
                to_week(col("departure_time")).as("week")
            )
            .groupBy("week").count()
            .select(
                col("week"), col("count").as("total")
            )
        reportDF.printSchema()
        reportDF.show(10, truncate = false)
    }

6. Hudi integration with Hive

Environment preparation

To create Hive tables mapped onto Hudi tables, place the integration JAR hudi-hadoop-mr-bundle-0.9.0.jar into the $HIVE_HOME/lib directory,
then restart hiveserver2.

6.1 Mapping a Hive external table onto the Hudi table

-- a Hive external partitioned table needs to be created

-- 1. create the database
CREATE DATABASE IF NOT EXISTS db_hudi ;

-- 2. use the database
USE db_hudi ;

-- 3. create the table
CREATE EXTERNAL TABLE IF NOT EXISTS tbl_hudi_didi(
       order_id bigint          ,
       product_id int           ,
       city_id int              ,
       district int             ,
       county int               ,
       type int                 ,
       combo_type int           ,
       traffic_type int         ,
       passenger_count int      ,
       driver_product_id int    ,
       start_dest_distance int  ,
       arrive_time string       ,
       departure_time string    ,
       pre_total_fee double     ,
       normal_time string       ,
       bubble_trace_id string   ,
       product_1level int       ,
       dest_lng double          ,
       dest_lat double          ,
       starting_lng double      ,
       starting_lat double      ,
       partitionpath string     ,
       ts bigint
)
    PARTITIONED BY (date_str string)
    ROW FORMAT SERDE
        'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
        'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT
        'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION '/hudi-warehouse/tbl_didi_haikou' ;

-- show the partitions of the partitioned table
SHOW PARTITIONS db_hudi.tbl_hudi_didi ;

-- 5. manually add the partition metadata
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-22') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-22' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-23') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-23' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-24') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-24' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-25') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-25' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-26') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-26' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-27') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-27' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-28') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-28' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-29') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-29' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-30') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-30' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-5-31') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-5-31' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-1') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-1' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-2') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-2' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-3') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-3' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-4') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-4' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-5') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-5' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-6') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-6' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-7') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-7' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-8') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-8' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-9') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-9' ;
ALTER TABLE db_hudi.tbl_hudi_didi ADD IF NOT EXISTS PARTITION (date_str = '2017-6-10') LOCATION  '/hudi-warehouse/tbl_didi_haikou/2017-6-10' ;

-- test: query the data
SET hive.mapred.mode = nonstrict ;
SELECT order_id, product_id, type, pre_total_fee, traffic_type, start_dest_distance FROM db_hudi.tbl_hudi_didi LIMIT 20;

6.2 Query the data in Hive

-- for development and testing, run in local mode
set hive.exec.mode.local.auto=true;

set hive.exec.mode.local.auto.tasks.max=10;
set hive.exec.mode.local.auto.inputbytes.max=50000000;

-- Metric 1: order type statistics
WITH tmp AS (
    SELECT product_id, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY product_id
)
SELECT
    CASE product_id
        WHEN 1 THEN "滴滴专车"
        WHEN 2 THEN "滴滴企业专车"
        WHEN 3 THEN "滴滴快车"
        WHEN 4 THEN "滴滴企业快车"
        END AS order_type,
    total
FROM tmp ;

-- Metric 2: order timeliness statistics
WITH tmp AS (
    SELECT type, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY type
)
SELECT
    CASE type
        WHEN 0 THEN "实时"
        WHEN 1 THEN "预约"
        END AS order_type,
    total
FROM tmp ;

-- Metric 3: order traffic type statistics
SELECT traffic_type, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY traffic_type ;

-- Metric 5: order price statistics; bucket the price into ranges first, then count, using CASE WHEN and SUM
SELECT
    SUM(
            CASE WHEN pre_total_fee BETWEEN 0 AND 15 THEN 1 ELSE 0 END
        ) AS 0_15,
    SUM(
            CASE WHEN pre_total_fee BETWEEN 16 AND 30 THEN 1 ELSE 0 END
        ) AS 16_30,
    SUM(
            CASE WHEN pre_total_fee BETWEEN 31 AND 50 THEN 1 ELSE 0 END
        ) AS 31_50,
    SUM(
            CASE WHEN pre_total_fee BETWEEN 51 AND 100 THEN 1 ELSE 0 END
        ) AS 51_100,
    SUM(
            CASE WHEN pre_total_fee > 100 THEN 1 ELSE 0 END
        ) AS 100_
FROM
    db_hudi.tbl_hudi_didi ;

7. StructuredStreaming write to Hudi

Parameter reference (official docs): https://hudi.apache.org/docs/writing_data#datasource-writer

package cn.itcast.hudi.stream

import cn.itcast.hudi.didi.SparkUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

/**
 * Consume data from Kafka in real time with StructuredStreaming, apply ETL transformations, and store the result in a Hudi table
 */
object HudiStructuredDemo {

    /**
     * Consume data in real time from the specified Kafka topic
     */
    def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "node1.itcast.cn:9092")
            .option("subscribe", topicName)
            .option("startingOffsets", "latest")
            .option("maxOffsetsPerTrigger", 100000) //每次最多处理数据数量
            .option("failOnDataLoss", "false")//处理失败是否停止
            .load()
    }

    /**
     * Transform the data read from Kafka: extract all field values as strings so they can be stored in the Hudi table
     */
    def process(streamDF: DataFrame): DataFrame = {
        streamDF
            // select fields
            .selectExpr(
                "CAST(key AS STRING) order_id",
                "CAST(value AS STRING) AS message",
                "topic", "partition", "offset", "timestamp"
            )
            // parse the message and extract the field values
            .withColumn("user_id", get_json_object(col("message"), "$.userId"))
            .withColumn("order_time", get_json_object(col("message"), "$.orderTime"))
            .withColumn("ip", get_json_object(col("message"), "$.ip"))
            .withColumn("order_money", get_json_object(col("message"), "$.orderMoney"))
            .withColumn("order_status", get_json_object(col("message"), "$.orderStatus"))
            // drop the message column
            .drop(col("message"))
            // convert the order time string to a timestamp, used as the Hudi precombine (merge) field
            .withColumn("ts", to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss.SSS"))
            // extract the partition date from the order time: yyyy-MM-dd
            .withColumn("day", substring(col("order_time"), 0, 10))
    }

    /**
     * Save the streaming DataFrame into the Hudi table
     */
    def saveToHudi(streamDF: DataFrame): Unit = {
        streamDF.writeStream
            .outputMode(OutputMode.Append())
            .queryName("query-hudi-streaming")
            .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
                println(s"============== BatchId: ${batchId} start ==============")
                import org.apache.hudi.DataSourceWriteOptions._
                import org.apache.hudi.config.HoodieWriteConfig._
                import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

                batchDF.write
                    .mode(SaveMode.Append)
                    .format("hudi")
                    .option("hoodie.insert.shuffle.parallelism", "2")
                    .option("hoodie.upsert.shuffle.parallelism", "2")
                    // Hudi table properties
                    .option(RECORDKEY_FIELD.key(), "order_id") //record key
                    .option(PRECOMBINE_FIELD.key(), "ts") //precombine (merge) field
                    .option(PARTITIONPATH_FIELD.key(), "day") //partition field
                    .option(TBL_NAME.key(), "tbl_hudi_order") //table name
                    .option(TABLE_TYPE.key(), "MERGE_ON_READ") // table type
                    // make the partition directory layout match the Hive partition style
                    // (requires the KeyGeneratorOptions import above)
                    .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
                    .save("/hudi-warehouse/tbl_hudi_order")
            })
            .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
            .start()
    }

    def main(args: Array[String]): Unit = {
        // step1: build the SparkSession instance
        val spark: SparkSession = SparkUtils.createSpakSession(this.getClass)

        //step2: consume data from Kafka in real time
        val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")

        // step3: extract fields and convert data types
        val streamDF: DataFrame = process(kafkaStreamDF)

        // step4: save the data to the Hudi table (MOR table: data files are merged on read)
        saveToHudi(streamDF)

        // step5: after the streaming query starts, wait for termination
        spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running .........."))
        spark.streams.awaitAnyTermination()
    }
}

8. Query and analysis with spark-shell
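
The original post leaves this section empty. A minimal sketch of what an interactive session could look like inside spark-shell (assuming the shell was started with the Hudi bundle on the classpath), reusing the table path from section 6:

// load the Hudi table written in section 4 (same path as the Hive mapping in section 6)
val didiDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_didi_haikou")
didiDF.printSchema()

// register a temporary view and run an ad-hoc aggregation, e.g. orders per product line
didiDF.createOrReplaceTempView("tmp_didi")
spark.sql("SELECT product_id, COUNT(1) AS total FROM tmp_didi GROUP BY product_id").show()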

9. The DeltaStreamer utility

Exactly-once ingestion of new events from Kafka
Supports incoming data as json, avro, or custom record types
Manages checkpoints, rollback, and recovery
Leverages Avro schemas stored on DFS or in the Confluent schema registry
Supports custom transformations
//official docs: https://hudi.apache.org/docs/hoodie_deltastreamer#deltastreamer
//the official example consumes Kafka data in Avro format in real time and stores it in a Hudi table.

10. Spark SQL with Hudi

10.1 Startup

#supported from Hudi 0.9
#  https://hudi.apache.org/docs/quick-start-guide
--Start the spark-sql interactive shell with the required dependency jars and properties, e.g.:
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
--Hudi's default upsert/insert/delete parallelism is 1500; use a smaller value for a small demo dataset.
set hoodie.upsert.shuffle.parallelism = 1;
set hoodie.insert.shuffle.parallelism = 1;
set hoodie.delete.shuffle.parallelism = 1;
--Do not sync the Hudi table metadata:
set hoodie.datasource.meta.sync.enable=false;

10.2 Usage

Write DDL statements to create Hudi tables; table type: MOR and partitioned, primary key id, partition field dt, precombine field defaults to ts.

10.3 Creating tables with Spark SQL DDL

https://hudi.apache.org/docs/table_management

10.3.1 Create a COW Hudi table

-- create a managed cow table
create table if not exists hudi_table0 (
  id int, 
  name string, 
  price double
) using hudi
options (
  type = 'cow',
  primaryKey = 'id'
);

10.3.2 Specifying a location storage path when creating a table makes it an external table

-- create an external mor table (add a LOCATION clause, e.g. location '/hudi-warehouse/hudi_table1', to make the table external)
create table if not exists hudi_table1 (
  id int, 
  name string, 
  price double,
  ts bigint
) using hudi
options (
  type = 'mor',
  primaryKey = 'id,name',
  preCombineField = 'ts' 
);

10.3.3 Create a partitioned table

create table if not exists hudi_table_p0 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (dt, hh);

10.3.4 Creating a table with CTAS (CREATE TABLE AS SELECT) is also supported

create table h3 using hudi
as
select 1 as id, 'a1' as name, 10 as price;

11. The MERGE INTO statement

Merge syntax
--based on the match condition, the statement decides whether a row is inserted, updated, or deleted
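
A hedged sketch of a MERGE INTO upsert issued through spark.sql, assuming a session started with the Hudi extension from section 10.1 and the hudi_table0 table from section 10.3.1; the source row is illustrative:

// update the row with id = 1 if it exists, otherwise insert it
spark.sql(
    """
      |MERGE INTO hudi_table0 AS t
      |USING (
      |  SELECT 1 AS id, 'a1' AS name, 12.0 AS price
      |) AS s
      |ON t.id = s.id
      |WHEN MATCHED THEN UPDATE SET *
      |WHEN NOT MATCHED THEN INSERT *
      |""".stripMargin)

A WHEN MATCHED ... THEN DELETE clause can be added in the same way when matching rows should be removed.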