请问如何设置 kafka 的偏移量,采用 DStream 时没设置偏移量,每次启动从新消费,存在重复消费?

问答 歌唱祖国 ⋅ 于 2018-02-22 14:29:22 ⋅ 最后回复由 青牛 2018-02-23 13:50:31 ⋅ 4916 阅读

请问如何设置kafka的偏移量,采用DStream时没设置偏移量,每次启动从新消费,存在重复消费?
file
为什么只有stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges}这样设置偏移量才对。如果转换成DStream后怎么设置偏移量?
val stream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String]
(Array(topics), kafkaParam))
//将数据流遍历
stream.foreachRDD { rdd =>
//设置offset :offsetRanges
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

成为第一个点赞的人吧 :bowtie:
回复数量: 4
  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2018-02-23 00:00:13

    你说的是创建之前设置offset吗?

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2018-02-23 00:00:31
    val value: ConsumerStrategy[String,String] = ConsumerStrategies.Subscribe(topicSet,kafkaParams,offset)
    val lines: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,value)
    
  • 歌唱祖国 不要跟过去的自己比,要期待未来的自己,珍爱现在的自己。
    2018-02-23 07:45:32

    @青牛 你这样子设置的offset消费完怎么提交,我的意思是想在消费的时候记录它的offset,等到将数据计算完才把offset提交上去,下一个批次处理的时候不会出现重复消费的现象。该怎么写?

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2018-02-23 13:50:31

    @歌唱祖国 checkpoint可以保存这个offset,或都你自己找个地记下来,下次启动的时候当参数传进去。你工作咋这么积极那?过完年就开工了?

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter