flink key by 后的 process 不执行?

问答 张浩 ⋅ 于 2021-09-13 20:30:58 ⋅ 最后回复由 青牛 2021-09-14 16:54:35 ⋅ 2446 阅读

DataStreamSource scanJobStreamSource = KafkaUtil.getStreamSource(env, prop.getProperty("bootstrap.servers"), prop.getProperty("group.id"),"kuai2", new PoJoDeserializationSchema(ScanInfo.class,false));

scanJobStreamSource.map(new ScanInfoJobMapfunction()).name("ScanInfo2ScanWrapInfo")

.keyBy((KeySelector<ScanWrapInfo, String>) scanWrapInfo -> scanWrapInfo.getJobCode()) .process(new JobCodeKeyProcessFunction())

    .name("JobCodeKeyProcessFunction").print();

    keyBy之后一直显示运行,debug 也显示 the application is running,但是数据卡在keyBy这里。
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 5
  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2021-09-13 22:30:46

    看一下webui是不是并行度不够

  • 张浩
    2021-09-14 10:46:02

    @青牛 我是用的几个测试数据,应该一个并行度就可以跑的。然后我改变并行度后,还是卡在keyby那里。我打印出下keyBy之前的输出,1> ScanWrapInfo{billCode='210128', jobCode='LHAR_UKK_URT_TL-210128', scanCode='12345', scanSiteCode='HHH', scanSiteId=11, scanSite='BALIDAO', scanTypeName='03', scanSiteTypeId='30', weight=4.3, volume=24000.0, preOrNextStationId=12000, preOrNextStation='URT', scanDate=Mon Sep 13 16:12:26 CST 2021},这个是对象的表示形式吧,按照它的一个属性 jobCode来分组是不是可以的?

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2021-09-14 10:56:33

    改完并行度要加槽

  • 张浩
    2021-09-14 16:18:32

    @青牛 我的本地有8个核,我把并行度改为4,槽应该够用吧

  • 青牛 海汼部落创始人,80后程序员一枚,曾就职于金山,喜欢倒腾技术做产品
    2021-09-14 16:54:35

    你看一下webui呗,你的task都跑起来了吗

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter