JavaPairDStream<Long, List<Long>> tatoalOrMmeReivers = transform.mapPartitionsToPair(m -> {
List<Tuple2<Long, List<Long>>> outList = new LinkedList<>();
Long minTime = 1609430400000L;
Long maxTime = 2051193600000L;
List<Long> values = new ArrayList<>();
while (m.hasNext()) {
String next = m.next();
if (StringUtils.isNotEmpty(next) && next.split("\\|").length > 8) {
String procedureType = next.split("\\|")[0];
String plocalProvince = next.split("\\|")[1];
String localCity = next.split("\\|")[2];
String ownerProvince = next.split("\\|")[3];
String ownerCity = next.split("\\|")[4];
String IMSI = next.split("\\|")[5];
String MSISDN = next.split("\\|")[6];
String IMEI = next.split("\\|")[7];
Long procedureStartTime = Long.parseLong(next.split("\\|")[8]);
String startLocationLongitude = next.split("\\|")[9];
String startLocationLatitude = next.split("\\|")[10];
String locationSource = next.split("\\|")[11];
String TAC = next.split("\\|")[12];
String cellID = next.split("\\|")[13];
String fromCountry = next.split("\\|")[14];
String fromSource = next.split("\\|")[15];
procedureStartTime = procedureStartTime / 60000 * 60000;
// String terminalTac = next.split("\\|")[16];
if (procedureType.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (plocalProvince.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (localCity.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (ownerProvince.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (ownerCity.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (IMSI.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (MSISDN.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (IMEI.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (procedureStartTime < minTime || procedureStartTime > maxTime) {
values.add(1L);
} else {
values.add(0L);
}
if (startLocationLongitude.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (startLocationLatitude.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (locationSource.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (TAC.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (cellID.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (fromCountry.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (fromSource.length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
if (next.split("\\|").length < 17 && topic.contains("mme")) {
values.add(1L);
} else if (next.split("\\|")[16].length() == 0) {
values.add(1L);
} else {
values.add(0L);
}
values.add(1L);
outList.add(new Tuple2<Long, List<Long>>(procedureStartTime, values));
values.clear();
}
}
return outList.iterator();
});
// 输入: procedureStartTime , List(procedureTypeNum,procedureTypeNum,localCityNum,ownerProvinceNum,ownerCityNum,IMSINum,MSISDNNum,IMEINum,procedureStartTimeNum,startLocationLongitudeNum,startLocationLatitudeNum,locationSourceNum,TACNum,cellIDNum,fromCountryNum,fromSourceNum,terminalTacNum,total)
JavaDStream<String> allTime = tatoalOrMmeReivers
.reduceByKey(new Function2<List<Long>, List<Long>, List<Long>>() {
@Override
public List<Long> call(List<Long> v1, List<Long> v2) throws Exception {
List<Long> longs = new ArrayList<>();
for (int i = 0; i < v1.size(); i++) {
long l = v1.get(i) + v2.get(i);
longs.add(l);
}
return longs;
}
})
.repartition(1)
.mapPartitions(m -> {
StringBuilder builder = new StringBuilder();
LinkedList<String> strings = new LinkedList<>();
while (m.hasNext()) {
Tuple2<Long, List<Long>> next = m.next();
builder.append("省份编码:")
.append(topic)
.append("\t")
.append("数据时间:")
.append(next._1)
.append("\t");
builder.append(next._2.get(1)).append("\t")
.append(next._2.get(2)).append("\t")
.append(next._2.get(3)).append("\t")
.append(next._2.get(4)).append("\t")
.append(next._2.get(7)).append("\t")
.append(next._2.get(8)).append("\t")
.append(next._2.get(9)).append("\t")
.append(next._2.get(10)).append("\t")
.append(next._2.get(11)).append("\t")
.append(next._2.get(12)).append("\t")
.append(next._2.get(13)).append("\t")
.append(next._2.get(14)).append("\t")
.append(next._2.get(15)).append("\t")
.append(next._2.get(16)).append("\t");
//
// for (int i = 0; i < next._2.size() - 1; i++) {
// builder.append(next._2.get(i)).append("\t");
// }
builder.append("数量:").append(next._2.get(17)).append("\t")
.append("有效数量:").append(next._2.get(17));
strings.add(builder.toString());
builder.setLength(0);
}
return strings.iterator();
});
// 输入:cityName,procedureStartTime,procedureTypeNum,procedureTypeNum,localCityNum,ownerProvinceNum,ownerCityNum,IMSINum,MSISDNNum,IMEINum,procedureStartTimeNum,startLocationLongitudeNum,startLocationLatitudeNum,locationSourceNum,TACNum,cellIDNum,fromCountryNum,fromSourceNum,terminalTacNum,total(19),ptotal(20)
allTime.foreachRDD((f, time) -> {
JavaRDD<String> stringJavaRDD = f.mapPartitions(m -> {
ArrayList<String> list = new ArrayList<>(1);
StringBuilder builder = new StringBuilder();
long tTime = time.milliseconds();
tTime = tTime / 60000 * 60000;
while (m.hasNext()) {
String[] split = m.next().split("\t");
String[] stime = split[1].split(":");
long sTime = Long.parseLong(stime[1]);
long leadTime = (tTime - Long.parseLong(stime[1])) / 60000;
builder.append(split[0])
.append("\t")
.append("批次时间:")
.append(time.milliseconds())
.append("\t")
.append("数据时间:")
.append(sTime)
.append("\t");
for (int i = 2; i < split.length - 2; i++) {
builder.append(split[i])
.append("\t");
}
if (stime.length == 13) {
builder.append("有效数据量:")
.append(split[20].split(":")[1])
.append("\t")
.append("时间差:")
.append(leadTime);
} else {
builder.append("有效数据量:")
.append(0)
.append("时间差:")
.append(-1);
}
builder.append(System.lineSeparator());
}
list.add(builder.toString());
return list.iterator();
});
stringJavaRDD.foreach(ff -> stringListAccumulator.add(ff));
});
javaStreamingContext.start();
报错是
问题出在哪里?
求大神解答