Flink

分享 YY12542541 ⋅ 67 阅读

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

public static void main(String[] args) throws Exception {

    // 1. 创建流式执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 1. 创建流式执行环境,本地带UI界面启动方式,需要引入指定的maven,正式环境不建议使用
    // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

    // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号
    DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);

    // 3. 转换、分组、求和,得到统计结果
    SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
        String[] words = line.split(" ");

        for (String word : words) {
            out.collect(Tuple2.of(word, 1L));
        }
    }).returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(data -> data.f0)
            .sum(1);

    // 4. 打印
    sum.print();

    // 5. 执行
    env.execute();
}

}

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-YY12542541,http://hainiubl.com/topics/76691
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter