11.sparkStreaming03

教程 DER ⋅ 于 2023-04-15 17:33:19 ⋅ 1555 阅读

sparkStreaming03

21.5.sparkStreaming接入kafka

21.5.1 spark-streaming-kafka

kafka回顾

file

准备环境

file

创建topic

kafka-topics.sh --bootstrap-server kafka1-24406:9092 --create --topic topic_hainiu --partitions 3 --replication-factor 2

file

kafka读取数据的配置

sparkStreaming 读kafka,有两种方式,一种读zookeeper(现有版本已抛弃),一种读broker,也就是kafka直连流方式。

1)位置策略

Spark Streaming 中提供了如下三种位置策略,用于指定 Kafka 主题分区与 Spark 执行程序 Executors 之间的分配关系:

PreferConsistent : 它将在所有的 Executors 上均匀分配分区;

PreferBrokers : 当 Spark 的 Executor 与 Kafka Broker 在同一机器上时可以选择该选项,它优先将该 Broker 上的首领分区分配给该机器上的 Executor;

PreferFixed : 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机,其构造器如下:

@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
 new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
 new PreferFixed(hostMap)

2)消费策略

订阅和分配。

订阅:可订阅一个主题所有分区或多个主题所有分区。

分配:可消费指定主题分区数据。

Spark Streaming 提供了两种主题订阅方式,分别为 Subscribe 和 SubscribePattern。后者可以使用正则匹配订阅主题的名称。其构造器分别如下:

/**
 \* @param topics 需要订阅的主题的集合
 \* @param Kafka 消费者参数
 \* @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的值
 */
def Subscribe[K, V](
  topics: ju.Collection[jl.String],
  kafkaParams: ju.Map[String, Object],
  offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }
/**
 \* @param pattern需要订阅的正则
 \* @param Kafka 消费者参数
 \* @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的值
 */
def SubscribePattern[K, V](
  pattern: ju.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }

3)程序代码

package com.hainiu.spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 1.kafka params
 * 2.topic
 * 3.subsribe
 * 4.calculate
 */
object Testkafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka1-24406:9092,kafka2-24406:9092,kafka3-24406:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sparkStreaming",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic_hainiu")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    //stream ==> topic partition offset k v
    stream.map(_.value()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

测试输入数据

kafka-console-producer.sh --bootstrap-server kafka1-24406:9092 --topic topic_hainiu 

file

idea打印数据

file

21.5.2 SparkStreaming动态更新广播变量

广播变量

file

所以我们需要实现动态广播技术

准备数据从/public/data/country_data中复制country.txt到idea中

并且拆分一个国家信息文件为两个,将这个文件夹中的数据作为广播

file

整体代码如下:

package com.hainiu.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util.Scanner
import scala.collection.mutable

/**
 * 1.kafka params
 * 2.topic
 * 3.subsribe
 * 4.calculate
 */
object TestkafkaWithDynamicBroadcast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka1-24406:9092,kafka2-24406:9092,kafka3-24406:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sparkStreaming",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic_hainiu")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    //stream ==> topic partition offset k v

    val fs = FileSystem.getLocal(new Configuration())
    val map = mutable.Map[String,String]()
    var bs = ssc.sparkContext.broadcast(map)
    val interval = 10000
    var lastUpdateTime = 0L

    stream.map(_.value())
      .foreachRDD(rdd=>{
        //driver
        if(bs.value.isEmpty || System.currentTimeMillis() - lastUpdateTime > interval){
          val statuses = fs.listStatus(new Path("data/country"))
          statuses.foreach(t=>{
            val in = fs.open(t.getPath)
            val scanner = new Scanner(in)
            while(scanner.hasNext()){
              val line = scanner.nextLine()
              val strs = line.split("\t")
              map.put(strs(0),strs(1))
            }
            in.close()
          })
          bs.unpersist()
          bs = ssc.sparkContext.broadcast(map)
          lastUpdateTime = System.currentTimeMillis()
        }

        rdd.map(code=>{
          bs.value.getOrElse(code,"unknow")
        }).foreach(println)

    })

    ssc.start()
    ssc.awaitTermination()
  }
}

file

将CN改为中华人民共和国

file

21.5.3 sparkStreaming-kafka的offset管理

21.5.3.1 receiver方式 vs 直连方式

sparkStreaming-kafka 的 receiver 方式

file

file

sparkStreaming-kafka 的 Direct 方式

file

file

receiver 和 direct 方式有什么区别?

receiver 方式:

receiver把固定间隔的数据放在内存中,使用kafka高级的API,自动维护偏移量,达到固定的时间一起处理每个批次的offset数据,效率低且容易丢数据,因为数据在内存中,为了容错,还得加入预写日志。

Direct 直连方式:

会周期性地查询Kafka,获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

此种方式相当于直接连接到kafka的分区上(无需receiver,也不需要预写日志),一个RDD的分区对应一个Kafka的分区,使用Kafka底层的API去读取数据,效率高。

流式计算有三种容错语义,分别是:

at-most-once(最多一次):每条记录将被处理一次或根本不处理。

at-least-once(至少一次):每条记录将被处理一次或多次。这比最多一次强,因为它确保不会丢失任何数据。但可能有重复。

Exactly once(只处理一次):每条记录只会被处理一次 - 不会丢失任何数据,也不会多次处理数据。这显然是三者中最强的保证。

SparkStreaming直连kafka可以保证时效最强语义,但需要我们自己去维护偏移量(现在比较流行的方式是手动把offset维护到第三方存储,比如zookeeper、MySQL等。)。

如果想实现最强语义,需要做到以下几点:

1)kafka源支持重复读取。

2)SparkStreaming的输出要支持幂等性或事务。

幂等性:输出多次的操作内容是一样的。

事务:将输出和维护offset放在一个事务中,要么都成功,要么都失败。

3)需要我们自己手动去维护消费的offset。

总结下来就是:

直连kafka,kafka的offset 由 开发者自己维护,获取要消费的offset,进行消费处理,处理完成后,自行维护offset,输出要支持幂等性或事务。

http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html

例子见如下代码

21.5.3.2 receiver方式管理offset

代码:SparkStreamingKafkaStream

目前被直连方式替代,代码看看即可。

21.5.3.3 direct方式管理offset

21.5.3.3.1手动提交offset到kafka

代码:SparkStreamingKafkaOffsetNotAutoCommit

代码逻辑:

file

整体代码:

package com.hainiu.spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 1.kafka params
 * 2.topic
 * 3.subsribe
 * 4.calculate
 */
object Testkafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka1-24406:9092,kafka2-24406:9092,kafka3-24406:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sparkStreaming1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic_hainiu")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    //stream ==> topic partition offset k v
//    stream.map(_.value()).print()

    stream.foreachRDD(rdd=>{
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(println)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

file

21.5.3.3.2手动提交offset到zookeeper(外部存储系统)

代码:SparkStreamingKafkaOffsetZKForeachRDD

代码逻辑:

file

整体代码:

package com.hainiu.spark.offset

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable.HashMap

/**
 * 偏移量保存到zk中
 * 不使用DStream的transform等其它算子
 * 将DStream数据处理方式转成纯正的spark-core的数据处理方式
 */
object SparkStreamingKafkaOffsetZKForeachRDD {
  def main(args: Array[String]): Unit = {
    //指定组名
    val group = "groupxxx"
    //创建SparkConf
    val conf = new SparkConf().setAppName("SparkStreamingKafkaOffsetZK").setMaster("local[*]")
    //创建SparkStreaming,设置间隔时间
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    //指定 topic 名字
    val topic = "topic_41"
    //指定kafka的broker地址,SparkStream的Task直连到kafka的分区上,用底层的API消费,效率更高
    //    val brokerList = "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092"
    val brokerList = "s1.hadoop:9092"
    //指定zk的地址,更新消费的偏移量时使用,当然也可以使用Redis和MySQL来记录偏移量
    val zkQuorum = "nn1.hadoop:2181"
    //SparkStreaming时使用的topic集合,可同时消费多个topic
    val topics: Set[String] = Set(topic)
    //topic在zk里的数据路径,用于保存偏移量
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    //得到zk中的数据路径 例如:"/consumers/${group}/offsets/${topic}"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    //kafka参数
    val kafkaParams = Map(
      "bootstrap.servers" -> brokerList,
      "group.id" -> group,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "enable.auto.commit" -> (false: java.lang.Boolean),
      //earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      //latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      //none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      "auto.offset.reset" -> "latest"
    )

    //定义一个空的kafkaStream,之后根据是否有历史的偏移量进行选择
    var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果存在历史的偏移量,那使用fromOffsets来存放存储在zk中的每个TopicPartition对应的offset
    var fromOffsets = new HashMap[TopicPartition, Long]

    //创建zk客户端,可以从zk中读取偏移量数据,并更新偏移量
    val zkClient = new ZkClient(zkQuorum)

    //从zk中查询该数据路径下是否有每个partition的offset,这个offset是我们自己根据每个topic的不同partition生成的
    //数据路径例子:/consumers/${group}/offsets/${topic}/${partitionId}/${offset}"
    //zkTopicPath = /consumers/group1311/offsets/hainiu_test/
    // /consumers/groupid/offsets/topic
    //路径 /consumers/g3/topic_42/0 --> 100
    //路径 /consumers/g3/topic_42/1 --> 100
    //路径 /consumers/g3/topic_42/2 --> 100
    val children = zkClient.countChildren(zkTopicPath)//3

    //判断zk中是否保存过历史的offset
    if (children > 0) {
      for (i <- 0 until children) {
        // /consumers/group100/offsets/hainiu_html/0
        // get /consumers/lishuai38/offsets/hainiu_html/44
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        // hainiu_html/0
        val tp = new TopicPartition(topic, i)
        //将每个partition对应的offset保存到fromOffsets中
        // hainiu_html/0 -> 888
        fromOffsets += tp -> partitionOffset.toLong
      }
      //      println(fromOffsets)
      //通过KafkaUtils创建直连的DStream,并使用fromOffsets中存储的历史偏离量来继续消费数据
      kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, fromOffsets))
    } else {
      //如果zk中没有该topic的历史offset,那就根据kafkaParam的配置使用最新(latest)或者最旧的(earliest)的offset
      kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    }

    //通过rdd转换得到偏移量的范围
    var offsetRanges = Array[OffsetRange]()

    //迭代DStream中的RDD,将每一个时间间隔对应的RDD拿出来,这个方法是在driver端执行
    //在foreachRDD方法中就跟开发spark-core是同样的流程了,当然也可以使用spark-sql
    kafkaStream.foreachRDD((kafkaRDD, time) => {
      if (!kafkaRDD.isEmpty()) {
        //得到该RDD对应kafka消息的offset,该RDD是一个KafkaRDD,所以可以获得偏移量的范围
        //不使用transform可以直接在foreachRDD中得到这个RDD的偏移量,这种方法适用于DStream不经过任何的转换,
        //直接进行foreachRDD,因为如果transformation了那就不是KafkaRDD了,就不能强转成HasOffsetRanges了,从而就得不到kafka的偏移量了
        offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
        val dataRDD: RDD[String] = kafkaRDD.map(_.value())

        //执行这个rdd的aciton,这里rdd的算子是在集群上执行的
        dataRDD.foreachPartition(partition =>
          partition.foreach(x => {
            println(x)
          })
        )

        // 将最新的offset更新到zookeeper外部存储
        for (o <- offsetRanges) {
          //  /consumers/group100/offsets/hainiu_html/0
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          //将该 partition 的 offset 保存到 zookeeper
          //  /consumers/group100/offsets/hainiu_html/12
          println(s"time:${time}==>维护到zk的offset是:${zkPath}__${o.untilOffset.toString}")
          ZkUtils(zkClient, false).updatePersistentPath(zkPath, o.untilOffset.toString)
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }
}

修改组id ,zookeeper的地址使用的是spark的集群的zk,kafka集群的地址

执行代码输入kafka中的数据

在nn1机器节点中进入zookeeper中查询偏移量信息

file

21.5.3.3.3 解决数据丢失的时候,程序启动问题

Kafka的数据默认保存7天,如果zookeeper里维护的是7天前数据的消费offset,当启动程序时会报错,如何解决?

说明:

file

代码逻辑:

file

整体代码

package com.hainiu.spark.offset

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}

import java.{lang, util}
import scala.collection.mutable.{HashMap, ListBuffer}

/**
  * 偏移量保存到zk中
  * 不使用DStream的transform等其它算子
  * 将DStream数据处理方式转成纯正的spark-core的数据处理方式
  * 由于SparkStreaming程序长时间中断,再次消费时kafka中数据已过时,
  * 上次记录消费的offset已丢失的问题处理
  */
object SparkStreamingKafkaOffsetZKRecovery {
  def main(args: Array[String]): Unit = {
    //指定组名
    val group = "group40"
    //创建SparkConf
    val conf = new SparkConf().setAppName("SparkStreamingKafkaOffsetZKRecovery").setMaster("local[*]")
    //创建SparkStreaming,设置间隔时间
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    //指定 topic 名字
    val topic = "topic_41"
    //指定kafka的broker地址,SparkStream的Task直连到kafka的分区上,用底层的API消费,效率更高
//    val brokerList = "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092"
    val brokerList = "s1.hadoop:9092"
    //指定zk的地址,更新消费的偏移量时使用,当然也可以使用Redis和MySQL来记录偏移量
    val zkQuorum = "nn1.hadoop:2181,nn2.hadoop:2181,s1.hadoop:2181"
    //SparkStreaming时使用的topic集合,可同时消费多个topic
    val topics: Set[String] = Set(topic)
    //topic在zk里的数据路径,用于保存偏移量
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    //得到zk中的数据路径 例如:"/consumers/${group}/offsets/${topic}"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    //kafka参数
    val kafkaParams = Map(
      "bootstrap.servers" -> brokerList,
      "group.id" -> group,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "enable.auto.commit" -> (false: java.lang.Boolean),
      //earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      //latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      //none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      "auto.offset.reset" -> "earliest"
    )

    //定义一个空的kafkaStream,之后根据是否有历史的偏移量进行选择
    var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果存在历史的偏移量,那使用fromOffsets来存放存储在zk中的每个TopicPartition对应的offset
    // 是外部存储zookeeper存的offset
    val zkOffsetMap = new HashMap[TopicPartition, Long]

    //创建zk客户端,可以从zk中读取偏移量数据,并更新偏移量
    val zkClient = new ZkClient(zkQuorum)

    //从zk中查询该数据路径下是否有每个partition的offset,这个offset是我们自己根据每个topic的不同partition生成的
    //数据路径例子:/consumers/${group}/offsets/${topic}/${partitionId}/${offset}"
    //zkTopicPath = /consumers/group100/offsets/hainiu_html/
    val children = zkClient.countChildren(zkTopicPath)

    //判断zk中是否保存过历史的offset
    if (children > 0) {
      for (i <- 0 until children) {
        // /consumers/group100/offsets/hainiu_html/0
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        // hainiu_html/0
        val tp = new TopicPartition(topic, i)
        //将每个partition对应的offset保存到fromOffsets中
        // hainiu_html/0 -> 888
        zkOffsetMap += tp -> partitionOffset.toLong
      }
      println("-------consumer zookeeper offset---------------")
      println(zkOffsetMap)

      /*
         通过kafkaConsumer对象,获取对应topic所有分区的kafka 数据最早的offset
       */
      // 创建kafkaConsumer对象
      import scala.collection.convert.ImplicitConversionsToJava.`map AsJavaMap`
      val kafkaConsumer = new KafkaConsumer(kafkaParams)

      // 获取topic的所有分区信息,主要拿到每个分区的编号
      val kafkaPartitionInfoList: util.List[PartitionInfo] = kafkaConsumer.partitionsFor(topic)
      import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

      val kafkaTopicPartitions = new ListBuffer[TopicPartition]

      for(f <- kafkaPartitionInfoList){
        val topicName: String = f.topic()
        val partitionId: Int = f.partition()
        kafkaTopicPartitions += new TopicPartition(topicName, partitionId)
      }

      // 根据每个分区编号获取每个分区中,kafka数据最早的offset
      import scala.collection.convert.ImplicitConversionsToJava.`collection asJava`
      val kafkaDataEarliestOffsetMap: util.Map[TopicPartition, lang.Long] = kafkaConsumer.beginningOffsets(kafkaTopicPartitions)

      println("-----kafka data Earliest offset----------------")
      println(kafkaDataEarliestOffsetMap)

      /*
          通过 zkOffset 与 kafkaDataEarliestOffset 做对比来修正 zkOffset
          用于解决SparkStreaming程序长时间中断,再次消费时已记录的offset丢失导致程序启动报错问题
       */
      import scala.collection.convert.ImplicitConversions.`map AsScala`
      // 外循环是kafkaDataOffset,内循环是zkOffset
      for((tp, value) <- kafkaDataEarliestOffsetMap){
        val partitionId: Int = tp.partition()
        val dataOffset: lang.Long = value

        val option: Option[Long] = zkOffsetMap.get(tp)

        // kafka 有的分区,但zk 没有, 给zk新增分区
        if (option == None) {
          zkOffsetMap += (tp -> dataOffset)
        } else {
          var zkOffset: Long = option.get
          if (zkOffset < dataOffset) {
            zkOffset = dataOffset
            zkOffsetMap += (tp -> zkOffset)
          }
        }

      }

      println("----修正后的 zkOffset--------------")
      println(zkOffsetMap)

      //通过KafkaUtils创建直连的DStream,并使用fromOffsets中存储的历史偏离量来继续消费数据
      kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, zkOffsetMap))
    } else {
      //如果zk中没有该topic的历史offset,那就根据kafkaParam的配置使用最新(latest)或者最旧的(earliest)的offset
      kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    }

    //通过rdd转换得到偏移量的范围
    var offsetRanges = Array[OffsetRange]()

    //迭代DStream中的RDD,将每一个时间间隔对应的RDD拿出来,这个方法是在driver端执行
    //在foreachRDD方法中就跟开发spark-core是同样的流程了,当然也可以使用spark-sql
    kafkaStream.foreachRDD(kafkaRDD => {
      if (!kafkaRDD.isEmpty()) {
        //得到该RDD对应kafka消息的offset,该RDD是一个KafkaRDD,所以可以获得偏移量的范围
        //不使用transform可以直接在foreachRDD中得到这个RDD的偏移量,这种方法适用于DStream不经过任何的转换,
        //直接进行foreachRDD,因为如果transformation了那就不是KafkaRDD了,就不能强转成HasOffsetRanges了,从而就得不到kafka的偏移量了
        offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
        val dataRDD: RDD[String] = kafkaRDD.map(_.value())

        // 加载广播变量

        // 初始化累加器
        //执行这个rdd的aciton,这里rdd的算子是在集群上执行的
        dataRDD.foreachPartition(partition =>
          // executor 运行的业务在这里写

          partition.foreach(x => {
            println(x)
          })
        )

        //累加器统计结果写入MySQL

        for (o <- offsetRanges) {
          //  /consumers/group100/offsets/hainiu_html/0
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          //将该 partition 的 offset 保存到 zookeeper
          //  /consumers/group100/offsets/hainiu_html/888
          println(s"${zkPath}__${o.untilOffset.toString}")
          ZkUtils(zkClient, false).updatePersistentPath(zkPath, o.untilOffset.toString)
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }
}

修改zk地址,kafka地址,groupid

运行结果如下:

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-DER,http://hainiubl.com/topics/76292
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter