Spark Streaming
1. DStream Stateless Transformations
old DStream -> new DStream (the current batch does not depend on any historical data)
Transformation | Description |
---|---|
map(func) | Returns a new DStream by passing each element of the source DStream through the function func. |
flatMap(func) | Similar to map, except that each input element can be mapped to 0 or more output elements. |
filter(func) | Returns a new DStream containing only the elements of the source DStream for which func returns true. |
repartition(numPartitions) | Changes the number of partitions of the DStream to numPartitions. |
union(otherStream) | Returns a new DStream containing the union of the elements of the source DStream and otherStream. |
count() | Counts the number of elements in each RDD of the source DStream and returns a new DStream whose RDDs each contain a single element. |
reduce(func) | Aggregates the elements in each RDD of the source DStream using the function func (which takes two arguments and returns one result) and returns a new DStream whose RDDs each contain a single element. |
countByValue() | Counts the occurrences of each element within each RDD of the DStream and returns a new DStream[(K, Long)], where K is the element type and Long is its frequency. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated using func. Note: by default this uses Spark's default parallelism (2 in local mode; in cluster mode it is determined by spark.default.parallelism); the optional numTasks argument sets a different number of parallel tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs. |
cogroup(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream; this can be used to apply arbitrary RDD operations on the DStream. |
1. Direct DStream transformations
val dstream2 = dstream1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
2. DStream -> RDD -> DStream
// Use transform to get at the underlying RDD, do the word count with RDD transformations, and return the resulting RDD, which becomes the new DStream
val dstream2 = dstream1.transform(rdd =>{
val name = "hehe" // runs on the driver
println(s"name:${name}") // runs on the driver
val rddnew = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
rddnew
})
2. DStream Stateful Transformations
//Sliding-window transformations and the updateStateByKey operation
//Window computations
Transformation | Description |
---|---|
window(windowLength, slideInterval) | Returns a new DStream computed from windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Returns the number of elements in the DStream over a sliding window. |
reduceByWindow(func, windowLength, slideInterval) | Returns a new DStream created by aggregating the elements of the source DStream over a sliding window using func. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, aggregates the values of each key K over a sliding window using func and returns a new DStream. The optional numTasks argument controls the number of reduce tasks (effectively a repartition). |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of reduceByKeyAndWindow in which each window's reduce value is computed incrementally from the previous window's: data entering the window is reduced with func, and data leaving the window is "inverse-reduced" with invFunc. It only works for invertible reduce functions, and checkpointing must be enabled to use it. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | Counts the occurrences of each element of the source DStream over a sliding window and returns a DStream[(K, Long)], where K is the element type and Long is the frequency. As with countByValue, the number of reduce tasks is configurable via an optional argument. |
updateStateByKey | Maintains per-key state across batches (used to accumulate history). |
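The incremental reduceByKeyAndWindow variant above is easiest to see in code. A minimal sketch, assuming `pairs` is an existing DStream[(String, Int)] on a StreamingContext with checkpointing enabled (Durations comes from org.apache.spark.streaming):
// func adds the batches that slide into the window, invFunc subtracts the batches
// that slide out of it, so only the window edges are recomputed at each slide.
val windowedCounts: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce new data entering the window
  (a: Int, b: Int) => a - b, // inverse-reduce old data leaving the window
  Durations.seconds(30),     // window length
  Durations.seconds(10))     // slide interval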
DStream Output Operations
Transformation | Description |
---|---|
print() | Prints the first 10 elements of every batch of the DStream on the driver. |
saveAsTextFiles(prefix, [suffix]) | Saves the contents of the DStream as text files; the output generated in each batch interval is named prefix-TIME_IN_MS[.suffix]. |
saveAsObjectFiles(prefix, [suffix]) | Saves the contents of the DStream as SequenceFiles of serialized objects; the output generated in each batch interval is named prefix-TIME_IN_MS[.suffix]. |
saveAsHadoopFiles(prefix, [suffix]) | Saves the contents of the DStream as Hadoop files; the output generated in each batch interval is named prefix-TIME_IN_MS[.suffix]. |
foreachRDD(func) | The most general output operation: applies the function func to each RDD of the DStream. It is typically used to push data to external systems, e.g. saving an RDD to files or writing it to a database over the network. Note that func itself runs in the driver process of the streaming application; only the RDD actions invoked inside it run on the executors. |
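Because func runs on the driver while RDD operations run on the executors, the usual pattern for writing to an external system is to open connections inside foreachPartition rather than on the driver. A minimal sketch, assuming resDS is a DStream[(String, Int)]; createConnection is a hypothetical helper you would supply:
resDS.foreachRDD { rdd =>
  rdd.foreachPartition { it =>
    // This closure runs on the executors: create one connection per partition here,
    // so the connection object never has to be serialized from the driver.
    // val conn = createConnection()          // hypothetical sink connection
    it.foreach(record => println(record))     // replace println with the real write, e.g. conn.send(record)
    // conn.close()
  }
}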
3. Spark Streaming Programs
1). Creating a DStream from a socket
1. Direct DStream transformations
//Check the web UI at 127.0.0.1:4040
//DStream -> DStream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * @Description: DStream -> DStream
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date:
 */
object SparkStreamingSocket {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[*]")
//Set the batch interval
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Default storage level for this receiver: StorageLevel.MEMORY_AND_DISK_SER_2
// Receive lines of text from the socket; words are separated by spaces
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
val reduceByKeyDS : DStream[(String, Int)]= inputDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
reduceByKeyDS.foreachRDD((r, t) => {
println(s"count time:${t}, ${r.collect().toList}")
})
ssc.start()
ssc.awaitTermination()
}
}
2. transform (DStream -> RDD -> new RDD -> new DStream)
// The DStream transformations can be replaced with RDD transformations inside transform, as below
val reduceByKeyDS: DStream[(String, Int)] = inputDS.transform(rdd => {
val reduceByKey: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
reduceByKey
})
reduceByKeyDS.foreachRDD((r,t) =>{
println(s"count time:${t}, ${r.collect().toList}")
})
3.foreachRDD
// val flatMapDS: DStream[String] = inputDS.flatMap(_.split(" "))
// val pairDS: DStream[(String, Int)] = flatMapDS.map((_,1))
// val reduceByKeyDS: DStream[(String, Int)] = pairDS.reduceByKey(_ + _)
// reduceByKeyDS.foreachRDD((r,t) =>{
// println(s"count time:${t}, ${r.collect().toList}")
// })
inputDS.foreachRDD((rdd,t) =>{
val s1 = "aa"
println(s1)
val reduceByKey: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(s"count time:${t}, ${reduceByKey.collect().toList}")
})
Similarities and differences between DStream transform and foreachRDD
1) transform is a transformation operator, while foreachRDD is an output operator.
2) transform converts the old RDD into a new RDD and returns a DStream, on which a DStream output operation is then invoked.
foreachRDD is itself a DStream output operation, so from that point on all of the logic must be written as RDD operations through to the end.
2). updateStateByKey
Stateful transformation (historical state is restored from the checkpoint)
1. A checkpoint directory must be set.
2. `The value returned by the update function is this batch's accumulated result, which becomes the previous-state value (V2) of the next batch.`
3. After the program restarts from a checkpoint, changing the port in the code has no effect, because the stream definition (including the port) is recorded in the checkpoint.
4. The socket stream must be created inside the creating function passed to getOrCreate; defining a second stream outside of it will conflict.
1. Basic usage of updateStateByKey
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object SparkStreamingSocketUpdateState3 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocketUpdateState3")
val checkPointDir: String = "/tmp/sparkstreaming/check_update3"
// () => StreamingContext
val createSteamingContext = () => {
// Create the StreamingContext object with a batch interval
// Here each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Set the checkpoint directory
ssc.checkpoint(checkPointDir)
// Read the socket stream
// Default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
val ds2: DStream[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1))
val resDS: DStream[(String, Int)] = ds2.updateStateByKey((seq: Seq[Int], lastOption: Option[Int]) => {
// Sum the data of the current batch
val v1: Int = seq.sum
// Get the accumulated result of the previous batches
val v2: Int = if (lastOption.isDefined) lastOption.get else 0
// Current batch + previously accumulated result
val now = v1 + v2
Some(now)
})
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc
}
// If the checkpoint directory is empty, the provided "creatingFunc" is called to create the StreamingContext
// If the checkpoint has data, the StreamingContext is simply restored from it; the checkpoint records the stream definitions
val ssc: StreamingContext = StreamingContext.getOrCreate(checkPointDir, createSteamingContext)
// Start the computation
ssc.start()
// Block until the computation terminates (an exception is thrown or it is stopped manually)
ssc.awaitTermination()
}
}
2. updateStateByKey: write a key to the database only when it received new data in this batch; skip keys with no update
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * @Description: the value returned by the update function is this batch's accumulated result, which becomes the V2 value of the next batch
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date: Created in 2021-09-01.
 */
object ReadSocketStreamingWithUpdate4 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ReadSocketStreamingWithUpdate4")
val checkpointPath: String = "/tmp/sparkstreaming/check_upate4"
// Extract the creating function
val createStreamingContext: () => StreamingContext = () => {
// Create the StreamingContext object; each batch accumulates 5 seconds of data before triggering a computation
val ssc: StreamingContext = new StreamingContext(conf, Durations.seconds(5))
// Set the checkpoint directory
ssc.checkpoint(checkpointPath)
// Create the socket stream
// Storage level used by the receiver for the socket stream: StorageLevel.MEMORY_AND_DISK_SER_2
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
val ds2: DStream[(String, ValueAndStateBean)] = inputDS.flatMap(_.split(" ")).map((_, ValueAndStateBean(1)))
// Use updateStateByKey to implement the stateful transformation
val ds3: DStream[(String, ValueAndStateBean)] = ds2.updateStateByKey((seq: Seq[ValueAndStateBean], lastOption: Option[ValueAndStateBean]) => {
var sum: Int = 0
// Sum up this batch's values
for (bean <- seq) {
sum += bean.num
}
// Get the result of the previous batches
val lastValue: Int = if (lastOption.isDefined) lastOption.get.num else 0
// This batch's result + the previous result
val nowValue = sum + lastValue
// No data for this key in this batch: mark it as not updated (filtered out later)
if (sum == 0) {
Some(ValueAndStateBean(nowValue, false))
} else {
// This key was updated in this batch (kept later)
Some(ValueAndStateBean(nowValue, true))
}
})
ds3.foreachRDD((rdd, time) => {
// Keep only the entries whose ValueAndStateBean has isUpdate = true
// Keys updated in this batch are written; keys without updates are not
val filterRdd: RDD[(String, ValueAndStateBean)] = rdd.filter(_._2.isUpdate)
println(s"time:${time}, data written to MySQL:${filterRdd.collect().toBuffer}")
})
ssc
}
// If the checkpoint directory has no data, the function is executed to create a StreamingContext object
// If the checkpoint directory has data, the StreamingContext object is restored from it, together with the socket stream it was processing
val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointPath, createStreamingContext)
ssc.start()
ssc.awaitTermination()
}
}
// Case class used to track whether a key was updated in the current batch
// num: the value, isUpdate: whether it was updated in this batch
case class ValueAndStateBean(val num: Int, val isUpdate: Boolean = false)
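The example above only prints the records that would be written. A minimal sketch of the actual write, placed where the println inside ds3.foreachRDD is, using plain JDBC inside foreachPartition; the connection URL, credentials and word_count(word, cnt) table are placeholders:
filterRdd.foreachPartition(it => {
  // One JDBC connection per partition, created on the executor (placeholder URL/credentials)
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pass")
  val ps = conn.prepareStatement("REPLACE INTO word_count(word, cnt) VALUES (?, ?)")
  try {
    it.foreach { case (word, bean) =>
      ps.setString(1, word)
      ps.setInt(2, bean.num)
      ps.executeUpdate()
    }
  } finally {
    ps.close()
    conn.close()
  }
})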
4. Window Operations
4.1 Basic usage of the window function
//1. You need to set the batch interval, the window length, and the slide interval
//2. Calling countByValue before window reduces the amount of data that has to be aggregated
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object SparkStreamingSocketWindow2 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocketWindow")
// Create the StreamingContext object with a batch interval
// Here each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Read the socket stream
// Default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
// countByValue before window reduces the amount of data to aggregate
val ds2: DStream[String] = inputDS.flatMap(_.split(" "))
val resDS: DStream[(String, Long)] = ds2.countByValue()
.window(Durations.seconds(20), Durations.seconds(10))
.reduceByKey(_ + _)
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, 窗口运算结果:${rdd.collect().toBuffer}")
})
// Start the computation
ssc.start()
// Block until the computation terminates (an exception is thrown or it is stopped manually)
ssc.awaitTermination()
}
}
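Note that both the window length and the slide interval must be integer multiples of the batch interval; with 5-second batches, the 20-second window above covers 4 batches and the 10-second slide produces a result every 2 batches. The same word count can also be written with reduceByKeyAndWindow directly; a sketch reusing ds2 from the example above (no checkpoint needed because no inverse function is used):
// 20 s window, 10 s slide, on top of 5 s batches
val altRes: DStream[(String, Int)] = ds2.map((_, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Durations.seconds(20), Durations.seconds(10))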
4.2 countByValueAndWindow and reduceByKeyAndWindow
//A checkpoint is required
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object SparkStreamingSocketWindow3 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocketWindow3")
val checkPointDir: String = "/tmp/sparkstreaming/check_window4"
// If the checkpoint directory is empty, the provided "creatingFunc" is called to create the StreamingContext
// If the checkpoint has data, the StreamingContext is simply restored from it; the checkpoint records the stream definitions
val ssc: StreamingContext = StreamingContext.getOrCreate(checkPointDir, () => {
// Create the StreamingContext object with a batch interval
// Here each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Set the checkpoint directory
ssc.checkpoint(checkPointDir)
// Read the socket stream
// Default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
val ds2: DStream[String] = inputDS.flatMap(_.split(" "))
// Internally calls the window function; storage level: StorageLevel.MEMORY_ONLY_SER
// countByValueAndWindow is built on reduceByKeyAndWindow with an inverse reduce function, so a checkpoint must be set to store the data of previous windows
val resDS: DStream[(String, Long)] = ds2.countByValueAndWindow(
Durations.seconds(20),
Durations.seconds(10))
//reduceByKeyAndWindow with an inverse function ==> requires a checkpoint
/* val resDS: DStream[(String, Int)] = ds2.map((_, 1))
.reduceByKeyAndWindow(_ + _, _ - _, Durations.seconds(20), Durations.seconds(10))
.filter(_._2 != 0)*/
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc
})
// Start the computation
ssc.start()
// Block until the computation terminates (an exception is thrown or it is stopped manually)
ssc.awaitTermination()
}
}
5.SparkStreamingFile
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * Spark Streaming reading files
 */
object SparkStreaming7File {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("SparkStreaming7File")
//Allow files with a modification time up to one month old to be picked up
conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")
// Each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
//Directory to read files from
val fileDir = "/tmp/sparkstreaming/input_file"
val hadoopConf = new Configuration
//newFilesOnly = true: only files added after the application starts are processed; set it to false to also read existing files within the remember window
//(path: Path) => path.toString.endsWith(".txt"): only files ending in .txt are processed, everything else is filtered out
val fileDS: InputDStream[(LongWritable, Text)] = ssc.fileStream[LongWritable, Text,
TextInputFormat](fileDir,
(path: Path) => path.toString.endsWith(".txt"),
true,
hadoopConf)
val ds2: DStream[String] = fileDS.map(_._2.toString)
// countByValue() is equivalent to map((_,1)).reduceByKey(_ + _)
val ds3: DStream[(String, Long)] = ds2.flatMap(_.split(" ")).countByValue()
ds3.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc.start()
ssc.awaitTermination()
}
}
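When the Hadoop key and input format do not matter, the simpler textFileStream wrapper covers the common case; a minimal sketch of the same word count, assuming the ssc and fileDir defined above:
// textFileStream is shorthand for a fileStream over LongWritable/Text/TextInputFormat
// that returns file contents directly as lines of text (new files only).
val lines: DStream[String] = ssc.textFileStream(fileDir)
val counts: DStream[(String, Long)] = lines.flatMap(_.split(" ")).countByValue()
counts.print()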
6. cogroup (joining a file stream with a socket stream)
//The records to be joined must arrive within the same batch
package sparkstreaming
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * cogroup / join
 * joining a file stream with a socket stream
 */
object SparkStreaming8Join {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming8Join")
//Allow files with a modification time up to one month old to be picked up
conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")
// Each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
val fileDir = "/tmp/sparkstreaming/input_file2"
val hadoopConf = new Configuration
// Read the file stream
val fileDS: InputDStream[(LongWritable, Text)] = ssc.fileStream[LongWritable, Text,
TextInputFormat](fileDir,
(path: Path) => true,
false,
hadoopConf)
val fileResDS: DStream[(String, String)] = fileDS.map(_._2.toString)
.map(f => {
val arr: Array[String] = f.split(" ")
val countryCode = arr(0)
val countryName = arr(1)
(countryCode, countryName)
})
// Read the socket stream
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
// CN CN CN --> CN,3
val socketResDS: DStream[(String, Long)] = socketDS.flatMap(_.split(" ")).countByValue()
//cogroup the two streams
/* val cogroupDS: DStream[(String, (Iterable[Long], Iterable[String]))] = socketResDS.cogroup(fileResDS)
cogroupDS.foreachRDD((rdd, time) =>{
val arr: Array[(String, (Iterable[Long], Iterable[String]))] = rdd.collect()
arr.foreach(f => println(f))
})*/
//join the two streams
val joinDS: DStream[(String, (Long, String))] = socketResDS.join(fileResDS)
joinDS.foreachRDD((rdd, time) => {
val arr: Array[(String, (Long, String))] = rdd.collect()
arr.foreach(f => println(f))
})
ssc.start()
ssc.awaitTermination()
}
}
7. Union of Multiple Receiver Sources
7.1 Union of two streams with the same format
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable.ArrayBuffer
/**
 * Union of two Spark Streaming streams (streams with the same format)
 */
object SparkStreaming9Union {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("SparkStreaming9Union")
// Create the StreamingContext object with a batch interval
// Here each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Read the socket streams
// Default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
// CN CN CN
val s1DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
// CN CN CN
val s2DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 7777)
// val unionDS: DStream[String] = s1DS.union(s2DS)
val dStreams = new ArrayBuffer[DStream[String]]
dStreams += s1DS
dStreams += s2DS
val unionDS: DStream[String] = ssc.union(dStreams)
val resDS: DStream[(String, Long)] = unionDS.flatMap(_.split(" ")).countByValue()
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc.start()
ssc.awaitTermination()
}
}
7.2 Two streams with different formats: normalize them first, then union
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable.ArrayBuffer
object SparkStreaming92Union {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("SparkStreaming92Union")
// Create the StreamingContext object with a batch interval
// Here each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Read the socket streams
// Default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
// CN CN CN
val s1DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
// CN#CN#CN
val s2DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 7777)
// Normalize the two streams into the same format
val s1DS2: DStream[String] = s1DS.flatMap(_.split(" "))
val s2DS2: DStream[String] = s2DS.flatMap(_.split("#"))
// val unionDS: DStream[String] = s1DS.union(s2DS)
val dStreams = new ArrayBuffer[DStream[String]]
dStreams += s1DS2
dStreams += s2DS2
val unionDS: DStream[String] = ssc.union(dStreams)
val resDS: DStream[(String, Long)] = unionDS.countByValue()
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc.start()
ssc.awaitTermination()
}
}
8. Writing Spark Streaming Output to HDFS
//How to deal with the small-files problem?
1. Use coalesce to reduce the number of partitions, and thus the number of small output files.
2. Use HDFS append to add records to existing files instead of creating new ones.
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object SparkStreaming10SaveHDFS {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming10SaveHDFS")
// Create the StreamingContext object with a batch interval
// Here each batch accumulates 5 seconds of data before triggering a computation
val ssc = new StreamingContext(conf, Durations.seconds(5))
// Read the socket stream
// Default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
val resDS: DStream[(String, Long)] = inputDS.flatMap(_.split(" ")).countByValue()
// Write to HDFS
resDS.foreachRDD((rdd, time) => {
if (!rdd.isEmpty()) {
// Reduce the number of partitions
val rdd2: RDD[(String, Long)] = rdd.coalesce(2)
val rdd3: RDD[Int] = rdd2.mapPartitionsWithIndex((index, it) => {
// ---open one HDFS output stream per partition---
val sdf = new SimpleDateFormat("yyyyMMddHH")
val format: String = sdf.format(new Date())
// Target HDFS path for this partition's file
val hdfsDir: String = s"/user/panniu/spark/output_hdfs/${format}/data_${index}"
val fs: FileSystem = FileSystem.get(new Configuration())
val hdfsPath: Path = new Path(hdfsDir)
var os: FSDataOutputStream = null
try {
// Append if the file already exists, otherwise create it
os = if (fs.exists(hdfsPath)) fs.append(hdfsPath)
else fs.create(hdfsPath)
it.foreach(f => {
// ---write the records one by one---
os.write(s"${f._1}\t${f._2}\n".getBytes("utf-8"))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
// ---close the HDFS output stream---
if (os != null) os.close()
}
// mapPartitionsWithIndex must return an iterator; the data itself does not matter here
List[Int]().iterator
})
// Trigger the computation
rdd3.foreach(f => println(s"time:${time}"))
}
})
ssc.start()
ssc.awaitTermination()
}
}
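For comparison, the built-in saveAsTextFiles output listed in the output-operations table is much simpler, but it creates a new output directory per batch interval, which is exactly the small-files behaviour the manual-append approach above works around. A sketch reusing resDS (the output prefix is illustrative):
// One output directory per batch, named prefix-TIME_IN_MS.txt; coalescing to 1 only
// limits the number of part files inside each batch directory.
resDS.transform(_.coalesce(1)).saveAsTextFiles("/tmp/sparkstreaming/output_hdfs/wc", "txt")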
9. SparkStreaming-Kafka
Only the original direct stream obtained from KafkaUtils carries the Kafka metadata (offset ranges); once the stream is transformed into another stream, that metadata is lost.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
<scope>compile</scope>
</dependency>
1.sparkstreaming-kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable
/**
 * Kafka direct stream (sparkstreaming-kafka)
 */
object SparkStreamingKafka3 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafka")
val ssc = new StreamingContext(conf, Durations.seconds(5))
val topics: String = "hainiu_sk"
//Kafka consumer configuration
val kafkaParams = new mutable.HashMap[String, Object]()
kafkaParams += "bootstrap.servers" -> "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"
kafkaParams += "group.id" -> "group25"
kafkaParams += "key.deserializer" -> classOf[StringDeserializer].getName
kafkaParams += "value.deserializer" -> classOf[StringDeserializer].getName
kafkaParams += "auto.offset.reset" -> "earliest"
kafkaParams += "enable.auto.commit" -> "true"
// Location strategy
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// Consumer strategy
// Two consumer strategies are available: Subscribe and Assign
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(Set(topics), kafkaParams)
//ConsumerStrategies.Assign()
// Create the Kafka direct stream (there is no receiver)
// Direct stream property: the stream has as many partitions as the Kafka topic
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
locationStrategy,
consumerStrategy
)
var ranges: HasOffsetRanges = null
// Only the original direct stream carries the Kafka metadata; once it is transformed into another stream, that metadata is lost
val kafkaStream2: DStream[ConsumerRecord[String, String]] = kafkaStream.transform(kafkaRdd => {
println(kafkaRdd.getNumPartitions)
// runs on the driver
ranges = kafkaRdd.asInstanceOf[HasOffsetRanges]
val arr: Array[OffsetRange] = ranges.offsetRanges
for (offsetRange <- arr) {
val topic: String = offsetRange.topic
val partition: Int = offsetRange.partition
val offset: Long = offsetRange.fromOffset
println(s"starting offset of this batch: ${topic}\t${partition}\t${offset}")
}
kafkaRdd
})
// "aa aa aa"
val ds2: DStream[String] = kafkaStream2.map(_.value())
val resDS: DStream[(String, Long)] = ds2.flatMap(_.split(" ")).countByValue()
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc.start()
ssc.awaitTermination()
}
}
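With enable.auto.commit set to "true" as above, the consumer commits offsets on its own schedule, which can re-process or skip data after a failure. A common at-least-once improvement is to disable auto commit and commit the offsets back to Kafka only after each batch has been processed; a minimal sketch against the spark-streaming-kafka-0-10 API, assuming kafkaStream is the original direct stream and that "enable.auto.commit" has been changed to "false" in kafkaParams:
kafkaStream.foreachRDD { rdd =>
  // Offset ranges are only available on the RDDs of the original direct stream
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // Commit asynchronously to Kafka once this batch's processing has finished (driver side)
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}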
- One Spark Streaming application consuming multiple Kafka topics, with different processing logic per topic
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable
/**
 * One Spark Streaming application consuming multiple Kafka topics, with different processing logic per topic
 */
object SparkStreamingKafka {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafka")
val ssc = new StreamingContext(conf, Durations.seconds(5))
val topics: String = "hainiu_sk,hainiu_sk2"
// Kafka consumer configuration
val kafkaParams = new mutable.HashMap[String, Object]()
kafkaParams += "bootstrap.servers" -> "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"
kafkaParams += "group.id" -> "group25"
kafkaParams += "key.deserializer" -> classOf[StringDeserializer].getName
kafkaParams += "value.deserializer" -> classOf[StringDeserializer].getName
kafkaParams += "auto.offset.reset" -> "earliest"
kafkaParams += "enable.auto.commit" -> "true"
// Location strategy
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// Consumer strategy
// Two consumer strategies are available: Subscribe and Assign
// topics is a comma-separated list, so it has to be split into a Set of topic names
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(topics.split(",").toSet, kafkaParams)
//ConsumerStrategies.Assign()
// Create the Kafka direct stream (there is no receiver)
// Direct stream property: the stream has as many partitions as the subscribed Kafka topics
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
locationStrategy,
consumerStrategy
)
var ranges: HasOffsetRanges = null
// Only the original direct stream carries the Kafka metadata; once it is transformed into another stream, that metadata is lost
val kafkaStream2: DStream[ConsumerRecord[String, String]] = kafkaStream.transform(kafkaRdd => {
println(kafkaRdd.getNumPartitions)
// runs on the driver
ranges = kafkaRdd.asInstanceOf[HasOffsetRanges]
val arr: Array[OffsetRange] = ranges.offsetRanges
for (offsetRange <- arr) {
val topic: String = offsetRange.topic
val partition: Int = offsetRange.partition
val offset: Long = offsetRange.fromOffset
println(s"starting offset of this batch: ${topic}\t${partition}\t${offset}")
}
kafkaRdd
})
val ds222: DStream[(String, String)] = kafkaStream2.map(f => (s"${f.topic()}-${f.partition()}", f.value()))
val ds223: DStream[String] = ds222.flatMap(f => {
val topicAndPartitioin: String = f._1
val topic: String = topicAndPartitioin.split("-")(0)
val value: String = f._2
// Apply different processing logic depending on the topic
if (topic.equals("hainiu_sk")) {
value.split(" ").toIterator
} else {
value.split("#").toIterator
}
})
val resDS: DStream[(String, Long)] = ds223.countByValue()
resDS.foreachRDD((rdd, time) => {
println(s"time:${time}, data:${rdd.collect().toBuffer}")
})
ssc.start()
ssc.awaitTermination()
}
}
10. Advanced SparkStreaming-Kafka Development
1. Broadcast variables
//1. Join file-based config data with the Kafka stream
//2. Accumulators: ssc.sparkContext.longAccumulator
//3. Broadcast variables
import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.util.LongAccumulator
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.Breaks
/**
 * Broadcast variables
 * Joining file-based config data with a Kafka stream
 */
object KafkaStreamJoinFileByBroadUpdateConfig {
def main(args: Array[String]): Unit = {
// Number of CPU cores needed = number of Kafka partitions consumed + 1 for the driver
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamJoinFileByBroadUpdateConfig")
val ssc = new StreamingContext(conf, Durations.seconds(5))
val topic: String = "hainiu_sk3"
// Kafka consumer configuration
val kafkaParams = new mutable.HashMap[String, Object]()
kafkaParams += "bootstrap.servers" -> "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"
kafkaParams += "group.id" -> "group25"
kafkaParams += "key.deserializer" -> classOf[StringDeserializer].getName
kafkaParams += "value.deserializer" -> classOf[StringDeserializer].getName
kafkaParams += "auto.offset.reset" -> "earliest"
kafkaParams += "enable.auto.commit" -> "true"
//Location strategy
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// Assign strategy: read partitions 0 and 1
val topicPartitions = new ListBuffer[TopicPartition]
topicPartitions += new TopicPartition(topic, 0)
topicPartitions += new TopicPartition(topic, 1)
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Assign(topicPartitions, kafkaParams)
// Stream built from partitions 0 and 1
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
locationStrategy,
consumerStrategy
)
// Assign strategy: read partitions 2 and 3
val topicPartitions2 = new ListBuffer[TopicPartition]
topicPartitions2 += new TopicPartition(topic, 2)
topicPartitions2 += new TopicPartition(topic, 3)
val consumerStrategy2: ConsumerStrategy[String, String] = ConsumerStrategies.Assign(topicPartitions2, kafkaParams)
// Stream built from partitions 2 and 3
val kafkaStream2: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
locationStrategy,
consumerStrategy2
)
// Union the two streams
val kafkaStreams = new ArrayBuffer[InputDStream[ConsumerRecord[String, String]]]
kafkaStreams += kafkaStream
kafkaStreams += kafkaStream2
val unionDS: DStream[ConsumerRecord[String, String]] = ssc.union(kafkaStreams)
// "aa aa aa"
val ds2: DStream[String] = unionDS.map(_.value())
val resDS: DStream[(String, Long)] = ds2.flatMap(_.split(" ")).countByValue()
// Broadcast variable holding the config data
var broad: Broadcast[mutable.HashMap[String, String]] = ssc.sparkContext.broadcast(new mutable.HashMap[String, String])
// Accumulator for matched records
val matchAcc: LongAccumulator = ssc.sparkContext.longAccumulator
// Accumulator for unmatched records
val notMatchAcc: LongAccumulator = ssc.sparkContext.longAccumulator
// Config refresh interval (ms)
val configUpdateIntervalTime: Long = 10000L
// Time the broadcast variable was last refreshed
var configUpdateOverTime: Long = 0L
resDS.foreachRDD((rdd, time) => {
if (!rdd.isEmpty()) {
// If the broadcast variable is empty, load the file contents into it for the first time
// If (current time - last refresh time) >= refresh interval, reload the file contents into the broadcast variable (periodic config refresh)
if (broad.value.size == 0 || System.currentTimeMillis() - configUpdateOverTime >= configUpdateIntervalTime) {
// configMap holds the file contents that are read
val configMap = new mutable.HashMap[String, String]
// Walk all files under the HDFS directory and load them into the broadcast variable
val configBaseDir: String = "/tmp/sparkstreaming/input_updateSparkBroadCast"
val fs: FileSystem = FileSystem.get(new Configuration())
val configBasePath = new Path(configBaseDir)
// List all files under the HDFS directory
val arr: Array[FileStatus] = fs.listStatus(configBasePath)
// Read the contents of every file into the map
for (file <- arr) {
val path: Path = file.getPath
var reader: BufferedReader = null
try {
// Wrap the HDFS input stream in a BufferedReader
reader = new BufferedReader(new InputStreamReader(fs.open(path)))
var line: String = null
val breaks = new Breaks
breaks.breakable(
while (true) {
line = reader.readLine()
if (line == null) {
breaks.break()
}
val arr2: Array[String] = line.split("\t")
val countryCode: String = arr2(0)
val countryName: String = arr2(1)
configMap += (countryCode -> countryName)
}
)
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (reader != null) reader.close()
}
}
// Remove the old broadcast variable
broad.unpersist()
// Re-broadcast the refreshed config
broad = ssc.sparkContext.broadcast(configMap)
println(s"time:${System.currentTimeMillis()},configMap:${configMap}")
// After refreshing the broadcast variable, record the refresh time
configUpdateOverTime = System.currentTimeMillis()
}
// Join the Kafka stream with the broadcast config
rdd.foreachPartition((it) => {
// Get the config Map out of the broadcast variable
val configMap: mutable.HashMap[String, String] = broad.value
it.foreach(f => {
// (CN, 2)
val countryCode: String = f._1
val option: Option[String] = configMap.get(countryCode)
if (option == None) {
// No match
println(s"not match data:${f}")
notMatchAcc.add(1)
} else {
// Matched
val countryName: String = option.get
println(s"match data:${f}\t${countryName}")
matchAcc.add(1)
}
})
}
)
// To write the join results to MySQL:
// 1) rdd2 = rdd.mapPartitions((it) => it)
// 2) arr = rdd2.collect // pull the results to the driver
// 3) write arr to MySQL (on the driver)
// After the action, print the accumulators
println(s"time:${time}, matchAcc:${matchAcc.value}")
println(s"time:${time}, notMatchAcc:${notMatchAcc.value}")
// Reset the accumulators
matchAcc.reset()
notMatchAcc.reset()
}
})
ssc.start()
ssc.awaitTermination()
}
}
11. Offset Management for SparkStreaming-Kafka
//Exactly-once processing ==> maintain the offsets manually in third-party storage, e.g. ZooKeeper
ZooKeeper-based management
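The ZooKeeper implementation itself is not shown here; the general pattern is sketched below, assuming ssc and kafkaParams are defined as in the earlier examples and TopicPartition comes from org.apache.kafka.common. readOffsetsFromZk and saveOffsetsToZk are hypothetical helpers you would implement with the ZooKeeper client of your choice:
// Hypothetical helpers: implement them with your ZooKeeper client
def readOffsetsFromZk(topics: Set[String]): Map[TopicPartition, Long] = ???
def saveOffsetsToZk(ranges: Array[OffsetRange]): Unit = ???
// 1. On startup, read the last committed offsets from ZooKeeper
val fromOffsets: Map[TopicPartition, Long] = readOffsetsFromZk(Set("hainiu_sk"))
// 2. Start the direct stream from exactly those offsets
val stream = KafkaUtils.createDirectStream[String, String](ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
// 3. Every batch: capture the offset ranges first, process, then persist the end offsets
stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch and write the results (ideally atomically together with the offsets) ...
  saveOffsetsToZk(ranges) // store topic/partition -> untilOffset in ZooKeeper
}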