Flink
Startup
1. Starting the JobManager on YARN
Letting the driver resolve multiple dependency JARs:
flink run -m yarn-cluster -yt /home/hadoop/spark_news_jars -yjm 1024 -ytm 1024 -yn 2 -ys 3 -yqu root.hainiu -ynm hainiuFlinkStreamingWordCount \
$(ll /home/hadoop/spark_news_jars/ | awk 'NR>1{print " -C file:///home/hadoop/spark_news_jars/"$9}' | tr '\n' ' ') \
/usr/local/flink/examples/streaming/SocketWindowWordCount.jar --hostname nn1.hadoop --port 6666
#-C adds entries to the classpath of both the driver and the TaskManager JVMs. Here it is only needed so the driver can resolve the JARs; the TaskManagers receive them through -yt, so it does not matter whether the -C paths exist on the TaskManager hosts.
#-C paths must be URIs, so local files start with file:///; wildcards such as "*" are not supported.
-yjm JobManager memory
-ytm TaskManager memory
-yn  number of TaskManagers
-ys  task slots per TaskManager
-yqu YARN resource queue name
-ynm YARN application name
WordCount code
//The output buffer flush timeout can be tuned:
//taskmanager.network.netty.sendReceiveBufferSize 4M
//env.setBufferTimeout(-1);
/**
 * @Description: Flink WordCount
 * @Version: 1.0
 * @Author: ALIENWARE
 */
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class SocketWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> socket = env.socketTextStream("localhost", 6666);
//Set the parallelism
env.setParallelism(4);
//1. Lambda style
/* SingleOutputStreamOperator<String> flatMap = socket.flatMap((String value, Collector<String> out) -> {
Arrays.stream(value.split(" ")).forEach(word -> {
out.collect(word);
});
}).returns(Types.STRING);
SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(f -> Tuple2.of(f, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy(0).sum(1);
sum.print();*/
//2. Function style
/* SingleOutputStreamOperator<String> flatMap = socket.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] s = value.split(" ");
for (String ss : s) {
out.collect(ss);
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);
sum.print();*/
//3. Combined FlatMapFunction style
/* SingleOutputStreamOperator<Tuple2<String,Integer>> flatMap = socket.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] s = value.split(" ");
for (String ss : s) {
out.collect(Tuple2.of(ss,1));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(f -> f.f0).sum(1);
sum.print();*/
//4. RichFunction style: rich functions add open() for initialization, close() for cleanup, and getRuntimeContext()
/* SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
private String name = null;
@Override
public void open(Configuration parameters) throws Exception {
name = "hainiu_";
}
@Override
public void close() throws Exception {
name = null;
}
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] s = value.split(" ");
for (String ss : s) {
System.out.println(getRuntimeContext().getIndexOfThisSubtask());
out.collect(Tuple2.of(name + ss, 1));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).sum(1);
sum.print();*/
//5. ProcessFunction style
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
private String name = null;
@Override
public void open(Configuration parameters) throws Exception {
name = "hainiu_";
}
@Override
public void close() throws Exception {
name = null;
}
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
// getRuntimeContext()
String[] s = value.split(" ");
for (String ss : s) {
System.out.println(getRuntimeContext().getIndexOfThisSubtask());
out.collect(Tuple2.of(name + ss, 1));
}
}
}).keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private Integer num = 0;
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
num += value.f1;
out.collect(Tuple2.of(value.f0, num));
}
});
//Print the execution plan
System.out.println(env.getExecutionPlan());
sum.print();
env.execute();
}
}
Checking a Kafka topic's partitions
`kafka-topics.sh --zookeeper nn1.hadoop:2181 --describe --topic topic_32`
I. DataSource
Built-in sources
Data input sources:
//1. Files
//Files with one uniform format:
env.readTextFile("file:///path")
//Files read with an explicit input format:
env.readFile(inputFormat, "file:///path"); // e.g. env.readFile(new FileInputFormat(""), "file:///path")
//2. Socket (see the sketch below)
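A minimal runnable sketch of the built-in sources; the path file:///tmp/input and the socket localhost:6666 are placeholders, and TextInputFormat is used only as one concrete example of an input format.
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class BuiltInSources {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1. File with a uniform format: every line becomes a String
        DataStreamSource<String> lines = env.readTextFile("file:///tmp/input");
        //1b. File read through an explicit InputFormat
        DataStreamSource<String> formatted =
                env.readFile(new TextInputFormat(new Path("file:///tmp/input")), "file:///tmp/input");
        //2. Socket source, as in the WordCount example (needs a listener, e.g. nc -lk 6666)
        DataStreamSource<String> socket = env.socketTextStream("localhost", 6666);
        lines.print();
        formatted.print();
        socket.print();
        env.execute();
    }
}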
Custom input sources
1. Implement SourceFunction (non-parallel)
package com.hainiuxy.flink;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * A custom Flink data source
*/
public class MyFileSource implements SourceFunction<String>, CheckpointedFunction {
/**
 * SourceFunction runs with a single subtask (parallelism 1).
 * For a parallel source, implement ParallelSourceFunction / RichParallelSourceFunction instead.
 * The source reads data and emits it downstream via ctx.collect(...).
 *
 * @param ctx
 * @throws Exception
 *
 * Example: poll a file on HDFS every 2000 ms; when its checksum (MD5) changes,
 * re-read the file and emit every line.
*/
Boolean flag = true;
int interval = 2000;
String md5 = "";
// public MyFileSouce(String path){
// System.out.println(Thread.currentThread().getName()+"xxxx");
// this.path = new Path(path);
// }
private ListState<String> listState = null;
@Override
public void run(SourceContext<String> ctx) throws Exception {
Path path = new Path(("/user/yeniu/data/country_data1"));
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
FileSystem fs = FileSystem.get(conf);
while (flag) {
if (!fs.exists(path)) {
Thread.sleep(interval);
continue;
}
System.out.println("md5====" + md5);
String originMD5 = fs.getFileChecksum(path).toString();
String current_md5 = originMD5.split(":")[1];
if (!current_md5.equals(md5)) {
FSDataInputStream in = fs.open(path);
BufferedReader bf = new BufferedReader(new InputStreamReader(in));
String line = null;
while ((line = bf.readLine()) != null)
ctx.collect(line);
bf.close();
in.close();
this.md5 = current_md5;
}
Thread.sleep(interval);
}
}
@Override
public void cancel() {
flag = false;
}
//Put the user data (the current md5) into state during a checkpoint
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.listState.clear();
this.listState.add(md5);
}
//Initialization (1. fresh start 2. restore from a checkpoint)
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> lds = new ListStateDescriptor<>("listStateDesc", BasicTypeInfo.STRING_TYPE_INFO);
this.listState = context.getOperatorStateStore().getListState(lds);
if (context.isRestored()) {
String stateMD5 = this.listState.get().iterator().next();
this.md5 = stateMD5;
}
}
}
/**
 * Driver class for the custom source
*/
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stringDataStreamSource = env.addSource(new MyFileSource());
stringDataStreamSource.print();
env.execute();
}
}
2. Implement ParallelSourceFunction or RichParallelSourceFunction (parallel)
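A minimal sketch of a parallel source, assuming we just want each subtask to emit a counter string once per second (the class name, message format and sleep interval are made up for illustration):
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class MyParallelSource extends RichParallelSourceFunction<String> {
    private volatile boolean running = true;
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        //every parallel subtask runs its own copy of this loop
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        long count = 0;
        while (running) {
            ctx.collect("subtask-" + subtask + " -> " + count++);
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //unlike SourceFunction, a parallelism greater than 1 is allowed here
        env.addSource(new MyParallelSource()).setParallelism(3).print();
        env.execute();
    }
}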
Kafka deserialization
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.util.Properties;
/**
 * How Flink deserializes data read from Kafka
 * <p>
 * Kafka deserialization interfaces:
 * 1. DeserializationSchema (deserialize(byte[] message)) - only sees the record value
 *    Implementation: SimpleStringSchema, which turns the bytes into a String
 *
 * 2. KafkaDeserializationSchema (deserialize(ConsumerRecord<K, V> record)) - also exposes the record key
 *    Implementation: JSONKeyValueDeserializationSchema, which produces a JSON ObjectNode
 *    Related: KeyedDeserializationSchema (legacy), which also provides both key and value
*/
public class FLinkKafkaDeser {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
// pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
// pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
pro.setProperty("flink.partition-discovery.interval-millis", "3000");
FlinkKafkaConsumer010<JSONObject> kafkaSc = new FlinkKafkaConsumer010<>("topic_32", new MyScheme4(true), pro);
//Settings made here override the corresponding consumer properties above
kafkaSc.setStartFromLatest();
//kafkaSc.setStartFromGroupOffsets();
env.addSource(kafkaSc).print();
env.execute();
}
2.1 Value-only deserialization
/**
 * Value-only deserialization
*/
public static class MyScheme1 implements DeserializationSchema<String> {
@Override
public String deserialize(byte[] message) throws IOException {
String word = new String(message);
return word;
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
//Type information of the deserialized records
@Override
public TypeInformation<String> getProducedType() {
// return TypeInformation.of(String.class);
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
2.2 Object deserialization
public static class Hainiu {
private String word;
public Hainiu(String word) {
this.word = word;
}
public Hainiu() {
}
@Override
public String toString() {
return "word:" + word;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
/**
 * Object deserialization
*/
public static class MyScheme2 implements DeserializationSchema<Hainiu> {
@Override
public Hainiu deserialize(byte[] message) throws IOException {
String msg = new String(message);
return new Hainiu(msg);
}
@Override
public boolean isEndOfStream(Hainiu nextElement) {
return false;
}
@Override
public TypeInformation<Hainiu> getProducedType() {
return TypeInformation.of(Hainiu.class);
}
}
2.3 Key deserialization
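These notes have no listing for 2.3, so here is a minimal sketch of deserializing the key as well, using KafkaDeserializationSchema and emitting a Tuple2 of key and value (the class name MyKeyValueScheme and the empty-string fallbacks are illustrative; it additionally needs imports for org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.api.common.typeinfo.Types):
/**
 * Key + value deserialization: KafkaDeserializationSchema receives the whole ConsumerRecord,
 * so the key bytes are available too (DeserializationSchema only ever sees the value).
 */
public static class MyKeyValueScheme implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }
    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        String key = record.key() == null ? "" : new String(record.key());
        String value = record.value() == null ? "" : new String(record.value());
        return Tuple2.of(key, value);
    }
    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return Types.TUPLE(Types.STRING, Types.STRING);
    }
}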
2.4 JSON deserialization
public class FLinkKafkaDeser {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
// pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
// pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
pro.setProperty("flink.partition-discovery.interval-millis", "3000");
//JSON deserialization
FlinkKafkaConsumer010<ObjectNode> kafkaSc = new FlinkKafkaConsumer010<>("topic_32", new JSONKeyValueDeserializationSchema(true), pro);
//Settings made here override the corresponding consumer properties above
kafkaSc.setStartFromLatest();
//kafkaSc.setStartFromGroupOffsets();
env.addSource(kafkaSc).print();
env.execute();
}
2.5 Custom JSON deserialization
/**
 * Custom JSON deserialization
*/
public static class MyScheme4 implements KafkaDeserializationSchema<JSONObject> {
private Boolean includeMetadata;
public MyScheme4(Boolean includeMetadata) {
this.includeMetadata = includeMetadata;
}
@Override
public boolean isEndOfStream(JSONObject nextElement) {
return false;
}
@Override
public JSONObject deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
byte[] values = record.value();
byte[] keys = record.key();
String topic = record.topic();
int partition = record.partition();
Long offset = record.offset();
JSONObject obj = new JSONObject();
if (keys != null) {
obj.put("key", new String(keys));
}
if (values != null) {
obj.put("value", new String(values));
}
if (includeMetadata) {
obj.put("partition", partition);
obj.put("offset", offset);
obj.put("topic", topic);
}
return obj;
}
@Override
public TypeInformation<JSONObject> getProducedType() {
return TypeInformation.of(JSONObject.class);
}
}
Kafka dynamic partition discovery
properties.setProperty("flink.partition-discovery.interval-millis", "30000");
II. Transformations
Transformation operators
connect and union (merging streams)
//connect can join two streams of different types, while union requires all streams to have the same type.
//connect is handled with a CoProcessFunction, union with a ProcessFunction:
//1. ds2.connect(ds1).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {...})
//2. ds.union(ds1).process(new ProcessFunction<String, String>() {...})
1.union
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Union of two streams in Flink
*/
public class FlinkUnion {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
// SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
// @Override
// public Tuple2<String, String> map(String value) throws Exception {
// String[] split = value.split("\t");
// return Tuple2.of(split[0], split[1]);
// }
// });
//----------------------------------------------------------------
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
DataStreamSource<String> ds1 = env.addSource(kafkaSource);
//-----------------------------------------------------------
ds.union(ds1).process(new ProcessFunction<String, String>() {
Map<String, String> map = new HashMap<>();
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
String[] split = value.split("\t");
if (split.length > 1) {
map.put(split[0], split[1]);
out.collect(value);
} else {
String s = map.get(value);
if (s == null) {
out.collect("unknow");
} else
out.collect(s);
}
}
}).print();
env.execute();
}
}
2.connect
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Connecting two streams in Flink
*/
public class FlinkConnect {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// env.setParallelism(1);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
});
//----------------------------------------------------------------
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
DataStreamSource<String> ds1 = env.addSource(kafkaSource);
//-----------------------------------------------------------
ds2.connect(ds1).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
Map<String, String> map = new HashMap<>();
//Processing logic for each of the two streams
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(value.f0, value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
String s = map.get(value);
out.collect(s == null ? "no match" : s);
}
}).print();
env.execute();
}
}
3. keyBy
//1. Key both streams first, then connect the two keyed streams so matching keys end up in the same subtask.
//Rarely used on its own because it can lead to data skew.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Flink connect with keyBy grouping
*/
public class FlinkConnectKeyBy {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// env.setParallelism(1);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
});
KeyedStream<Tuple2<String, String>, String> ds2keyBy = ds2.keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
//----------------------------------------------------------------
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
DataStreamSource<String> ds1 = env.addSource(kafkaSource);
//-----------------------------------------------------------
KeyedStream<String, String> ds1KeyBy = ds1.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
ds2keyBy.connect(ds1KeyBy).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
Map<String, String> map = new HashMap<>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(value.f0, value.f1);
out.collect(getRuntimeContext().getIndexOfThisSubtask() + ":" + value.toString());
}
@Override
public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
String s = map.get(value);
out.collect(s == null ? "no match" : s);
}
}).print();
env.execute();
}
}
3.1 keyBy on a POJO key
package com.hainiuxy.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
 * Keying by a custom POJO
 * 1. The POJO must override hashCode()
 * 2. Both streams must be keyed by the same POJO type
*/
public class FlinkConnectKeyByVO {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// env.setParallelism(1);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
});
KeyedStream<Tuple2<String, String>, HainiuVo> ds2Keyby = ds2.keyBy(new KeySelector<Tuple2<String, String>, HainiuVo>() {
@Override
public HainiuVo getKey(Tuple2<String, String> value) throws Exception {
return new HainiuVo(value.f0);
}
});
//----------------------------------------------------------------
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG,"g_32");
FlinkKafkaConsumer010<HainiuVo> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new MyDeserialization(), pro);
DataStreamSource<HainiuVo> ds1 = env.addSource(kafkaSource);
//-----------------------------------------------------------
//When a bean is used as the key it must provide its own hashCode() implementation
KeyedStream<HainiuVo, HainiuVo> ds1KeyBy = ds1.keyBy(new KeySelector<HainiuVo, HainiuVo>() {
@Override
public HainiuVo getKey(HainiuVo value) throws Exception {
return value;
}
});
ds2Keyby.connect(ds1KeyBy).process(new CoProcessFunction<Tuple2<String,String>, HainiuVo, String>() {
Map<String,String> map = new HashMap<>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(value.f0,value.f1);
out.collect(getRuntimeContext().getIndexOfThisSubtask()+":"+value.toString());
}
@Override
public void processElement2(HainiuVo value, Context ctx, Collector<String> out) throws Exception {
String s = map.get(value.getName());
out.collect(s==null?"no match":s);
}
}).print();
env.execute();
}
public static class MyDeserialization implements DeserializationSchema<HainiuVo> {
@Override
public HainiuVo deserialize(byte[] message) throws IOException {
return new HainiuVo(new String(message));
}
@Override
public boolean isEndOfStream(HainiuVo nextElement) {
return false;
}
@Override
public TypeInformation<HainiuVo> getProducedType() {
return TypeInformation.of(HainiuVo.class);
}
}
public static class HainiuVo{
private String name;
@Override
public int hashCode() {
return Objects.hash(getName());
}
public HainiuVo() {
}
public HainiuVo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
3.2 KeyedCoProcessFunction
//KeyedCoProcessFunction can access the current key (ctx.getCurrentKey()), while CoProcessFunction only works with the stream elements.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
* flink KeyedCoProcessFunction
 * ctx.getCurrentKey().name returns the name field of the current key
*/
public class FlinkConnectKeyByVOAndKeyedProcess {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// env.setParallelism(1);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
});
KeyedStream<Tuple2<String, String>, HainiuVo> ds2Keyby = ds2.keyBy(new KeySelector<Tuple2<String, String>, HainiuVo>() {
@Override
public HainiuVo getKey(Tuple2<String, String> value) throws Exception {
return new HainiuVo(value.f0);
}
});
//----------------------------------------------------------------
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG,"g_32");
FlinkKafkaConsumer010<HainiuVo> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new MyDeserialization(), pro);
DataStreamSource<HainiuVo> ds1 = env.addSource(kafkaSource);
//-----------------------------------------------------------
//When a bean is used as the key it must provide its own hashCode() implementation
KeyedStream<HainiuVo, HainiuVo> ds1KeyBy = ds1.keyBy(new KeySelector<HainiuVo, HainiuVo>() {
@Override
public HainiuVo getKey(HainiuVo value) throws Exception {
return value;
}
});
ds2Keyby.connect(ds1KeyBy).process(new KeyedCoProcessFunction<HainiuVo, Tuple2<String,String>, HainiuVo, String>() {
Map<String,String> map = new HashMap<>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(ctx.getCurrentKey().name,value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(HainiuVo value, Context ctx, Collector<String> out) throws Exception {
String s = map.get(value.name);
out.collect(s==null?"unknow":s);
}
}).print();
env.execute();
}
public static class MyDeserialization implements DeserializationSchema<HainiuVo> {
@Override
public HainiuVo deserialize(byte[] message) throws IOException {
return new HainiuVo(new String(message));
}
@Override
public boolean isEndOfStream(HainiuVo nextElement) {
return false;
}
@Override
public TypeInformation<HainiuVo> getProducedType() {
return TypeInformation.of(HainiuVo.class);
}
}
public static class HainiuVo{
private String name;
@Override
public int hashCode() {
return Objects.hash(getName());
}
public HainiuVo() {
}
public HainiuVo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
Redistributing streams: ways to mitigate data skew:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Mitigating data skew in Flink
 * 1. ds.rescale(): round-robins elements from a subset of upstream tasks to a subset of downstream tasks;
 *    the transfers stay within one TaskManager and avoid the network.
 * 2. ds.rebalance(): round-robin repartitioning across all downstream tasks.
*/
public class FlinkRedistribute {
public static void main(String[] args) {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
DataStreamSource<String> ds = env.addSource(new MyFileSource()).setParallelism(3); //note: MyFileSource implements the non-parallel SourceFunction, so parallelism > 1 is rejected at runtime; a (Rich)ParallelSourceFunction is needed for this setting
// DataStream<String> ds1 = ds.rebalance();
DataStream<String> ds1 = ds.rescale();
SingleOutputStreamOperator<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value);
}
}).setParallelism(5);
// keyBy: records with the same key are routed to the same subtask (hash partitioning)
// partitionCustom: a user-defined partitioner decides the routing (hash-style)
}
}
1. Custom partitioner
//Use ds.partitionCustom(...) to repartition each stream
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.*;
/**
 * Mitigating data skew in Flink
 * 1. Custom partitioner
 * A user-defined partitioner decides which subtask each record goes to
*/
public class FlinkUDPartitionerTest {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
});
//Apply partitionCustom to this stream
DataStream<Tuple2<String, String>> ds2Partitioned = ds2.partitionCustom(new MyPartitioner(), new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
}).keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
//----------------------------------------------------------------
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
DataStreamSource<String> ds1 = env.addSource(kafkaSource);
//-----------------------------------------------------------
//Apply partitionCustom to this stream
DataStream<String> ds1Partitioned = ds1.partitionCustom(new MyPartitioner(), new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
}).keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
//Join the two streams
ds2Partitioned.connect(ds1Partitioned).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
Map<String, String> map = new HashMap<>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(value.f0, value.f1);
System.out.println(getRuntimeContext().getIndexOfThisSubtask() + ":" + value.toString());
out.collect(value.toString());
}
@Override
public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
String s = map.get(value);
out.collect(s == null ? "no match" : s);
}
}).print();
env.execute();
}
//Custom partitioner
public static class MyPartitioner implements Partitioner<String> {
List<String> list = Arrays.asList(new String[]{"AF", "AE", "AG", "AI", "AO"});
@Override
public int partition(String key, int numPartitions) {
if (list.contains(key)) {
return 0;
} else {
return 1;
}
}
}
}
2. Fixing data skew in a join (key salting)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
/**
 * Dimension data from HDFS
 * Event data from Kafka
 * join
 * Fixing data skew after the join by salting: the HDFS side is duplicated with prefixes 0-23 and the Kafka side gets a random prefix in the same range
*/
public class FlinkRebalancePartitioner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
DataStreamSource<String> ds = env.addSource(new MyFileSource());
Properties pro = new Properties();
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
DataStreamSource<String> ds1 = env.addSource(kafkaSource);
SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
});
SingleOutputStreamOperator<Tuple2<String, String>> ds3 = ds2.flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
@Override
public void flatMap(Tuple2<String, String> value, Collector<Tuple2<String, String>> out) throws Exception {
for (int i = 0; i < 24; i++) {
out.collect(Tuple2.of(i + "_" + value.f0, value.f1));
}
}
});
DataStream<Tuple2<String, String>> ds4 = ds3.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
String s = key.split("_")[0];
return Integer.valueOf(s);
}
}, new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
SingleOutputStreamOperator<String> ds11 = ds1.map(new RichMapFunction<String, String>() {
Random random = null;
@Override
public void open(Configuration parameters) throws Exception {
random = new Random();
}
@Override
public String map(String value) throws Exception {
int index = random.nextInt(24);
return index + "_" + value;
}
});
DataStream<String> ds111 = ds11.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
String[] split = key.split("_");
return Integer.valueOf(split[0]);
}
}, new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
//Managed state types that could replace the plain HashMap below:
//ValueState, ListState, MapState, ReducingState, AggregatingState, BroadcastState
//(see the ValueState sketch after this class)
ds4.connect(ds111).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
int index = getRuntimeContext().getIndexOfThisSubtask();
System.out.println("data_assigner:" + index + "<--->" + value);
map.put(value.f0, value.f1);
}
@Override
public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
String s = map.get(value);
out.collect(s == null ? "no match" : s);
}
}).print();
env.execute();
}
}
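The commented note above lists Flink's managed state primitives (ValueState, ListState, MapState, BroadcastState, ...). A minimal sketch of keyed ValueState inside a KeyedProcessFunction, as a fault-tolerant alternative to the plain HashMap used in these examples (the class name and state name are illustrative); it would be applied as keyedStream.process(new CountWithValueState()) after a keyBy on the word:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Per-key counter backed by ValueState instead of a HashMap.
 * Keyed state is scoped to the current key automatically and is checkpointed by Flink.
 */
public class CountWithValueState extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private transient ValueState<Integer> countState;
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("count", BasicTypeInfo.INT_TYPE_INFO);
        countState = getRuntimeContext().getState(desc);
    }
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = countState.value(); //null for the first element of this key
        int updated = (current == null ? 0 : current) + value.f1;
        countState.update(updated);
        out.collect(Tuple2.of(ctx.getCurrentKey(), updated));
    }
}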
reduce and fold aggregations
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
 * Flink aggregation operations
 * reduce aggregation
 * fold aggregation (deprecated)
 * process with a plain instance field does not work for this: the field is shared per subtask (thread), not per key
*/
public class FlinkAggregateTest {
public static void main(String[] args) throws Exception {
List<String> list = new ArrayList<String>();
list.add("hainiu1");
list.add("hainiu1");
list.add("hainiu2");
list.add("hainiu2");
list.add("hainiu3");
list.add("hainiu3");
list.add("hainiu4");
list.add("hainiu4");
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(2);
DataStreamSource<String> ds = env.fromCollection(list);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
KeyedStream<Tuple2<String, Integer>, String> ds3 = ds2.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//For reduce, the two input types and the return type must be identical
ds3.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})/*.print("ReduceFunction")*/;
ds3.fold(0, new FoldFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer fold(Integer accumulator, Tuple2<String, Integer> value) throws Exception {
return accumulator + value.f1;
}
})/*.print("FoldFunction")*/;
/*ds3.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Integer>() {
int sum = 0;
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Integer> out) throws Exception {
String currentKey = ctx.getCurrentKey();
int index = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(currentKey + ":" + index);
sum += value.f1;
out.collect(sum);
}
}).print("KeyedProcessFunction");*/
env.execute();
}
}
min, minBy (aggregations)
//Prefer minBy/maxBy.
//min only updates the aggregated field and keeps the remaining fields of the first record seen for the key, whereas minBy emits the complete record that holds the minimum.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Aggregation operations
 * min, minBy, max, maxBy
 * Prefer minBy/maxBy
*/
public class FlinkAggregateMaxMinSum {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("localhost", 6666);
KeyedStream<Tuple2<String, Integer>, String> ds1 = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] split = value.split(" ");
return Tuple2.of(split[0], Integer.valueOf(split[1]));
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0.substring(0, 1);
}
});
//Keyed by the first letter of the word
// ds1.maxBy(1).print("maxBy");
ds1.min(1).print("min");
ds1.minBy(1).print("minBY");
env.execute();
}
}
OutputTag (side output streams)
//Only usable inside a process function
//Emits different kinds of data to different outputs based on a condition
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;
/**
 * Side outputs: splitting a stream
 * Emits different kinds of data based on a condition
*/
public class FlinkOutPutTagTest {
private static final OutputTag<String> tag = new OutputTag<>("invalid", BasicTypeInfo.STRING_TYPE_INFO);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("localhost", 6666);
//Splitting a stream requires the low-level process() method
SingleOutputStreamOperator<String> ds1 = ds.process(new ProcessFunction<String, String>() {
List<String> list = new ArrayList<String>();
@Override
public void open(Configuration parameters) throws Exception {
list.add("138");
list.add("136");
list.add("152");
list.add("139");
list.add("110");
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
int length = value.length();
boolean valid = list.contains(value.substring(0, 3));
if (length == 11 && valid) {
out.collect(value);
} else {
ctx.output(tag, value);
}
}
});
ds1.print("valid");
ds1.getSideOutput(tag).print("invalid");
env.execute();
}
}
Sinks
1. socket / text / csv
package sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.serialization.SerializationSchema;
/**
 * Three built-in sink output forms:
* socket
* text
* csv
*/
public class FlinkTextSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dss = env.socketTextStream("localhost", 6666);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds = dss.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//Write to CSV files
ds.writeAsCsv("csv");
//Write to text files
ds.writeAsText("text");
//Write to a socket
ds.writeToSocket("localhost",8888,new SerializationSchema<Tuple2<String,Integer>>(){
@Override
public byte[] serialize(Tuple2<String, Integer> element) {
String s = element.f0+":"+element.f1;
return s.getBytes();
}
});
ds.print();
//Print to stderr (not for production use)
ds.printToErr();
env.execute();
}
}
2. Custom data sinks
//Implement SinkFunction or extend RichSinkFunction
//(if the parallelism is not set explicitly, whether the sink runs in parallel follows its parent operator)
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Custom SinkFunction output in Flink
*/
public class FlinkSinkFunctionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dss = env.socketTextStream("localhost", 6666);
dss.addSink(new MyHdfsSink("hdfs://ns1/user/qingniu/flink_hdfs_sink"));
env.execute();
}
/**
 * Goal: write into a file on HDFS, creating it if it does not exist and appending if it does
*/
public static class MyHdfsSink extends RichSinkFunction<String> {
/**
 * Note: the constructor runs on the client (driver) side, so do not create non-serializable objects in it.
*/
private String hdfsPath;
private Path hPath;
private FileSystem fs = null;
private SimpleDateFormat df;
public MyHdfsSink(String hdfsPath) {
this.hdfsPath = hdfsPath;
}
//Open the HDFS connection
@Override
public void open(Configuration parameters) throws Exception {
df = new SimpleDateFormat("yyyyMMddHHmm");
hPath = new Path(this.hdfsPath);
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
//Access HDFS as the specified user
fs = FileSystem.get(new URI("hdfs://ns1"), conf, "qingniu");
}
@Override
public void close() throws Exception {
fs.close();
}
/**
 * A root directory is passed in; files created under it are named by
 * date and subtask index.
 * Invoked once per incoming record.
* @param value
* @param context
*/
@Override
public void invoke(String value, Context context) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
String dateStr = df.format(new Date());
String allPath = this.hdfsPath + "/" + dateStr + "_" + indexOfThisSubtask;
Path realPath = new Path(allPath);
FSDataOutputStream fsos = null;
if (fs.exists(realPath)) {
fsos = this.fs.append(realPath);
} else {
fsos = fs.create(realPath);
}
fsos.writeUTF(value);
fsos.flush();
fsos.close();
}
}
}
3. FlinkKafkaProducer (writing to Kafka)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
 * Writing Flink data to Kafka
 * Four variants
*/
public class FlinkKafkaSink01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextSt