Spark Streaming Study Notes


Spark Streaming

1. DStream stateless transformations

old DStream -> new DStream (the current batch does not depend on historical data)

| Transformation | Description |
| --- | --- |
| map(func) | Returns a new DStream by passing each element of the source DStream through the function func. |
| flatMap(func) | Similar to map, but each input element can be mapped to 0 or more output elements. |
| filter(func) | Returns a new DStream containing only the elements of the source DStream for which func returns true. |
| repartition(numPartitions) | Changes the number of partitions of the DStream to numPartitions. |
| union(otherStream) | Returns a new DStream containing the union of the elements of the source DStream and otherStream. |
| count() | Counts the number of elements in each RDD of the source DStream and returns a new DStream whose RDDs each contain a single element. |
| reduce(func) | Aggregates the elements of each RDD of the source DStream using func (which takes two arguments and returns one result), returning a new DStream whose RDDs each contain a single element. |
| countByValue() | Counts how often each element occurs in each RDD of the DStream and returns a new DStream[(K, Long)], where K is the element type and Long is the frequency. |
| reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values of each key are aggregated with func. Note: by default this uses Spark's default parallelism (2 in local mode, 8 in cluster mode); numTasks can be set to use a different number of parallel tasks. |
| join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs. |
| cogroup(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples. |
| transform(func) | Returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream; this can be used to perform arbitrary RDD operations on the DStream. |

1. DStream transformation

val dstream2 = dstream1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)

2. DStream -> RDD -> DStream

// Use transform to turn the DStream into its underlying RDD, do the word count with RDD transformations, and return the resulting RDD, which becomes the new DStream

val dstream2 = dstream1.transform(rdd => {
    val name = "hehe"           // runs on the driver
    println(s"name:${name}")    // runs on the driver
    val rddnew = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    rddnew
})

2. DStream stateful transformations

//Window operations and the updateStateByKey operation
//Window computations
| Transformation | Description |
| --- | --- |
| window(windowLength, slideInterval) | Returns a new DStream computed from windowed batches of the source DStream. |
| countByWindow(windowLength, slideInterval) | Returns a DStream of the number of elements in the sliding window. |
| reduceByWindow(func, windowLength, slideInterval) | Aggregates the elements of the source DStream over the sliding window with func, returning a new DStream. |
| reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | For a DStream of (K, V) pairs, aggregates the values of each key over the sliding window with func, returning a new DStream. The number of reduce tasks can be changed via numTasks. |
| reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient reduceByKeyAndWindow: each window's reduced value is computed incrementally from the previous window's value by reducing the new data entering the window (func) and "inverse reducing" the old data leaving the window (invFunc). It only works with invertible reduce functions, and checkpointing must be enabled (a sketch follows this table). |
| countByValueAndWindow(windowLength, slideInterval, [numTasks]) | Counts how often each element occurs within the sliding window and returns a DStream[(K, Long)], where K is the element type and Long is the frequency. As with countByValue, the number of reduce tasks can be configured via an optional argument. |
| updateStateByKey(func) | Maintains per-key state across batches (keeps a running history). |
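
A minimal sketch of the incremental reduceByKeyAndWindow from the table above (the full window programs are in section 4); the stream name wordDS and the checkpoint path are placeholders for illustration, not names used elsewhere in these notes:

// assumes a 5s batch interval and a word-per-element stream: val wordDS: DStream[String] = ...
ssc.checkpoint("/tmp/sparkstreaming/check_window_demo")    // placeholder path; required by the inverse-reduce variant
val windowCounts: DStream[(String, Int)] = wordDS
    .map((_, 1))
    .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,      // reduce the new data entering the window
        (a: Int, b: Int) => a - b,      // "inverse reduce" the old data leaving the window
        Durations.seconds(20),          // window length
        Durations.seconds(10))          // slide interval
    .filter(_._2 != 0)                  // drop keys whose count fell back to 0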

DStream output operations

| Output operation | Description |
| --- | --- |
| print() | Prints the first 10 elements of each batch of the DStream on the driver. |
| saveAsTextFiles(prefix, [suffix]) | Saves the contents of the DStream as text files; the files produced in each batch interval are named prefix-TIME_IN_MS[.suffix]. |
| saveAsObjectFiles(prefix, [suffix]) | Saves the contents of the DStream as serialized objects in SequenceFile format; the files produced in each batch interval are named prefix-TIME_IN_MS[.suffix]. |
| saveAsHadoopFiles(prefix, [suffix]) | Saves the contents of the DStream as Hadoop files; the files produced in each batch interval are named prefix-TIME_IN_MS[.suffix]. |
| foreachRDD(func) | The most general output operation: applies func to each RDD of the DStream, typically to push the data to an external system such as a file or a database. Note that func itself runs in the driver process of the streaming application (a sketch follows this table). |
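
Because the function passed to foreachRDD runs on the driver, writes to an external store are usually done inside rdd.foreachPartition, which runs on the executors. A minimal sketch of that pattern; resDS stands for any (String, Long) result stream like those in the examples below, and DbConnectionPool / insert are placeholder names, not a real client API:

resDS.foreachRDD((rdd, time) => {
    rdd.foreachPartition(it => {
        // executor side: one connection per partition (DbConnectionPool is a placeholder, not a real API)
        val conn = DbConnectionPool.getConnection()
        it.foreach { case (word, cnt) => conn.insert(word, cnt) }   // placeholder write call
        conn.close()
    })
})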

3. Spark Streaming programs

1) Create a DStream from a socket

1. Direct DStream transformation
//check the web UI at 127.0.0.1:4040
//DStream -> DStream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
  *  @Description: DStream -> DStream
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date:
  */
object SparkStreamingSocket {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[*]")
        //set the batch interval
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        // default storage level for this receiver: StorageLevel.MEMORY_AND_DISK_SER_2
        // receive lines of text from the socket; words are separated by spaces
        val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
        val reduceByKeyDS : DStream[(String, Int)]= inputDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

        reduceByKeyDS.foreachRDD((r, t) => {
            println(s"count time:${t}, ${r.collect().toList}")
        })
        ssc.start()
        ssc.awaitTermination()
    }
}
2. transform (DStream -> RDD -> new RDD -> new DStream)
// The DStream chain above can also be written as RDD transformations inside transform
val reduceByKeyDS: DStream[(String, Int)] = inputDS.transform(rdd => {
    val reduceByKey: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    reduceByKey
})

reduceByKeyDS.foreachRDD((r, t) => {
    println(s"count time:${t}, ${r.collect().toList}")
})
3. foreachRDD
//    val flatMapDS: DStream[String] = inputDS.flatMap(_.split(" "))
//    val pairDS: DStream[(String, Int)] = flatMapDS.map((_,1))
//    val reduceByKeyDS: DStream[(String, Int)] = pairDS.reduceByKey(_ + _)
//    reduceByKeyDS.foreachRDD((r,t) =>{
//      println(s"count time:${t}, ${r.collect().toList}")
//    })
inputDS.foreachRDD((rdd, t) => {
    val s1 = "aa"
    println(s1)
    val reduceByKey: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    println(s"count time:${t}, ${reduceByKey.collect().toList}")
})

Differences between DStream's transform and foreachRDD

1) transform is a transformation operator; foreachRDD is an output (action) operator.
2) transform converts the old RDD into a new RDD and returns a DStream, on which a DStream output operation is then invoked,
   while foreachRDD is itself a DStream output operation: all subsequent processing inside it must be written as RDD operations, through to the end.

2) updateStateByKey

(restores historical state from the checkpoint) stateful transformation
1. A checkpoint directory must be set.
2. `The value returned is this batch's aggregated result, which becomes V2 (the previous state) for the next batch.`
3. After a restart, changing the port in the code has no effect, because the port is recorded in the checkpoint.
4. The socket stream must be created inside the creating function passed to getOrCreate; otherwise the two streams conflict.

1. Basic updateStateByKey usage

package sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreamingSocketUpdateState3 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocketUpdateState3")

        val checkPointDir: String = "/tmp/sparkstreaming/check_update3"

        // () => StreamingContext
        val createStreamingContext = () => {
            // create the StreamingContext object with the batch interval
            // here each micro-batch collects 5 seconds of data before being processed
            val ssc = new StreamingContext(conf, Durations.seconds(5))
            // set the checkpoint directory
            ssc.checkpoint(checkPointDir)
            // read the socket stream
            // default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
            val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
            val ds2: DStream[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1))

            val resDS: DStream[(String, Int)] = ds2.updateStateByKey((seq: Seq[Int], lastOption: Option[Int]) => {
                // sum of the current batch
                val v1: Int = seq.sum

                // result accumulated up to the previous batch
                val v2: Int = if (lastOption.isDefined) lastOption.get else 0

                // current batch + previous result
                val now = v1 + v2

                Some(now)
            })

            resDS.foreachRDD((rdd, time) => {
                println(s"time:${time}, data:${rdd.collect().toBuffer}")
            })
            ssc
        }

        //If the checkpoint directory has no data, the StreamingContext is created by calling the provided creating function
        //If the checkpoint has data, the StreamingContext is simply restored from it; the checkpoint records the stream definition
        val ssc: StreamingContext = StreamingContext.getOrCreate(checkPointDir, createStreamingContext)

        // start the streaming job
        ssc.start()
        // block until an exception is thrown or the job is stopped manually
        ssc.awaitTermination()

    }

}

2. updateStateByKey: write to the database only for keys that received data in the current batch; skip keys without updates

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
  *  @Description: the value returned is this batch's aggregated result, i.e. the V2 (previous state) of the next batch
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date: Created in 2021-09-01.
  */

object ReadSocketStreamingWithUpdate4 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ReadSocketStreamingWithUpdate4")
        val checkpointPath: String = "/tmp/sparkstreaming/check_upate4"
        // factor the creating function out into a value
        val createStreamingContext: () => StreamingContext = () => {
            // create the StreamingContext object; every 5 seconds a batch of data triggers a computation
            val ssc: StreamingContext = new StreamingContext(conf, Durations.seconds(5))
            // set the checkpoint directory
            ssc.checkpoint(checkpointPath)
            // create the socket stream
            // storage level used by the receiver: StorageLevel.MEMORY_AND_DISK_SER_2
            val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
            val ds2: DStream[(String, ValueAndStateBean)] = inputDS.flatMap(_.split(" ")).map((_, ValueAndStateBean(1)))
            // use updateStateByKey for the stateful transformation
            val ds3: DStream[(String, ValueAndStateBean)] = ds2.updateStateByKey((seq: Seq[ValueAndStateBean], lastOption: Option[ValueAndStateBean]) => {
                var sum: Int = 0
                // sum of the current batch
                for (bean <- seq) {
                    sum += bean.num
                }
                // result from the previous batch
                val lastValue: Int = if (lastOption.isDefined) lastOption.get.num else 0
                // current batch result + previous result
                val nowValue = sum + lastValue
                // this key had no data in the current batch: mark as not updated (filtered out later)
                if (sum == 0) {
                    Some(ValueAndStateBean(nowValue, false))
                } else {
                    // this key was updated in the current batch (kept later)
                    Some(ValueAndStateBean(nowValue, true))
                }
            })
            ds3.foreachRDD((rdd, time) => {
                // keep only the entries where ValueAndStateBean.isUpdate == true
                // keys that had data in this batch are written; keys that did not are skipped
                val filterRdd: RDD[(String, ValueAndStateBean)] = rdd.filter(_._2.isUpdate)
                println(s"time:${time}, data written to MySQL:${filterRdd.collect().toBuffer}")
            })
            ssc
        }
        // if the checkpoint directory has no data, the creating function is executed to build the StreamingContext
        // if the checkpoint directory has data, the StreamingContext is restored from it, together with the socket stream it was processing
        val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointPath, createStreamingContext)
        ssc.start()
        ssc.awaitTermination()
    }
}

// Case class used to mark whether a key received data in the current batch
// num: the count value, isUpdate: whether the key was updated in this batch
case class ValueAndStateBean(num: Int, isUpdate: Boolean = false)

4. Window operations

4.1 Basic window function usage
//1. Set the window length, the batch interval, and the slide interval
//2. Calling countByValue before window reduces the amount of data to aggregate
package sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreamingSocketWindow2 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocketWindow")
        // create the StreamingContext object with the batch interval
        // here each micro-batch collects 5 seconds of data before being processed
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        // read the socket stream
        // default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
        val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)

        //countByValue before window reduces the amount of data to aggregate
        val ds2: DStream[String] = inputDS.flatMap(_.split(" "))
        val resDS: DStream[(String, Long)] = ds2.countByValue()
                .window(Durations.seconds(20), Durations.seconds(10))
                .reduceByKey(_ + _)

        resDS.foreachRDD((rdd, time) => {
            println(s"time:${time}, 窗口运算结果:${rdd.collect().toBuffer}")
        })

        // start the streaming job
        ssc.start()
        // block until an exception is thrown or the job is stopped manually
        ssc.awaitTermination()

    }

}
4.2 countByValueAndWindow and reduceByKeyAndWindow
//a checkpoint directory is required

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreamingSocketWindow3 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocketWindow3")

        val checkPointDir: String = "/tmp/sparkstreaming/check_window4"

        //If the checkpoint directory has no data, the StreamingContext is created by calling the provided creating function
        //If the checkpoint has data, the StreamingContext is simply restored from it; the checkpoint records the stream definition
        val ssc: StreamingContext = StreamingContext.getOrCreate(checkPointDir, () => {
            // create the StreamingContext object with the batch interval
            // here each micro-batch collects 5 seconds of data before being processed
            val ssc = new StreamingContext(conf, Durations.seconds(5))
            // set the checkpoint directory
            ssc.checkpoint(checkPointDir)
            // read the socket stream
            // default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
            val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
            val ds2: DStream[String] = inputDS.flatMap(_.split(" "))
            // internally this calls window; storage level: StorageLevel.MEMORY_ONLY_SER
            // countByValueAndWindow calls reduceByKeyAndWindow with an inverse-reduce function, so a checkpoint is needed to keep the previous windows' data
            val resDS: DStream[(String, Long)] = ds2.countByValueAndWindow(
                Durations.seconds(20),
                Durations.seconds(10))

            // reduceByKeyAndWindow with an inverse function also requires a checkpoint

            /* val resDS: DStream[(String, Int)] = ds2.map((_, 1))
                    .reduceByKeyAndWindow(_ + _, _ - _, Durations.seconds(20), Durations.seconds(10))
                    .filter(_._2 != 0) */

            resDS.foreachRDD((rdd, time) => {
                println(s"time:${time}, data:${rdd.collect().toBuffer}")
            })
            ssc
        })
        // start the streaming job
        ssc.start()
        // block until an exception is thrown or the job is stopped manually
        ssc.awaitTermination()
    }
}

5. Spark Streaming file stream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * Spark Streaming reading files
  */
object SparkStreaming7File {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("SparkStreaming7File")

        //allow picking up files whose modification time is within the last month
        conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")
        // each batch collects 5 seconds of data before triggering a computation
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        //directory to read files from
        val fileDir = "/tmp/sparkstreaming/input_file"
        val hadoopConf = new Configuration
        //newFilesOnly = true: after the program starts, only newly added files are read
        //(path: Path) => path.toString.endsWith(".txt"): keep only files ending in .txt, filter out everything else
        val fileDS: InputDStream[(LongWritable, Text)] = ssc.fileStream[LongWritable, Text,
                TextInputFormat](fileDir,
            (path: Path) => path.toString.endsWith(".txt"),
            true,
            hadoopConf)
        val ds2: DStream[String] = fileDS.map(_._2.toString)
        // countByValue() is equivalent to map((_,1)).reduceByKey(_ + _)
        val ds3: DStream[(String, Long)] = ds2.flatMap(_.split(" ")).countByValue()
        ds3.foreachRDD((rdd, time) => {
            println(s"time:${time}, data:${rdd.collect().toBuffer}")
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

6. cogroup (joining a file stream with a socket stream)

//the records to be joined must arrive in the same batch
package sparkstreaming

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * cogroup / join
  * joining a file stream with a socket stream
  */
object SparkStreaming8Join {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming8Join")

        //allow picking up files whose modification time is within the last month
        conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")

        // each batch collects 5 seconds of data before triggering a computation
        val ssc = new StreamingContext(conf, Durations.seconds(5))

        val fileDir = "/tmp/sparkstreaming/input_file2"
        val hadoopConf = new Configuration
        // read the file stream
        val fileDS: InputDStream[(LongWritable, Text)] = ssc.fileStream[LongWritable, Text,
                TextInputFormat](fileDir,
            (path: Path) => true,
            false,
            hadoopConf)
        val fileResDS: DStream[(String, String)] = fileDS.map(_._2.toString)
                .map(f => {
                    val arr: Array[String] = f.split(" ")
                    val countryCode = arr(0)
                    val countryName = arr(1)
                    (countryCode, countryName)
                })

        // read the socket stream
        val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
        // CN CN CN --> CN,3
        val socketResDS: DStream[(String, Long)] = socketDS.flatMap(_.split(" ")).countByValue()
        // cogroup the two streams

        /* val cogroupDS: DStream[(String, (Iterable[Long], Iterable[String]))] = socketResDS.cogroup(fileResDS)
        cogroupDS.foreachRDD((rdd, time) => {
            val arr: Array[(String, (Iterable[Long], Iterable[String]))] = rdd.collect()
            arr.foreach(f => println(f))
        }) */

        // join the two streams

        val joinDS: DStream[(String, (Long, String))] = socketResDS.join(fileResDS)
        joinDS.foreachRDD((rdd, time) => {
            val arr: Array[(String, (Long, String))] = rdd.collect()
            arr.foreach(f => println(f))
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

7. Combining multiple receiver sources with union

7.1 Union of two streams with the same format

package sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable.ArrayBuffer

/**
  * Union of two Spark Streaming streams (streams with the same format)
  */
object SparkStreaming9Union {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("SparkStreaming9Union")
        // create the StreamingContext object with the batch interval
        // here each micro-batch collects 5 seconds of data before being processed
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        // read the socket streams
        // default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
        // CN CN CN
        val s1DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
        // CN CN CN
        val s2DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 7777)

        //    val unionDS: DStream[String] = s1DS.union(s2DS)
        val dStreams = new ArrayBuffer[DStream[String]]
        dStreams += s1DS
        dStreams += s2DS
        val unionDS: DStream[String] = ssc.union(dStreams)
        val resDS: DStream[(String, Long)] = unionDS.flatMap(_.split(" ")).countByValue()

        resDS.foreachRDD((rdd, time) => {
            println(s"time:${time}, data:${rdd.collect().toBuffer}")
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

7.2 Streams with different formats need to be normalized before the union

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable.ArrayBuffer

object SparkStreaming92Union {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("SparkStreaming92Union")
        // create the StreamingContext object with the batch interval
        // here each micro-batch collects 5 seconds of data before being processed
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        // read the socket streams
        // default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
        // CN CN CN
        val s1DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
        // CN#CN#CN
        val s2DS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 7777)

        // normalize the two formats
        val s1DS2: DStream[String] = s1DS.flatMap(_.split(" "))
        val s2DS2: DStream[String] = s2DS.flatMap(_.split("#"))

        //    val unionDS: DStream[String] = s1DS.union(s2DS)
        val dStreams = new ArrayBuffer[DStream[String]]
        dStreams += s1DS2
        dStreams += s2DS2
        val unionDS: DStream[String] = ssc.union(dStreams)
        val resDS: DStream[(String, Long)] = unionDS.countByValue()

        resDS.foreachRDD((rdd, time) => {
            println(s"time:${time}, data:${rdd.collect().toBuffer}")
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

8. Writing Spark Streaming output to HDFS

//The small-files problem:
1. Use coalesce to reduce the number of partitions and therefore the number of small output files.
2. Use HDFS append to add records to existing files.

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreaming10SaveHDFS {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming10SaveHDFS")
        // create the StreamingContext object with the batch interval
        // here each micro-batch collects 5 seconds of data before being processed
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        // read the socket stream
        // default storage level: StorageLevel.MEMORY_AND_DISK_SER_2
        val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)

        val resDS: DStream[(String, Long)] = inputDS.flatMap(_.split(" ")).countByValue()
        // write to HDFS
        resDS.foreachRDD((rdd, time) => {
            if (!rdd.isEmpty()) {
                // reduce the number of partitions
                val rdd2: RDD[(String, Long)] = rdd.coalesce(2)

                val rdd3: RDD[Int] = rdd2.mapPartitionsWithIndex((index, it) => {
                    // --- open one HDFS output stream per partition ---
                    val sdf = new SimpleDateFormat("yyyyMMddHH")
                    val format: String = sdf.format(new Date())

                    // HDFS file to write to
                    val hdfsDir: String = s"/user/panniu/spark/output_hdfs/${format}/data_${index}"
                    val fs: FileSystem = FileSystem.get(new Configuration())
                    val hdfsPath: Path = new Path(hdfsDir)
                    var os: FSDataOutputStream = null
                    try {
                        // append if the file already exists, otherwise create it
                        os = if (fs.exists(hdfsPath)) fs.append(hdfsPath)
                        else fs.create(hdfsPath)
                        it.foreach(f => {
                            // --- write the records one by one ---
                            os.write(s"${f._1}\t${f._2}\n".getBytes("utf-8"))
                        })

                    } catch {
                        case e: Exception => e.printStackTrace()
                    } finally {
                        // --- close the HDFS output stream (guard against a failed open) ---
                        if (os != null) os.close()
                    }
                    // mapPartitionsWithIndex must return an iterator; the data itself does not matter here
                    List[Int]().iterator
                })
                // trigger the computation
                rdd3.foreach(f => println(s"time:${time}"))
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

9. Spark Streaming and Kafka

Only the direct stream originally returned by KafkaUtils carries the Kafka metadata (offset ranges); once the stream is transformed into another stream, that metadata is lost.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
    <scope>compile</scope>
</dependency>

1. Basic Spark Streaming + Kafka direct stream

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable

/**
  * Kafka direct stream (spark-streaming-kafka)
  */
object SparkStreamingKafka3 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafka")
        val ssc = new StreamingContext(conf, Durations.seconds(5))

        val topics: String = "hainiu_sk"
        //Kafka consumer configuration
        val kafkaParams = new mutable.HashMap[String, Object]()
        kafkaParams += "bootstrap.servers" -> "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"
        kafkaParams += "group.id" -> "group25"
        kafkaParams += "key.deserializer" -> classOf[StringDeserializer].getName
        kafkaParams += "value.deserializer" -> classOf[StringDeserializer].getName
        kafkaParams += "auto.offset.reset" -> "earliest"
        kafkaParams += "enable.auto.commit" -> "true"

        // location strategy
        val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent

        // consumer strategy
        // consumption can use either the subscribe or the assign strategy
        val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(Set(topics), kafkaParams)
        //ConsumerStrategies.Assign()

        // create the Kafka direct stream (no receiver)
        // a direct stream has exactly as many partitions as the Kafka topic
        val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
            locationStrategy,
            consumerStrategy
        )

        var ranges: HasOffsetRanges = null

        // only the original direct stream carries the Kafka metadata (offsets); once it is transformed into another stream, that metadata is lost
        val kafkaStream2: DStream[ConsumerRecord[String, String]] = kafkaStream.transform(kafkaRdd => {
            println(kafkaRdd.getNumPartitions)
            // runs on the driver
            ranges = kafkaRdd.asInstanceOf[HasOffsetRanges]
            val arr: Array[OffsetRange] = ranges.offsetRanges
            for (offsetRange <- arr) {
                val topic: String = offsetRange.topic
                val partition: Int = offsetRange.partition
                val offset: Long = offsetRange.fromOffset
                println(s"当前批次刚开始消费的offset: ${topic}\t${partition}\t${offset}")
            }
            kafkaRdd
        })

        // "aa aa aa"
        val ds2: DStream[String] = kafkaStream2.map(_.value())
        val resDS: DStream[(String, Long)] = ds2.flatMap(_.split(" ")).countByValue()

        resDS.foreachRDD((rdd, time) => {
            println(s"time:${time}, data:${rdd.collect().toBuffer}")
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

2. One Spark Streaming application consuming multiple Kafka topics, with different processing logic per topic
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable

/**
  * One Spark Streaming application consuming multiple Kafka topics, with different processing logic per topic
  */
object SparkStreamingKafka {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafka")
        val ssc = new StreamingContext(conf, Durations.seconds(5))

        val topics: String = "hainiu_sk,hainiu_sk2"
        // Kafka consumer configuration
        val kafkaParams = new mutable.HashMap[String, Object]()
        kafkaParams += "bootstrap.servers" -> "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"
        kafkaParams += "group.id" -> "group25"
        kafkaParams += "key.deserializer" -> classOf[StringDeserializer].getName
        kafkaParams += "value.deserializer" -> classOf[StringDeserializer].getName
        kafkaParams += "auto.offset.reset" -> "earliest"
        kafkaParams += "enable.auto.commit" -> "true"

        // location strategy
        val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent

        // consumer strategy
        // consumption can use either the subscribe or the assign strategy
        // the comma-separated topic list has to be split into a Set; Set(topics) would subscribe to a single non-existent topic "hainiu_sk,hainiu_sk2"
        val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(topics.split(",").toSet, kafkaParams)
        //ConsumerStrategies.Assign()

        // create the Kafka direct stream (no receiver)
        // a direct stream has exactly as many partitions as the Kafka topics
        val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
            locationStrategy,
            consumerStrategy
        )

        var ranges: HasOffsetRanges = null

        // only the original direct stream carries the Kafka metadata (offsets); once it is transformed into another stream, that metadata is lost
        val kafkaStream2: DStream[ConsumerRecord[String, String]] = kafkaStream.transform(kafkaRdd => {
            println(kafkaRdd.getNumPartitions)
            // runs on the driver
            ranges = kafkaRdd.asInstanceOf[HasOffsetRanges]
            val arr: Array[OffsetRange] = ranges.offsetRanges
            for (offsetRange <- arr) {
                val topic: String = offsetRange.topic
                val partition: Int = offsetRange.partition
                val offset: Long = offsetRange.fromOffset
                println(s"当前批次刚开始消费的offset: ${topic}\t${partition}\t${offset}")

            }
            kafkaRdd
        })

        val ds222: DStream[(String, String)] = kafkaStream2.map(f => (s"${f.topic()}-${f.partition()}", f.value()))
        val ds223: DStream[String] = ds222.flatMap(f => {
            val topicAndPartitioin: String = f._1
            val topic: String = topicAndPartitioin.split("-")(0)
            val value: String = f._2
            // process the data differently depending on the topic
            if (topic.equals("hainiu_sk")) {
                value.split(" ").toIterator
            } else {
                value.split("#").toIterator
            }
        })
        val resDS: DStream[(String, Long)] = ds223.countByValue()

        resDS.foreachRDD((rdd, time) => {
            println(s"time:${time}, data:${rdd.collect().toBuffer}")
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

10. Advanced Spark Streaming and Kafka development

1. Broadcast variables

//1. Join a file-based config with the Kafka stream
//2. Accumulators: ssc.sparkContext.longAccumulator
//3. Broadcast variables

import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.util.LongAccumulator
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.Breaks

/**
  * Broadcast variables
  * joining a file-based config with the Kafka stream
  */
object KafkaStreamJoinFileByBroadUpdateConfig {
    def main(args: Array[String]): Unit = {
        // number of CPU cores = number of Kafka partitions consumed + 1 for the driver
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamJoinFileByBroadUpdateConfig")

        val ssc = new StreamingContext(conf, Durations.seconds(5))
        val topic: String = "hainiu_sk3"
        // Kafka consumer configuration
        val kafkaParams = new mutable.HashMap[String, Object]()
        kafkaParams += "bootstrap.servers" -> "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"
        kafkaParams += "group.id" -> "group25"
        kafkaParams += "key.deserializer" -> classOf[StringDeserializer].getName
        kafkaParams += "value.deserializer" -> classOf[StringDeserializer].getName
        kafkaParams += "auto.offset.reset" -> "earliest"
        kafkaParams += "enable.auto.commit" -> "true"

        //location strategy
        val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent

        // assign strategy: read partitions 0 and 1
        val topicPartitions = new ListBuffer[TopicPartition]
        topicPartitions += new TopicPartition(topic, 0)
        topicPartitions += new TopicPartition(topic, 1)
        val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Assign(topicPartitions, kafkaParams)
        // stream built from partitions 0 and 1
        val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
            locationStrategy,
            consumerStrategy
        )

        // assign strategy: read partitions 2 and 3
        val topicPartitions2 = new ListBuffer[TopicPartition]
        topicPartitions2 += new TopicPartition(topic, 2)
        topicPartitions2 += new TopicPartition(topic, 3)
        val consumerStrategy2: ConsumerStrategy[String, String] = ConsumerStrategies.Assign(topicPartitions2, kafkaParams)
        // stream built from partitions 2 and 3
        val kafkaStream2: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
            locationStrategy,
            consumerStrategy2
        )

        // union the two streams
        val kafkaStreams = new ArrayBuffer[InputDStream[ConsumerRecord[String, String]]]
        kafkaStreams += kafkaStream
        kafkaStreams += kafkaStream2
        val unionDS: DStream[ConsumerRecord[String, String]] = ssc.union(kafkaStreams)

        // "aa aa aa"
        val ds2: DStream[String] = unionDS.map(_.value())
        val resDS: DStream[(String, Long)] = ds2.flatMap(_.split(" ")).countByValue()

        // broadcast variable holding the config map
        var broad: Broadcast[mutable.HashMap[String, String]] = ssc.sparkContext.broadcast(new mutable.HashMap[String, String])

        // accumulator for matched records
        val matchAcc: LongAccumulator = ssc.sparkContext.longAccumulator
        // accumulator for unmatched records
        val notMatchAcc: LongAccumulator = ssc.sparkContext.longAccumulator

        // config refresh interval
        val configUpdateIntervalTime: Long = 10000L

        // time when the broadcast variable was last refreshed
        var configUpdateOverTime: Long = 0L

        resDS.foreachRDD((rdd, time) => {
            if (!rdd.isEmpty()) {
                // if the broadcast variable is empty, load the config files into it for the first time
                // if (current time - last refresh time) >= refresh interval, reload the files into the broadcast variable (periodic config refresh)
                if (broad.value.size == 0 || System.currentTimeMillis() - configUpdateOverTime >= configUpdateIntervalTime) {
                    // configMap collects the contents of the config files
                    val configMap = new mutable.HashMap[String, String]

                    // iterate over all files in the HDFS directory and load them into the broadcast variable
                    val configBaseDir: String = "/tmp/sparkstreaming/input_updateSparkBroadCast"
                    val fs: FileSystem = FileSystem.get(new Configuration())
                    val configBasePath = new Path(configBaseDir)
                    // list all files in the HDFS directory
                    val arr: Array[FileStatus] = fs.listStatus(configBasePath)
                    // read each file and load its content into the map
                    for (file <- arr) {
                        val path: Path = file.getPath
                        var reader: BufferedReader = null
                        try {
                            // wrap the HDFS input stream in a BufferedReader
                            reader = new BufferedReader(new InputStreamReader(fs.open(path)))
                            var line: String = null
                            val breaks = new Breaks
                            breaks.breakable(
                                while (true) {
                                    line = reader.readLine()
                                    if (line == null) {
                                        breaks.break()
                                    }
                                    val arr2: Array[String] = line.split("\t")
                                    val countryCode: String = arr2(0)
                                    val countryName: String = arr2(1)
                                    configMap += (countryCode -> countryName)
                                }
                            )

                        } catch {
                            case e: Exception => e.printStackTrace()
                        } finally {
                            if (reader != null) reader.close()
                        }
                    }

                    // remove the old broadcast variable
                    broad.unpersist()
                    // re-broadcast the refreshed config
                    broad = ssc.sparkContext.broadcast(configMap)

                    println(s"time:${System.currentTimeMillis()},configMap:${configMap}")

                    // after refreshing the broadcast variable, update the last refresh time
                    configUpdateOverTime = System.currentTimeMillis()

                }

                // join the Kafka stream with the broadcast variable
                rdd.foreachPartition((it) => {
                    // get the Map out of the broadcast variable
                    val configMap: mutable.HashMap[String, String] = broad.value
                    it.foreach(f => {
                        // (CN, 2)
                        val countryCode: String = f._1
                        val option: Option[String] = configMap.get(countryCode)
                        if (option.isEmpty) {
                            // no match
                            println(s"not match data:${f}")
                            notMatchAcc.add(1)
                        } else {
                            // matched
                            val countryName: String = option.get
                            println(s"match data:${f}\t${countryName}")
                            matchAcc.add(1)
                        }

                    })
                }
                )

                // if the join results need to be written to MySQL:
                // 1) rdd2 = rdd.mapPartitions((it) => it)
                // 2) arr = rdd2.collect        // pull the results to the driver
                // 3) write arr to MySQL (on the driver)
                // after the action, print the accumulators
                println(s"time:${time}, matchAcc:${matchAcc.value}")
                println(s"time:${time}, notMatchAcc:${notMatchAcc.value}")
                // reset the accumulators
                matchAcc.reset()
                notMatchAcc.reset()
            }
        })
        ssc.start()
        ssc.awaitTermination()
    }
}

11. Managing Kafka offsets in Spark Streaming

//Exactly-once processing ==> maintain offsets manually in external storage, e.g. ZooKeeper

ZooKeeper-based management
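
The notes stop at ZooKeeper-based management. As a related sketch (not the ZooKeeper implementation), spark-streaming-kafka-0-10 can also commit offsets back to Kafka itself once a batch has finished processing, with "enable.auto.commit" set to false; kafkaStream below stands for the original direct stream from the examples above:

kafkaStream.foreachRDD(rdd => {
    // the offset ranges are only available on the original direct stream
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // ... process rdd here (e.g. word count) ...

    // commit this batch's offsets back to Kafka only after processing has succeeded
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})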
Copyright notice: original work from 海汼部落; reproduction is allowed provided the source and author are credited with a hyperlink. Source: http://hainiubl.com/topics/75790