重名造成线程冲突?

问答 岁月流年 ⋅ 于 2021-06-09 15:25:15 ⋅ 最后回复由 青牛 2021-06-09 18:24:36 ⋅ 1782 阅读
  JavaPairDStream<Long, List<Long>> tatoalOrMmeReivers = transform.mapPartitionsToPair(m -> {
            List<Tuple2<Long, List<Long>>> outList = new LinkedList<>();
            Long minTime = 1609430400000L;
            Long maxTime = 2051193600000L;
            List<Long> values = new ArrayList<>();
            while (m.hasNext()) {
                String next = m.next();
                if (StringUtils.isNotEmpty(next) && next.split("\\|").length > 8) {
                    String procedureType = next.split("\\|")[0];
                    String plocalProvince = next.split("\\|")[1];
                    String localCity = next.split("\\|")[2];
                    String ownerProvince = next.split("\\|")[3];
                    String ownerCity = next.split("\\|")[4];
                    String IMSI = next.split("\\|")[5];
                    String MSISDN = next.split("\\|")[6];
                    String IMEI = next.split("\\|")[7];
                    Long procedureStartTime = Long.parseLong(next.split("\\|")[8]);
                    String startLocationLongitude = next.split("\\|")[9];
                    String startLocationLatitude = next.split("\\|")[10];
                    String locationSource = next.split("\\|")[11];
                    String TAC = next.split("\\|")[12];
                    String cellID = next.split("\\|")[13];
                    String fromCountry = next.split("\\|")[14];
                    String fromSource = next.split("\\|")[15];
                    procedureStartTime = procedureStartTime / 60000 * 60000;
//                    String terminalTac = next.split("\\|")[16];
                    if (procedureType.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (plocalProvince.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (localCity.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (ownerProvince.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (ownerCity.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (IMSI.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (MSISDN.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (IMEI.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (procedureStartTime < minTime || procedureStartTime > maxTime) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (startLocationLongitude.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (startLocationLatitude.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (locationSource.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (TAC.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (cellID.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (fromCountry.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (fromSource.length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    if (next.split("\\|").length < 17 && topic.contains("mme")) {
                        values.add(1L);
                    } else if (next.split("\\|")[16].length() == 0) {
                        values.add(1L);
                    } else {
                        values.add(0L);
                    }
                    values.add(1L);
                    outList.add(new Tuple2<Long, List<Long>>(procedureStartTime, values));
                    values.clear();
                }
            }
            return outList.iterator();
        });
        // 输入: procedureStartTime , List(procedureTypeNum,procedureTypeNum,localCityNum,ownerProvinceNum,ownerCityNum,IMSINum,MSISDNNum,IMEINum,procedureStartTimeNum,startLocationLongitudeNum,startLocationLatitudeNum,locationSourceNum,TACNum,cellIDNum,fromCountryNum,fromSourceNum,terminalTacNum,total)
        JavaDStream<String> allTime = tatoalOrMmeReivers
                .reduceByKey(new Function2<List<Long>, List<Long>, List<Long>>() {
                    @Override
                    public List<Long> call(List<Long> v1, List<Long> v2) throws Exception {
                        List<Long> longs = new ArrayList<>();
                        for (int i = 0; i < v1.size(); i++) {
                            long l = v1.get(i) + v2.get(i);
                            longs.add(l);
                        }
                        return longs;
                    }
                })
                .repartition(1)
                .mapPartitions(m -> {
                    StringBuilder builder = new StringBuilder();
                    LinkedList<String> strings = new LinkedList<>();
                    while (m.hasNext()) {
                        Tuple2<Long, List<Long>> next = m.next();
                        builder.append("省份编码:")
                                .append(topic)
                                .append("\t")
                                .append("数据时间:")
                                .append(next._1)
                                .append("\t");

                        builder.append(next._2.get(1)).append("\t")
                                .append(next._2.get(2)).append("\t")
                                .append(next._2.get(3)).append("\t")
                                .append(next._2.get(4)).append("\t")
                                .append(next._2.get(7)).append("\t")
                                .append(next._2.get(8)).append("\t")
                                .append(next._2.get(9)).append("\t")
                                .append(next._2.get(10)).append("\t")
                                .append(next._2.get(11)).append("\t")
                                .append(next._2.get(12)).append("\t")
                                .append(next._2.get(13)).append("\t")
                                .append(next._2.get(14)).append("\t")
                                .append(next._2.get(15)).append("\t")
                                .append(next._2.get(16)).append("\t");

                        //
//                        for (int i = 0; i < next._2.size() - 1; i++) {
//                            builder.append(next._2.get(i)).append("\t");
//                        }
                        builder.append("数量:").append(next._2.get(17)).append("\t")
                                .append("有效数量:").append(next._2.get(17));
                        strings.add(builder.toString());
                        builder.setLength(0);
                    }
                    return strings.iterator();
                });
        // 输入:cityName,procedureStartTime,procedureTypeNum,procedureTypeNum,localCityNum,ownerProvinceNum,ownerCityNum,IMSINum,MSISDNNum,IMEINum,procedureStartTimeNum,startLocationLongitudeNum,startLocationLatitudeNum,locationSourceNum,TACNum,cellIDNum,fromCountryNum,fromSourceNum,terminalTacNum,total(19),ptotal(20)
        allTime.foreachRDD((f, time) -> {
            JavaRDD<String> stringJavaRDD = f.mapPartitions(m -> {
                ArrayList<String> list = new ArrayList<>(1);
                StringBuilder builder = new StringBuilder();
                long tTime = time.milliseconds();
                tTime = tTime / 60000 * 60000;

                while (m.hasNext()) {
                    String[] split = m.next().split("\t");
                    String[] stime = split[1].split(":");
                    long sTime = Long.parseLong(stime[1]);
                    long leadTime = (tTime - Long.parseLong(stime[1])) / 60000;
                    builder.append(split[0])
                            .append("\t")
                            .append("批次时间:")
                            .append(time.milliseconds())
                            .append("\t")
                            .append("数据时间:")
                            .append(sTime)
                            .append("\t");
                    for (int i = 2; i < split.length - 2; i++) {
                        builder.append(split[i])
                                .append("\t");
                    }
                    if (stime.length == 13) {
                        builder.append("有效数据量:")
                                .append(split[20].split(":")[1])
                                .append("\t")
                                .append("时间差:")
                                .append(leadTime);
                    } else {
                        builder.append("有效数据量:")
                                .append(0)
                                .append("时间差:")
                                .append(-1);
                    }
                    builder.append(System.lineSeparator());
                }
                list.add(builder.toString());
                return list.iterator();
            });
            stringJavaRDD.foreach(ff -> stringListAccumulator.add(ff));
        });
        javaStreamingContext.start();

报错是
file

问题出在哪里?
求大神解答

成为第一个点赞的人吧 :bowtie:
回复数量: 4
  • 岁月流年
    2021-06-09 16:32:00

    问题已解决

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2021-06-09 17:00:21

    @666666 以后发markdown格式的

  • 岁月流年
    2021-06-09 17:01:18

    @青牛 好的

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2021-06-09 18:24:36

    @666666 看看我帮你改完,格式多工整

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter