我定义了 20 个 Long 类型的变量,如何定义才能使用 reduceByKey?每个字段都不能丢,后期要按时间段、批次和全部数据进行统计。
我自己定义了一个但是报错
// Turns every record of each partition into a (key, 19-counter tuple) pair.
// The key and all counters are fixed literal values; the record text itself is not inspected.
JavaPairDStream<Long, Tuple19<Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long>> tatoalOrMmeReivers = transform.mapPartitionsToPair(records -> {
    List<Tuple2<Long, Tuple19<Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long>>> pairs = new LinkedList<>();
    // Drain the partition iterator, emitting exactly one pair per input record.
    records.forEachRemaining((String record) ->
            pairs.add(new Tuple2<>(
                    1L,
                    new Tuple19<>(
                            1L, 1L, 1L, 1L, 1L, 1L, 11L, 11L, 1L, 1L,
                            1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L))));
    return pairs.iterator();
});
// Sums the 19 counters per key, moves the result into a single partition,
// and renders one tab-separated report line per key.
JavaDStream<String> allTime = tatoalOrMmeReivers
        // A Tuple19 cannot be added with '+': the reducer must return a new Tuple19
        // whose fields are the pairwise sums of the two inputs, so no counter is lost.
        .reduceByKey((r1, r2) -> new Tuple19<>(
                r1._1() + r2._1(),
                r1._2() + r2._2(),
                r1._3() + r2._3(),
                r1._4() + r2._4(),
                r1._5() + r2._5(),
                r1._6() + r2._6(),
                r1._7() + r2._7(),
                r1._8() + r2._8(),
                r1._9() + r2._9(),
                r1._10() + r2._10(),
                r1._11() + r2._11(),
                r1._12() + r2._12(),
                r1._13() + r2._13(),
                r1._14() + r2._14(),
                r1._15() + r2._15(),
                r1._16() + r2._16(),
                r1._17() + r2._17(),
                r1._18() + r2._18(),
                r1._19() + r2._19()))
        .repartition(1)
        .mapPartitions(m -> {
            // One builder is reused for every line and cleared after each use.
            StringBuilder builder = new StringBuilder();
            LinkedList<String> strings = new LinkedList<>();
            while (m.hasNext()) {
                Tuple2<Long, Tuple19<Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long, Long>> next = m.next();
                // NOTE(review): both labels below print the whole summed tuple (next._2);
                // presumably each should print a single field such as next._2._1() — confirm.
                builder.append("省份编码:")
                        .append(cityName)
                        .append("\t")
                        .append("数据时间区间:")
                        .append(next._1)
                        .append("\t")
                        .append("数量:")
                        .append(next._2)
                        .append("\t")
                        .append("有效数量:")
                        .append(next._2);
                strings.add(builder.toString());
                builder.setLength(0);
            }
            return strings.iterator();
        });
报错位置
![file](http://hainiubl.com/uploads/images/202106/07/4428/fMP92fckAY.png)
求大神解惑