structured-streaming概念和数据源
1.介绍
structured-streaming是基于Spark SQL引擎构建的可扩展和容错流处理引擎。能够以对静态数据表示批处理计算的方式来表示流计算。Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。可以使用Scala、Java、Python或R中的Dataset/DataFrame API来处理流聚合、事件时间窗口、流到批连接等。计算在同一个优化的Spark SQL引擎上执行。最后,系统通过检查点和预写日志确保端到端一次容错保证。简而言之,结构化流提供了快速、可扩展、容错、端到端的一次流处理,用户无需考虑流。
默认情况下,结构化流查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现低至100毫秒的端到端延迟和精确的一次容错保证。然而,自Spark 2.3以来,我们引入了一种新的低延迟处理模式,称为Continuous processing,它可以实现低至1毫秒的端到端延迟,并至少保证一次。在不更改查询中的数据集/数据帧操作的情况下,您将能够根据应用程序要求选择模式。
首先,让我们从结构化流查询的一个简单示例开始——流单词计数。
2.快速案例
假设我们从tcp接受数据并且对数据进行计算。让我们看看如何使用structured-streaming来处理。
首先导入依赖包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
具体实现如下:
构建session对象引入隐式转换
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
接受文本数据,监听6666端口,并且转换为dataFrame对象进行sql计算
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
这行DataFrame表示包含流文本数据的无边界表。此表包含一列名为“value”的字符串,流式文本数据中的每一行都成为表中的一行。注意,这当前没有接收到任何数据,因为我们只是在设置转换,还没有开始转换。接下来,我们使用.as[String]将DataFrame转换为String的Dataset,以便我们可以应用flatMap操作将每一行拆分为多个字。结果单词“数据集”包含所有单词。最后,我们通过对数据集中的唯一值进行分组并对其进行计数,定义了wordCounts DataFrame。注意,这是一个流数据,它表示的是流式的wordcount。
最后设置输出到控制台打印数据
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
整体代码如下:
package com.hainiu.spark.structured_streaming
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object Test1 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("test structure")
.master("local[*]")
.getOrCreate()
import spark.implicits._
//无界的流,产生的dataFrame对象
val frame: DataFrame = spark.readStream.format("socket")
.option("host", "op.hadoop")
.option("port", "6666")
.load()
//as[String]转换为字符串类型的DataSet[String]
//遍历每个元素将数据按照空格进行分割
val words : Dataset[String] = frame.as[String].flatMap(_.split(" "))
val wordcounts = words.groupBy("value").count()
//读取的数据如果是一个字段那么默认它的列字段为value
val query: StreamingQuery = wordcounts.writeStream.outputMode("complete").format("console").start()
//输出的模式是complete模式,为全量模式
//console代表的是输出的数据的位置
query.awaitTermination()
}
}
当然我们也可以运行系统提供的样例进行测试
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount nn1 6666
在nn2上面执行nc命令模拟tcp协议
# 如果没有安装nc --> yum -y install nc
nc -l -k -p 6666
# 输入内容
apache spark
apache hadoop
执行完毕的结果可以展示出来
3.基本概念
输入表
输入的input table的表,这个表作为输入数据,每条数据都会以追加的形式放入到输入表中,其中这个表叫做无界流的表,数据会不停的插入到表中
输出表
输出的表的数据是输入的数据触发计算产生的,在间隔的时间内进行触发,比如1s钟,输入的数据会追加到输入表,产生的结果会更新结果表,无论什么时候更新数据,处理完毕的结果都想要追加到结果表中
4.输出模式
-
Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
全量的数据输出到外部存储系统,然后根据外部存储连接器按照需要进行存储,但是必须是聚合的数据结果才能使用complete模式
-
Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
-
只有新的数据才能追加到外部系统,这些数据基本都是追加到外部系统一次性存储不会变化的数据,有聚合的时候必须存在水位线才可以
-
Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
- 在spark2.1.1以后出现的,更新模式只输出新出发计算的更新结果,如果没有聚合操作那么这个update模式和append模式相同
代码演示
package com.hainiu.spark.structured_streaming
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object Test1 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("test structure")
.config("spark.sql.shuffle.partitions",2)
.master("local[*]")
.getOrCreate()
import spark.implicits._
val frame: DataFrame = spark.readStream.format("socket")
.option("host", "op.hadoop")
.option("port", "6666")
.load()
val words : Dataset[String] = frame.as[String].flatMap(_.split(" "))
val wordcounts = words.groupBy("value").count()
// val query1: StreamingQuery = wordcounts.writeStream.outputMode("complete").format("console").start()
// val query2: StreamingQuery = wordcounts.writeStream.outputMode("append").format("console").start()
// val query3: StreamingQuery = wordcounts.writeStream.outputMode("complete").format("console").start()
// query1.awaitTermination()
// query2.awaitTermination()
// query3.awaitTermination()
val query = words.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
}
}
修改代码对不同的情况进行演示
分别在nc中输入
hello world
hello world
hello
world
在聚合的情况下得到结果如下:
val query1: StreamingQuery = wordcounts.writeStream.outputMode("complete").format("console").start()
val query2: StreamingQuery = wordcounts.writeStream.outputMode("update").format("console").start()
val query3: StreamingQuery = wordcounts.writeStream.outputMode("append").format("console").start()
追击模式必须存在聚合
没有聚合的情况下,代码如下
val query = words.writeStream.outputMode("complete").format("console").start()
query.awaitTermination();
complete模式中必须存在聚合
val query = words.writeStream.outputMode("update").format("console").start()
query.awaitTermination();
数据追加到结果端,只更新最新到来的数据
val query = words.writeStream.outputMode("append").format("console").start()
query.awaitTermination();
在没有聚合的情况下和update是一样的
5.案例解释
为了说明该模型的使用,让我们在上面的快速示例中了解该模型。第一行DataFrame是输入表,最后一个wordCounts DataFrame是结果表。请注意,用于生成wordCount的流式线DataFrame上的查询与静态DataFrame完全相同。然而,当启动此查询时,Spark将持续检查套接字连接中的新数据。如果有新数据,Spark将运行一个“增量”查询,将以前的运行计数与新数据相结合,以计算更新的计数,如下所示。
请注意,结构化流式处理不会实现整个表。它从流数据源读取最新的可用数据,对其进行增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如前面示例中的中间计数)。
该模型与许多其他流处理引擎显著不同。许多流媒体系统要求用户自己维护正在运行的聚合,因此必须考虑容错性和数据一致性(至少一次、最多一次或准确一次)。在这个模型中,Spark负责在有新数据时更新结果表,从而避免用户对其进行推理。
6.事件时间和延迟数据
事件时间是嵌入数据本身的时间。对于许多应用程序,可能希望在此事件时间进行操作。例如,如果希望获得IoT设备每分钟生成的事件数,那么可能希望使用数据生成的时间(即数据中的事件时间),而不是Spark接收它们的时间。这个事件时间在这个模型中表达得非常自然——来自设备的每个事件都是表中的一行,而事件时间是该行中的一个列值。这允许基于窗口的聚合(例如,每分钟的事件数)只是事件时间列上的一种特殊类型的分组和聚合–每个时间窗口都是一个组,每行可以属于多个窗口/组。因此,可以在静态数据集(例如,从收集的设备事件日志)和数据流上一致地定义这种基于事件时间窗口的聚合查询,从而使用户的开发更加轻松。
此外,该模型自然地根据事件时间处理比预期晚到达的数据。由于Spark正在更新Result Table,因此它可以在有延迟数据时完全控制更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。自Spark 2.1以来,我们支持watermark,允许用户指定后期数据的阈值,并允许引擎相应地清理旧状态。稍后将在“窗口操作”一节中更详细地解释这些操作。
7.容错
提供端到端一次语义是结构化流设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源、接收器和执行引擎,以可靠地跟踪处理的确切进度,从而可以通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于Kafka偏移量或Kinesis序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中处理的数据的偏移量范围。流接收器被设计为幂等的,用于处理再处理。通过使用可重放源和幂等汇,结构化流可以确保在任何故障情况下端到端的一次语义。
8.DataFrame和DataSet的api
自Spark 2.0以来,DataFrame和Dataset可以表示静态的有界数据,也可以表示流式的无界数据。与Datasets/DataFrames相同,可以使用公共入口点SparkSession(Scala/Java/Python/R docs)从流源创建Datasets/DataFrames,并对其应用与静态Datasets/DataFrames相同的操作。
9.数据源
dataframe或者是dataset的创建可以通过SparkSession.readStream()进行,并且他们的功能都是对DataStreamReader接口的实现
spark自带的数据源分为以下几种
- File source 文本数据源可以读取文件夹中的文件,处理文件夹中的数据根据文本的修改时间进行顺序处理,如果文本被修改那么处理顺序也会变化,并且支持的数据格式化方式有csv,json,orc,parquet等,注意,文件必须原子地放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作来实现,不能一行一行的写入数据
- Kafka source - kafka数据源,支持0.10以上版本的kafka
- Socket source 读取tcp的数据主要作为测试,注意:这个数据源不能够保证数据的端对端的一致性
- Rate source 以每秒指定的行数生成数据,每个输出行都包含时间戳和值。其中时间戳是包含消息发送时间的时间戳类型,值是包含消息计数的Long类型,从0开始作为第一行。此源用于测试和基准测试
有些源不具有容错性,因为它们不能保证在发生故障后可以使用检查点偏移量断点消费数据
源端 | 选项 | 容错 | 注意事项 |
File source | path :输入目录的路径maxFilesPerTrigger :每个触发器中要考虑的新文件的最大数量(默认值:no max)latestFirst :是否首先处理最新的新文件,在文件大量积压时非常有用(默认值为false)fileNameOnly :是否仅根据文件名而不是完整路径检查新文件(默认值是false)设置为“true”时“file:///dataset.txt“”s3://a/dataset.txt“为同一个路径maxFileAge 最大读取数据时长,超过时长会被删除。如果latestFirst 设置为true 并且maxFilesPerTrigger 被设置,则将忽略此参数(默认值:1周)cleanSource :处理后清理已完成文件的选项。可用选项包括“存档”、“删除”和“关闭”,默认值为“off”,选择归档的时候的设定归档路径“sourceArchiveDir”,一般归档或者是删除都会增加系统开销,所以设置这个参数时候要谨慎,有关读取数据操作配置,请查看相关选项,例如,有关“parquet”格式选项,请参见`DataStreamReader.parquet() |
Yes | 支持整体路径但是不可以是都好分割的多个路径 |
Socket Source | socket协议的ip和端口号 | No | |
Rate Source | rampUpTime 到达指定速率的间隔时间rowsPerSecond 每秒多少条numPartitions 分区数量 |
Yes | |
Kafka Source | 参照Kafka Integration Guide. | Yes |
不同源端的演示
9.0 Socket的用法
package com.hainiu.spark
import org.apache.spark.sql.SparkSession
object TestSource {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().master("local[*]").appName("test source").getOrCreate()
import session.implicits._
val df = session.readStream
.format("socket")
.option("host", "nn1")
.option("port", 6666)
.load()
val ds = df.as[String].map(t => {
val strs = t.split(" ")
(strs(0), strs(1), strs(2))
}).toDF("id", "name", "age")
ds.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
}
}
9.1 读取text文件
首先在项目中创建data文件,创建student.txt文件输入如下值:
1,zhangsan,20
2,lisi,30
3,wangwu,40
整体代码如下:
package com.hainiu.structure
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 读取text文件
*/
object TestSource {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.config("spark.sql.shuffle.partitions",2) //设置少量分区数,默认分区格式200个
.appName("test read").master("local[*]").getOrCreate()
val df: DataFrame = spark.readStream.text("data")
//输入路径必须是文件夹
// val df: DataFrame = spark.readStream.format("text").load("data/student.txt")
//参数配置信息可以人为加入,text文本读取默认只有一列value的字段
//recursiveFileLookup 是否递归查询数据
//pathGlobFilter 路径过滤器
//lineSep 行分隔符
//wholetext 是否将一个文件作为一个整行
//maxFilesPerTrigger 每次触发读取的文件个数
import spark.implicits._
val ds: Dataset[String] = df.as[String] //转换为ds
val query = ds.map(t => {
val strs = t.split(",") //处理数据
(strs(0).toInt, strs(1), strs(2).toInt)
}).toDF("id", "name", "age")
.writeStream.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
9.2读取csv文件
创建student.csv
1,zhangsan,20
2,lisi,30
3,wangwu,40
整体代码:
package com.hainiu.structure
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 读取text文件
*/
object TestSource {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.config("spark.sql.shuffle.partitions",2) //设置少量分区数,默认分区格式200个
.appName("test read").master("local[*]").getOrCreate()
//由于数据是csv的格式,可以存在字段,所以配置表的格式至关重要
val schema = new StructType().add("id", IntegerType).add("name", StringType).add("age", IntegerType)
val df: DataFrame = spark.readStream.schema(schema)
.option("sep",",")
.csv("data")
//参数配置
//header 将第一行作为表头
//emptyValue 设定字符串空值默认值
//nanValue 设定数值空值默认值
//dateFormat 日期格式化
//sep 数据分割符号
//mode 脏数据处理方式 PERMISSIVE遇见损坏配置默认值columnNameOfCorruptRecord由这个参数配置
//DROPMALFORMED 跳过 FAILFAST 抛出异常
df.writeStream.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
9.3 读取json文件
创建data/student.json
{"id":1,"name":"zhangsan","age":20}
{"id":2,"name":"lisi","age":30}
{"id":3,"name":"wangwu","age":40}
整体代码如下:
package com.hainiu.structure
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 读取text文件
*/
object TestSource {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.config("spark.sql.shuffle.partitions",2) //设置少量分区数,默认分区格式200个
.appName("test read").master("local[*]").getOrCreate()
val schema = new StructType().add("id", IntegerType).add("name", StringType).add("age", IntegerType)
//虽然json中也存在字段,也需要指定schema
val df: DataFrame = spark.readStream
.schema(schema)
.json("data")
//json文件自带格式信息不用加分隔符
df.writeStream.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
以上都是文本文件,这些文件我们可以手动创建,但是orc和parquet文件是不能够手动创建的,这个需要我们存储完毕以后再读取演示。
structure streaming是不支持jdbc读取数据的
9.4 文件分区读取
如果在一个文件夹中存在子文件夹,并且文件夹的名称为 name=value的形式,可以在表中自动形成分区
# 在data文件夹中创建student文件夹
# 并且创建子文件夹 year=2023
# 并且创建子文件夹 year=2022
# 并且在这些文件夹下面也创建子文件夹
# year=2022/month=01
# year=2022/month=02
# year=2023/month=01
# year=2023/month=02
# 然后分别创建文件
# year=2022/month=01/s1.csv
# year=2022/month=02/s2.csv
# year=2023/month=01/s3.csv
# year=2023/month=02/s4.csv
# 文件内容如下:
# year=2022/month=01/s1.csv
1,zhangsan,20
# year=2022/month=02/s2.csv
2,lisi,30
# year=2023/month=01/s3.csv
3,wangwu,40
# year=2023/month=02/s4.csv
4,zhaosi,35
整体代码如下:
package com.hainiu.structure
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 读取text文件
*/
object TestSource {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.config("spark.sql.shuffle.partitions",2) //设置少量分区数,默认分区格式200个
.appName("test read").master("local[*]").getOrCreate()
val schema = new StructType().add("id", IntegerType).add("name", StringType).add("age", IntegerType)
val df: DataFrame = spark.readStream
.schema(schema)
.csv("data/student")
df.writeStream.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
文件结构如下:
输入结果如下
9.5 自带rate方式数据源
生成固定格式的数据,时间戳和value值字段两个,主要用于数据做测试
package com.hainiu.structure
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 读取text文件
*/
object TestSource {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.config("spark.sql.shuffle.partitions",2) //设置少量分区数,默认分区格式200个
.appName("test read")
.master("local[*]")
.getOrCreate()
val df: DataFrame = spark.readStream
.format("rate") // 设置数据源为 rate
.option("rowsPerSecond", 10) // 设置每秒产生的数据的条数, 默认是 1
.option("rampUpTime", 0) // 设置多少秒到达指定速率 默认为 0
.option("numPartitions", 2) /// 设置分区数 默认是 spark 的默认并行度
.load
df.writeStream.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
9.6 kafka数据源
首先引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
环境准备
启动完毕准备topic
kafka-topics.sh --bootstrap-server kafka1-89796:9092 --create --topic topic_hainiu --partitions 3 --replication-factor 2
kafka读取的数据内容如下:
字段 | 类型 |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (optional) | array |
源端需要设定的参数值:
Option | value |
---|---|
assign | 指定分区消费 |
subscribe | 按照逗号分隔的topic列表 |
subscribePattern | 正则形式的topic指定 |
kafka.bootstrap.servers | 集群地址 |
以下字段可以选择性配置
选项 | 值 | 注释 |
---|---|---|
startingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | 指定消费时间戳 |
startingOffsets | "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | 指定偏移量位置消费数据 |
endingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | 时间结束点,批任务 |
endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | 偏移量结束点,批任务 |
failOnDataLoss | true or false | 数据丢失是否抛出异常 |
kafkaConsumer.pollTimeoutMs | 120000 | 拉取超时时间 |
fetchOffset.numRetries | 3 | 重试次数 |
fetchOffset.retryIntervalMs | 10 | 重试间隔时间 |
kafka.group.id | 组id |
消费者代码:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
整体代码如下:
package com.hainiu.spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object TestSource {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().master("local[*]").appName("test source").getOrCreate()
import session.implicits._
val df = session.readStream
.format("kafka")
.option("kafka.bootstrap.servers","kafka1-89796:9092")
.option("kafka.group.id","hainiu")
.option("subscribe","topic_hainiu")
.option("startingOffsets","latest")
.load()
val ds = df.selectExpr("cast(key as string)", "cast(value as string)")
//1 zhangsan 20
val ds1 = ds.select("value").as[String]
.map(t=>{
val strs = t.split(" ")
(strs(0),strs(1),strs(2))
}).toDF("id","name","age")
ds1.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
}
}
查询出来的kafka的数据都是二进制的数据,这个是kafka默认存储数据的类型,想要使用则需要进行类型的转换
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
但是得到的数据都在value中,我们需要二次处理得到结果
df.selectExpr("cast(value as string)") //只取值value,并且转换为string类型
.as[String] //as方法转换为dataset的数据流,可以直接进行操作数据
.map(t=>{
val strs = t.split(",")
(strs(0).toInt,strs(1),strs(2).toInt)
}).toDF("id","name","age") //转换为df是为了给数据增加字段值
执行结果如上图
可增加配置,展示全部数据并且实现计算周期灵敏度调节
.trigger(Trigger.Continuous(1000)) //触发时间1s一次
.option("truncate",false) //打印全部结果数据
偏移量问题我们后续进行处理,保证kafka数据的断点操作