spark Streaming01
21.1 spark streaming介绍
21.1.1 背景
随着大数据技术的不断发展,人们对于大数据的实时性处理要求也在不断提高,传统的 MapReduce 等批处理框架在某些特定领域,例如实时用户推荐、用户行为分析这些应用场景上逐渐不能满足人们对实时性的需求,因此诞生了一批如 S3、Storm 这样的流式分析、实时计算框架。Spark 由于其内部优秀的调度机制、快速的分布式计算能力,所以能够以极快的速度进行迭代计算。正是由于具有这样的优势,Spark 能够在某些程度上进行实时处理,Spark Streaming 正是构建在此之上的流式框架。
21.1.2 Spark Streaming 设计
支持输入输出的数据源:
Spark Streaming 是 Spark 的核心组件之一,它可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafka、ZeroMQ等消息队列以及TCP sockets或者目录文件从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库或显示在仪表盘里。
Spark Streaming 的基本原理是将实时输入数据流以时间片(通常在0.5\~2秒之间)为单位进行拆分,然后采用 Spark 引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示:
Spark Streaming 最主要的抽象是离散化数据流(DStream),DStream 表示连续不断的数据流。在内部实现上,Spark Streaming 的输入数据按照时间片分成一段一段,每一段数据转换为 Spark 中的 RDD,并且对 DStream 的操作都最终被转变为相应的 RDD 操作。如下图所示:
以 wordcount 为例,一个又一个句子会像流水一样源源不断到达,Spark Streaming 会把数据流按照时间片切分成一段一段,每段形成一个 RDD,这些 RDD 构成了一个 DStream。对这个 DStream 执行 flatMap 操作时,实际上会被转换成针对每个 RDD 的 flatMap 操作,转换得到的每个新的 RDD 又构成了一个新的DStream。如下图所示:
21.1.3 Spark Streaming 与 flink的对比
对比
对比点 | Flink | Spark Streaming |
---|---|---|
实时计算模型 | 纯实时,来一条数据,处理一条数据 | 准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理 |
实时计算延迟度 | 毫秒级 | 秒级 |
吞吐量 | 高 | 高(移动计算不移动数据) |
1)处理模型以及延迟
SparkStreaming 无法实现毫秒级的流计算。Spark Streaming可以在一个短暂的时间窗口里面处理多条(batches)Event,并且 SparkStreaming 将流数据分解为一系列批处理作业,在这个过程中会产生多个spark 作业,每段数据的处理都会经过DAG图分解、任务调度等过程,需要一定的额开销。
2)容错和数据保证
然而两者的都有容错时候的数据保证,Spark Streaming的容错为有状态的计算提供了更好的支持。在Storm中,每条记录在系统的移动过程中都需要被标记跟踪,所以Storm只能保证每条记录最少被处理一次,但是允许从错误状态恢复时被处理多次。这就意味着可变更的状态可能被更新两次从而导致结果不正确。
另一方面,Spark Streaming仅仅需要在批处理级别对记录进行追踪,所以他能保证每个批处理记录仅仅被处理一次,即使是node节点挂掉。
3)批处理框架集成
Spark Streaming的一个很棒的特性就是它是在Spark框架上运行的。这样你就可以使用spark的批处理代码一样来写Spark Streaming程序,或者是在Spark中交互查询比如spark-sql。这就减少了单独编写流处理程序和历史数据处理程序。
4)生产支持
两者都可以在各自的集群框架中运行,但是Storm可以在Mesos上运行, 而Spark Streaming可以在YARN和Mesos上运行。
Storm已经出现好多年了,而且自从2011年开始就在Twitter内部生产环境中使用,还有其他一些公司。
Spark Streaming优缺点
优点:
Spark Streaming的真正优势(Storm绝对比不上的),是它属于Spark生态技术栈中,因此Spark Streaming可以和Spark Core、Spark SQL无缝整合,而这也就意味着,我们可以对实时处理出来的中间数据,立即在程序中无缝进行延迟批处理、交互式查询等操作,这个特点大大增强了Spark Streaming的优势和功能。
缺点:
延迟。500毫秒已经被广泛认为是最小批次大小,这个相对storm来说,还是大很多。所以实际场景中应注意该问题,就像标题分类场景,设定的0.5s一批次,加上处理时间,分类接口会占用1s的响应时间。实时要求高的可选择使用其他框架。
应用场景:
实时性要求高,用storm。
实时性要求不高、在计算过程中需要复杂的转换操作或交互式查询的操作,用 Spark Streaming。
21.2 架构及运行流程
21.2.1 架构
Spark Streaming使用“微批次”的架构,把流试计算当成一系列连接的小规模批处理来对待,Spark Streaming从各种输入源中读取数据,并把数据分成小组的批次,新的批次按均匀的时间间隔创建出来,在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中,在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的,批次间隔一般设在500毫秒到几秒之间,由应用开发者配置,每个输出批次都会形成一个RDD,以Spark作业的方式处理并生成其他的RDD。并能将处理结果按批次的方式传给外部系统。
在本地运行Spark Streaming程序时,请勿使用“ local”或“ local [1]”作为主URL。这两种方式均意味着仅一个线程将用于本地运行任务。如果您使用的是基于接收器的输入DStream(例如套接字,Kafka,Flume等),则将使用单个线程来运行接收器,而不会留下任何线程来处理接收到的数据。因此,在本地运行时,请始终使用“ local [ n ]”作为主URL,其中n >要运行的接收器数量。
21.2.2 运行流程
SparkStreaming 分为Driver端 和 Client端。
Driver端为StreamingContext实例,包括JobScheduler 、DStreamGraph 等;
Client端为 ReceiverSupervisor 和 Receiver。
SparkStreaming 进行流数据处理的大概步骤:
1)启动流处理引擎;
2)接收及存储流数据;
3)处理流数据;
4)输出处理结果;
StreamingContext 初始化时,会初始化JobScheduler 、DStreamGraph 实例。其中:
DStreamGraph:存放DStream 间的依赖关系,就像RDD的依赖关系一样;
JobScheduler:JobScheduler 是SparkStreaming 的 Job 总调度者。它 包括 ReceiverTracker 和 JobGenerator。
ReceiverTacker:它负责启动、管理各个executor的 流数据接收器(Receiver)及管理各个Receiver 接收到的数据。当ReceiverTacker启动过程中,会初始化executor 的 流数据接收管理器(ReceiverSupervisor),再由它启动流数据接收器(Receiver)。
JobGenerator:它是批处理作业生成器,内部维护一个定时器,定时处理批次的数据生成作业。
step2:接收及存储流数据;
当Receiver 启动后,连续不断的接收实时流数据,根据传过来的数据大小进行判断,如果数据小,就攒多条数据成一块,进行块存储;如果数据大,则一条数据成一块,进行块存储。
块存储时会根据是否设置预写日志文件分成两种方式:
1)不设置预写日志文件,就直接写入对应Worker的内存或磁盘。
2)设置预写日志文件,会同时写入对应Worker的内存或磁盘 和 容错文件系统(比如hdfs),设置预写日志文件主要是为了容错,在当前节点出故障后,还可以恢复。
数据存储完毕后,ReceiverSupervisor 会将数据存储的元信息(streamId、数据位置、数据条数、数据 size 等信息)上报给 Driver端的 ReceiverTacker。ReceiverTacker 维护收到的元数据信息。
step3:处理流数据;
在 StreamingContext 的 JobGenerator 中维护一个定时器,该定时器在批处理时间到来时会进行生成作业的操作。在操作中进行如下操作:
1)通知 ReceiverTacker 将接收到到的数据进行提交,在提交时采用 synchronize 关键字进行处理,保证每条数据被划入一个且只被划入一个批次中。
2)要求 DStreamGraph 根据 DStream 依赖关系生成作业序列 Seq[Job]。
3)从 ReceiverTacker 中获取本批次数据的元数据。
4)把批处理时间、作业序列 Seq[Job] 和本批次数据的元数据包装为 JobSet。调用 JobScheduler.submitJobSet(JobSet) 提交给 JobScheduler,JobScheduler 将把这些作业放到 作业队列,Spark 核心 在从作业队列中取出执行作业任务。由于中间有 队列,所以速度非常快。
5)当提交本批次作业结束,根据 是否设置checkpoint,如果设置checkpoint,SparkStreaming 对整个系统做checkpoint。
step4:输出处理结果
由于数据的处理有Spark核心来完成,因此处理的结果会从Spark核心中直接输出至外部系统,如数据库或者文件系统等,同时输出的数据也可以直接被外部系统所使用。由于实时流数据的数据源源不断的流入,Spark会周而复始的进行数据的计算,相应也会持续输出处理结果。
21.3 DStream
21.3.1 DStream 输入源
基本输入源:
文件系统 和 Socket。
高级输入源:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 |
spark-streaming-twitter_2.11 | |
ZeroMQ | spark-streaming-zeromq_2.11 |
MQTT | spark-streaming-mqtt_2.11 |
21.3.2 DStream 转换操作
21.3.2.1 DStream 无状态转换操作
对于DStream 无状态转换操作而言,不会记录历史状态信息,每次对新的批次数据进行处理时,只会记录当前批次数据的状态。
OLDDStream -> NEWDStream
转换 | 描述 |
---|---|
map(func) | 源 DStream的每个元素通过函数func返回一个新的DStream |
flatMap(func) | 类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素 |
filter(func) | 在源DStream上选择func函数返回仅为true的元素,最终返回一个新的DStream |
repartition(numPartitions) | 通过输入的参数numPartitions的值来改变DStream的分区大小 |
union(otherStream) | 返回一个包含源DStream与其他 DStream的元素合并后的新DStream |
count() | 对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam |
reduce(func) | 使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚合操作,返回一个内部所包含的RDD只有一个元素的新DStream。 |
countByValue() | 计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。 |
reduceByKey(func, [numTasks]) | 当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。 |
join(otherStream, [numTasks]) | 当被调用类型分别为(K,V)和(K,W)键值对的2个DStream时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。 |
cogroup(otherStream, [numTasks]) | 当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。 |
foreachRDD | 获取每个RDD |
transform(func) | 通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。 |
val dstream2 = dstream1.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)
// 通过 transform 将 dstream 转成 rdd, 然后 通过rdd 的 转换进行单词统计,然后把最终的rdd 在转换成 dstream.
val dstream2 = dstream1.transform(rdd =>{
val name = “hehe” // driver端运行
println(s“name:\${name}”) // driver端运行
rddnew = rdd.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)
rddnew
})
在无状态转换算子里面:
transform(func) 是个特殊的算子,它函数内部是 RDD;
而其他的算子内部是对应的元素类型,跟RDD算子里面的类型是一样的。
21.3.2.2 DStream 有状态转换操作
DStream 有状态转换操作包括 滑动窗口转换操作 和 updateStateByKey 操作。
1)滑动窗口转换操作
对于窗口操作,批处理间隔、窗口间隔和滑动间隔是非常重要的三个时间概念,是理解窗口操作的关键所在。
批处理间隔:
在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的,因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。
窗口间隔:
对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。
滑动间隔(slide duration):
它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。在这里必须注意的一点是滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。
滑动间隔 == 窗口间隔,正好不重复,也不漏数据。
滑动间隔 > 窗口间隔,会漏数据。
滑动间隔、窗口间隔 一定是批处理间隔的整数倍。
Spark Streaming 还提供了窗口的计算,它允许你通过滑动窗口对数据进行转换,窗口转换操作如下:
转换 | 描述 |
---|---|
window(windowLength, slideInterval) | 返回一个基于源DStream的窗口批次计算后得到新的DStream。 |
countByWindow(windowLength,slideInterval) | 返回基于滑动窗口的DStream中的元素的数量。 |
reduceByWindow(func, windowLength,slideInterval) | 基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。 |
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) | 基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。可以进行repartition操作。 |
reduceByKeyAndWindow(func,invFunc,windowLength, slideInterval, [numTasks]) | 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作(func),并对离开窗口的老数据进行“逆向reduce” 操作(invFunc)。但是,只能用于“可逆的reduce函数”必须启用“检查点”才能使用此操作 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。 |
reduceByKeyAndWindow
UpdateStateByKey 原语用于记录历史记录,Word Count 示例中就用到了该特性。若不用 UpdateStateByKey 来更新状态,那么每次数据进来后分析完成,结果输出后将不再保存。如输入:hello world,结果则为:(hello,1)(world,1),然后输入 hello spark,结果则为 (hello,1)(spark,1)。也就是不会保留上一次数据处理的结果。
使用 UpdateStateByKey 原语用于需要记录的 State,可以为任意类型,如上例中即为 Optional类型。
返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。
21.3.3 DStream 输出操作
Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD操作)。以下表列出了目前主要的输出操作:
转换 | 描述 |
---|---|
print() | 在Driver中打印出DStream中数据的前10个元素。 |
saveAsTextFiles(prefix, [suffix]) | 将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) | 将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) | 将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) | 最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。 |
foreachRDD 是个非常强大的输出算子。
它可以把SparkStreaming的程序完全用 rdd的处理方式处理。
// dstream 转换
val dstream2 = dstream1.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)
// 将上面的 SparkStreaming的程序完全用 rdd的处理方式处理
dstream1.foreacheRDD(rdd => {
val rddNew = rdd.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)
rddNew.foreach(打印一波)
})
21.4 SparkStreaming程序
21.4.1 socket 创建DStream
21.4.1.1 scala版
package com.hainiu.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* StreamingContext
* DStream
*/
object TestStreaming {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("streaming")
conf.setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(5))
// val sc = ssc.sparkContext
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("nn1", 6666)
val ds1: DStream[String] = ds.flatMap(_.split(" "))
val ds2: DStream[(String, Int)] = ds1.map((_, 1))
val ds3: DStream[(String, Int)] = ds2.reduceByKey(_ + _)
ds3.print()
ssc.start()
ssc.awaitTermination()
}
}
a 程序测试
需要 使用nc 命令来启动Socket,作为server端;而sparkStreaming程序作为 client 端。
linux 使用:
1)安装nc:yum -y install nc
2)执行命令启动Socket服务端: nc -l -k -p 6666
其中:-l:代表启动监听模式,也就是作为socket服务端; -p:监听的端口; -k:多次监听
windows 使用ncat.exe 工具 :
启动SparkStreaming程序,作为客户端,在server端输入数据,客户端监听到就开始运算
程序计算结果
b 查看 webui
总结点:如果运行的整体的时长大于调度时间那么就会出现反压,
sparkStreaming中反压的原理就是接受的效率会自动下调
c 用 transform 改写
上面的写法是DStream –> DStream,我们也可以用transform 实现 DStream –>RDD–> DStream
package com.hainiu.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* StreamingContext
* DStream
*/
object TestStreamingWithForeachRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("streaming")
conf.setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(5))
// val sc = ssc.sparkContext
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("nn1", 6666)
ds.transform((rdd,time)=>{
rdd.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
}).print()
ssc.start()
ssc.awaitTermination()
}
}
d 用 foreachRDD 改写
当然,我们也可以用 foreachRDD 实现 将流式计算 完全用批处理的方式写
package com.hainiu.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* StreamingContext
* DStream
*/
object TestStreamingWithForeachRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("streaming")
conf.setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(5))
// val sc = ssc.sparkContext
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("nn1", 6666)
ds.foreachRDD((rdd,time)=>{
println("run time :"+time)
rdd.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.foreach(println)
})
ssc.start()
ssc.awaitTermination()
}
}
e DStream 的 transform 和 foreachRDD 的异同点
相同点:
1)transform和foreachRDD都可以进行RDD转换,可让写spark-streaming程序像写spark-core一样。比如RDD转成DataSet或DatFrame进行spark-sql的操作,操作方便。
2)只有 transform和foreachRDD 算子里的函数执行分driver端运行和executor端运行;DStream 的其他算子的函数都是在executor端运行。
区别:
1)transform 是转换算子,foreachRDD是输出算子。
2)transform 可以将旧RDD转成新RDD,然后返回DStream,执行DStream的行动操作。
而 foreachRDD 本身是DStream的行动操作,它需要将所有的DStream 操作的代码转成RDD操作,直到最后。
DStream Driver端执行的代码与executor 端执行的代码示例