请问如何设置kafka的偏移量,采用DStream时没设置偏移量,每次启动从新消费,存在重复消费?
为什么只有stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges}这样设置偏移量才对。如果转换成DStream后怎么设置偏移量?
val stream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String]
(Array(topics), kafkaParam))
//将数据流遍历
stream.foreachRDD { rdd =>
//设置offset :offsetRanges
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges