FLINK
1. Table API
package com.practice.apitest.tableapi;
import com.practice.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @ClassName: TableTest1_Example
* @Description:
* @Version: 1.0
*/
public class TableTest1_Example {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. Read the input data
DataStreamSource<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
// 2. Convert to POJOs
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
});
// 3. Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4. Create a table from the stream
Table dataTable = tableEnv.fromDataStream(dataStream);
// 5. Transform the table with the Table API
Table resultTable = dataTable.select("id, temperature")
.where("id = 'sensor_1'");
// 6. Run the same query as SQL
tableEnv.createTemporaryView("sensor", dataTable);
String sql = "select id, temperature from sensor where id = 'sensor_1'";
Table resultSqlTable = tableEnv.sqlQuery(sql);
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
env.execute();
}
}
2. Blink planner
package com.practice.apitest.tableapi;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
* @ClassName: TableTest2_CommonApi
* @Description:
* @Version: 1.0
*/
public class TableTest2_CommonApi {
public static void main(String[] args) throws Exception{
// 1. Create the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.1 Streaming with the old planner
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);
// 1.2 Batch with the old planner
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);
// 1.3 Streaming with the Blink planner
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);
// 1.4 Batch with the Blink planner
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);
// 2. Create a table: connect to an external system and read data
// 2.1 Read from a file
String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
tableEnv.connect( new FileSystem().path(filePath))
.withFormat( new Csv())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
// inputTable.printSchema();
// tableEnv.toAppendStream(inputTable, Row.class).print();
// 3. Query and transform
// 3.1 Table API
// Simple projection and filter
Table resultTable = inputTable.select("id, temp")
.filter("id === 'sensor_6'");
// Aggregation
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 3.2 SQL
Table sqlResultTable = tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
// Print the results
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toRetractStream(aggTable, Row.class).print("agg");
tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");
env.execute();
}
}
3. Writing to an external file system
package com.practice.apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
/**
* @ClassName: TableTest3_FileOutput
* @Description:
* @Version: 1.0
*/
public class TableTest3_FileOutput {
public static void main(String[] args) throws Exception {
// 1. Create the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Create a table: connect to an external system and read data
// Read from a file
String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
// inputTable.printSchema();
// tableEnv.toAppendStream(inputTable, Row.class).print();
// 3. Query and transform
// 3.1 Table API
// Simple projection and filter
Table resultTable = inputTable.select("id, temp")
.filter("id === 'sensor_6'");
// Aggregation
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 3.2 SQL
Table sqlResultTable = tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
// 4. Write to a file
// Connect to an external file and register the output table
String outputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\out.txt";
tableEnv.connect(new FileSystem().path(outputPath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
// .field("cnt", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
// The file sink is append-only: writing an updating result (such as aggTable) is not supported
resultTable.insertInto("outputTable");
// aggTable.insertInto("outputTable");
env.execute();
}
}
4. Writing to Kafka
package com.practice.apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
/**
* @ClassName: TableTest4_KafkaPipeLine
* @Description:
* @Version: 1.0
*/
public class TableTest4_KafkaPipeLine {
public static void main(String[] args) throws Exception {
// 1. Create the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Connect to Kafka and read data
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
// 3. Query and transform
// Simple projection and filter
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = sensorTable.select("id, temp")
.filter("id === 'sensor_6'");
// Aggregation
Table aggTable = sensorTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 4. Connect to Kafka again and write to a different topic
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sinktest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
// .field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
// Updating results are not supported: the Kafka sink is append-only
resultTable.insertInto("outputTable");
env.execute();
}
}
5. Table API update modes
// Append mode
– The table is only modified by insert operations and only exchanges Insert messages with the external connector.
// Retract mode
– The table exchanges Add and Retract messages with the external connector.
– An insert is encoded as an Add message, a delete as a Retract message, and an update as a Retract message
for the previous row followed by an Add message for the new row.
// Upsert mode
// Upsert requires a unique key, and each update produces only a single message
– Updates and inserts are both encoded as Upsert messages; deletes are encoded as Delete messages.
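To make the retract encoding concrete, here is a minimal sketch (reusing tableEnv and the grouped aggTable from the examples above; Tuple2 comes from org.apache.flink.api.java.tuple). The Boolean flag of the retract stream is true for Add and false for Retract messages, so every update of an aggregate arrives as a Retract of the old row followed by an Add of the new row:
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(aggTable, Row.class);
retractStream.print("retract");
// Illustrative output for two readings of sensor_1:
// retract> (true,sensor_1,1,35.8)    <- first result inserted (Add)
// retract> (false,sensor_1,1,35.8)   <- old result retracted (Retract)
// retract> (true,sensor_1,2,36.2)    <- updated result added (Add)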
6. Writing to Kafka
tableEnv.connect(
new Kafka()
.version("0.11")
.topic("sinkTest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092") )
.withFormat( new Csv() )
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable");
resultTable.insertInto("kafkaOutputTable");
7. Writing to Elasticsearch
tableEnv.connect(
new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp") )
.inUpsertMode()
.withFormat(new Json())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable");
aggResultTable.insertInto("esOutputTable");
8. Writing to MySQL
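There is no table descriptor for MySQL in this Flink version; the sink table is instead registered with DDL through the legacy JDBC connector. A minimal sketch, assuming the flink-jdbc dependency is on the classpath and using placeholder database, table and credential values:
String sinkDDL =
"create table jdbcOutputTable (" +
" id varchar(20) not null, " +
" cnt bigint not null " +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
" 'connector.table' = 'sensor_count', " +
" 'connector.driver' = 'com.mysql.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = '123456' )";
// Register the JDBC sink table, then write the aggregated result into it;
// grouped results are written in upsert mode, so the MySQL table should have a unique key on id.
tableEnv.sqlUpdate(sinkDDL);
aggResultTable.insertInto("jdbcOutputTable");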
9. Converting between Table and DataStream
9.1 Converting a Table into a DataStream
// Append mode – for tables that are only modified by Insert operations
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
// Retract mode – usable for any table. Similar to the Retract update mode above, it has only Insert and Delete messages.
– The resulting records carry an extra Boolean flag (the first field of the returned tuple) that indicates whether
– the row is newly inserted data (Insert, true) or deleted data (Delete, false).
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv
.toRetractStream(aggResultTable , Row.class);
9.2 Converting a DataStream into a Table
// A DataStream can be converted directly into a Table, which makes it easy to apply Table API transformations
DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream);
// By default the resulting Table schema matches the DataStream's field definitions one to one; the fields can also be specified explicitly
DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");
9.3 Creating a temporary view (Temporary View)
// Create a temporary view from a DataStream
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView",
dataStream, "id, temperature, timestamp as ts");
// Create a temporary view from a Table
tableEnv.createTemporaryView("sensorView", sensorTable);
9.4 Explaining the execution plan
The Table API provides a mechanism to explain the logic of computing a table and to show the optimized query plan.
// The execution plan is obtained with the TableEnvironment.explain(table) method or the
// TableEnvironment.explain() method, which returns a string describing three plans:
➢ the unoptimized logical query plan
➢ the optimized logical query plan
➢ the physical execution plan
String explanation = tableEnv.explain(resultTable);
System.out.println(explanation);
10. Dynamic Tables
package com.practice.apitest.beans;
/**
* @ClassName: SensorReading
* @Description:
* @Version: 1.0
*/
// Data type for sensor temperature readings
public class SensorReading {
// Fields: sensor id, timestamp, temperature
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
package com.practice.apitest.tableapi;
import com.practice.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @ClassName: TableTest5_TimeAndWindow
* @Description:
* @Version: 1.0
*/
public class TableTest5_TimeAndWindow {
public static void main(String[] args) throws Exception {
// 1. Create the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Read the file into a DataStream
DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
// 3. Convert to POJOs
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
// 4. Convert the stream into a table and define the time attribute
// pt: an arbitrary field name, as long as it does not clash with a keyword
// .proctime: keyword that appends an automatically generated processing-time attribute
//Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, pt.proctime");
Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");
tableEnv.createTemporaryView("sensor", dataTable);
// 5. Window operations
// 5.1 Group window
// Table API
Table resultTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
.groupBy("id, tw")
.select("id, id.count, temp.avg, tw.end");
// SQL
Table resultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) " +
"from sensor group by id, tumble(rt, interval '10' second)");
// 5.2 Over window
// Table API
Table overResult = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
.select("id, rt, id.count over ow, temp.avg over ow");
// SQL
Table overSqlResult = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
" from sensor " +
" window ow as (partition by id order by rt rows between 2 preceding and current row)");
// dataTable.printSchema();
// tableEnv.toAppendStream(resultTable, Row.class).print("result");
// tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");
tableEnv.toAppendStream(overResult, Row.class).print("result");
tableEnv.toRetractStream(overSqlResult, Row.class).print("sql");
env.execute();
}
}
10.1 Defining processing time (Processing Time)
// Defining processing time in the table schema
// TODO: this variant is not supported yet and throws an error
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
.field("pt", DataTypes.TIMESTAMP(3))
.proctime()
)
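The supported ways to define processing time are to append it when converting a DataStream (the pt.proctime expression used in TableTest5_TimeAndWindow above) or to declare a computed column in DDL. A brief sketch of both, using the sensor fields from the earlier examples:
// 1) When converting a DataStream: append a processing-time attribute as an extra field
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, pt.proctime");

// 2) In DDL: declare a computed column with PROCTIME()
String ddl = "create table dataTable (" +
" id varchar(20) not null, " +
" ts bigint, " +
" temperature double, " +
" pt AS PROCTIME() " +
") with (" +
" 'connector.type' = 'filesystem', " +
" 'connector.path' = '/sensor.txt', " +
" 'format.type' = 'csv')";
tableEnv.sqlUpdate(ddl);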
11. Defining event time (Event Time)
Like processing time, event time can be defined in three ways:
➢ when converting a DataStream into a table
➢ when defining the Table schema
➢ in the DDL that creates the table
11.1 Specifying it when converting a DataStream into a table
When a DataStream is converted into a Table, .rowtime defines the event-time attribute
// Convert the DataStream into a Table and designate the time field
Table sensorTable = tableEnv.fromDataStream(dataStream,"id,timestamp.rowtime,temperature");
// Or append the time field as a new column
Table sensorTable =
tableEnv.fromDataStream(dataStream, " id, temperature, timestamp, rt.rowtime");
11.2 Specifying it when defining the Table schema
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.rowtime(
new Rowtime()
.timestampsFromField("timestamp") // extract the timestamp from this field
.watermarksPeriodicBounded(1000) // watermark lags behind by 1 second
)
.field("temperature", DataTypes.DOUBLE())
)
11.3 Defining it in the DDL that creates the table
// interval '1' second is how a one-second interval is written in SQL
String sinkDDL=
"create table dataTable (" + " id varchar(20) not null, "
+ " ts bigint, "
+ " temperature double, "
+ " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), "
+ " watermark for rt as rt - interval '1' second" + ") with ("
+ " 'connector.type' = 'filesystem', "
+ " 'connector.path' = '/sensor.txt', "
+ " 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
12. Windows (two kinds)
// Group Windows
– Aggregate rows into finite groups based on time or row-count intervals and evaluate an aggregate function once per group.
// Over Windows
– For every input row, compute an aggregate over a range of neighboring rows.
12.1 Group Windows
// 1. Group windows are defined with the window(w: GroupWindow) clause and must be given an alias with the as clause.
// 2. To group the table by window, the window alias must be referenced in the groupBy clause like a regular grouping field.
Table table = input
.window([w: GroupWindow] as "w") // define the window with alias w
.groupBy("w, a") // group by field a and window w
.select("a, b.sum"); // aggregate
// 3. The Table API provides a set of predefined window classes with specific semantics, which are translated into window operations of the underlying DataStream or DataSet.
12.1.1 Tumbling windows
// Tumbling windows are defined with the Tumble class
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").on("proctime").as("w"))
// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w"))
12.1.2 Sliding windows
// Sliding windows are defined with the Slide class
// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding Processing-time window
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
// Sliding Row-count Window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
12.1.3 Session windows
// Session windows are defined with the Session class
// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session Processing-time Window
.window(Session.withGap("10.minutes").on("proctime").as("w"))
12.1.4 Group Windows in SQL
Group windows are defined in the GROUP BY clause of a SQL query.
// 1. Defines a tumbling window; the first argument is the time field, the second is the window length
//"select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) from sensor group by id, tumble(rt, interval '10' second)"
➢ TUMBLE(time_attr, interval)
// 2. Defines a sliding window; the first argument is the time field, the second the slide step, the third the window length
➢ HOP(time_attr, interval, interval)
// 3. Defines a session window; the first argument is the time field, the second the session gap
➢ SESSION(time_attr, interval)
12.1.5 Window implementation
// toAppendStream: append stream, only supports insert-only results
// toRetractStream: retract stream, also supports updating results
The implementation is the TableTest5_TimeAndWindow listing shown in section 10 above.
12.2 Over Windows
12.2.1 Concept
Over window aggregation exists in standard SQL (the OVER clause) and can be defined in the SELECT clause of a query.
• An over window aggregation computes, for every input row, an aggregate over a range of neighboring rows.
• Over windows are defined with the window(w: OverWindow*) clause and referenced by alias in the select() method.
Table table = input
.window([w: OverWindow] as "w")
.select("a, b.sum over w, c.min over w");
• The Table API provides the Over class to configure the properties of an over window.
12.2.2 Unbounded Over Windows
Over windows can be defined on event time or processing time, with a range specified either as a time interval or as a row count.
• Unbounded over windows are specified with constants.
// Unbounded event-time over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))
// Unbounded processing-time over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))
// Unbounded event-time Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
// Unbounded processing-time Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))
12.2.3 Bounded Over Windows
Bounded over windows are specified with the size of the interval.
// Bounded event-time over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// Bounded processing-time over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// Bounded event-time Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// Bounded processing-time Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
12.2.4 Over Windows in SQL
When aggregating with OVER, all aggregates must be defined on the same window, that is, with the same partitioning, ordering and range.
• Currently only ranges that precede the current row are supported.
• ORDER BY must be specified on a single time attribute.
SELECT COUNT(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
13. Functions
User-defined functions (UDF)
User-defined functions (UDFs) are an important feature: they significantly extend the expressiveness of queries.
• In most cases, a user-defined function must be registered before it can be used in a query.
• Functions are registered in the TableEnvironment by calling registerFunction(). When a user-defined function is registered,
it is inserted into the TableEnvironment's function catalog so that the Table API or SQL parser can recognize and interpret it correctly.
Scalar functions (Scalar Functions)
A user-defined scalar function maps zero, one or more scalar values to a new scalar value.
• To define a scalar function, extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation methods.
• The behavior of a scalar function is determined by its evaluation methods, which must be declared public and named eval.
public static class HashCode extends ScalarFunction {
private int factor = 13;
public HashCode(int factor) {
this.factor = factor;
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
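A minimal usage sketch (the sensorTable variable and the "sensor" view are assumed from the earlier examples): the function is first registered in the table environment and can then be called from the Table API or from SQL:
// Register the scalar function under the name "hashCode"
tableEnv.registerFunction("hashCode", new HashCode(23));
// Table API call
Table resultTable = sensorTable.select("id, hashCode(id)");
// SQL call
Table resultSqlTable = tableEnv.sqlQuery("select id, hashCode(id) from sensor");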
Table functions (Table Functions)
A user-defined table function also takes zero, one or more scalar values as input; unlike a scalar function,
it can return any number of rows as output instead of a single value.
• To define a table function, extend the base class TableFunction in org.apache.flink.table.functions and implement one or more evaluation methods.
• The behavior of a table function is determined by its evaluation methods, which must be public and named eval.
public static class Split extends TableFunction<Tuple2<String, Integer>> {
private String separator = ",";
public Split(String separator) {
this.separator = separator;
}
public void eval(String str) {
for (String s : str.split(separator)) {
collect(new Tuple2<String, Integer>(s, s.length()));
}
}
}
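A usage sketch (again assuming the sensor examples above): in the Table API the table function is applied with joinLateral, in SQL with LATERAL TABLE:
// Register the table function under the name "split"
tableEnv.registerFunction("split", new Split("_"));
// Table API: lateral join with the function output, aliased as (word, length)
Table resultTable = sensorTable
.joinLateral("split(id) as (word, length)")
.select("id, word, length");
// SQL: lateral table syntax
Table resultSqlTable = tableEnv.sqlQuery(
"select id, word, length from sensor, lateral table(split(id)) as splitid(word, length)");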
Aggregate functions (Aggregate Functions)
A user-defined aggregate function (User-Defined Aggregate Function, UDAGG) aggregates the data of a table into a single scalar value.
• A user-defined aggregate function is implemented by extending the AggregateFunction abstract class.
AggregateFunction requires the following methods to be implemented:
– createAccumulator()
– accumulate()
– getValue()
• AggregateFunction works as follows:
– First it needs an accumulator (Accumulator), the data structure that holds the intermediate result of the aggregation;
an empty accumulator is created by calling createAccumulator().
– Then the function's accumulate() method is called for every input row to update the accumulator.
– After all rows have been processed, getValue() is called to compute and return the final result (see the sketch below).
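A minimal sketch of an average-temperature aggregate (the class AvgTemp and the field names are illustrative; Tuple2 comes from org.apache.flink.api.java.tuple and AggregateFunction from org.apache.flink.table.functions):
// Accumulator: f0 = sum of temperatures, f1 = number of rows
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0, 0);
}
@Override
public Double getValue(Tuple2<Double, Integer> accumulator) {
return accumulator.f1 == 0 ? null : accumulator.f0 / accumulator.f1;
}
// Called once per input row; must be public and named accumulate
public void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {
accumulator.f0 += temp;
accumulator.f1 += 1;
}
}
// Usage: register, then call it in a grouped select (or in SQL with group by)
tableEnv.registerFunction("avgTemp", new AvgTemp());
Table resultTable = sensorTable.groupBy("id").select("id, avgTemp(temp) as avgTemp");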
Table aggregate functions (Table Aggregate Functions)
• A user-defined table aggregate function (User-Defined Table Aggregate Function, UDTAGG) aggregates the data of a table into a result table with multiple rows and columns.
• A user-defined table aggregate function is implemented by extending the TableAggregateFunction abstract class.
TableAggregateFunction requires the following methods to be implemented:
– createAccumulator()
– accumulate()
– emitValue()
• TableAggregateFunction works as follows:
– First it likewise needs an accumulator (Accumulator), the data structure that holds the intermediate result of the aggregation;
an empty accumulator is created by calling createAccumulator().
– Then the function's accumulate() method is called for every input row to update the accumulator.
– After all rows have been processed, emitValue() is called to compute and return the final results (see the sketch below).
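A minimal sketch of a "top two temperatures" table aggregate (the classes Top2Accum and Top2Temp are illustrative; Collector comes from org.apache.flink.util): results are emitted through a Collector and the function is applied with flatAggregate:
// Accumulator keeps the two highest temperatures seen so far
public static class Top2Accum {
public Double highest = -Double.MAX_VALUE;       // effectively negative infinity
public Double secondHighest = -Double.MAX_VALUE;
}
// Output rows are Tuple2<temperature, rank>
public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accum> {
@Override
public Top2Accum createAccumulator() {
return new Top2Accum();
}
// Update the accumulator for every input row
public void accumulate(Top2Accum acc, Double temp) {
if (temp > acc.highest) {
acc.secondHighest = acc.highest;
acc.highest = temp;
} else if (temp > acc.secondHighest) {
acc.secondHighest = temp;
}
}
// Emit the result rows after all input has been accumulated
public void emitValue(Top2Accum acc, Collector<Tuple2<Double, Integer>> out) {
out.collect(new Tuple2<>(acc.highest, 1));
out.collect(new Tuple2<>(acc.secondHighest, 2));
}
}
// Usage: flatAggregate applies the table aggregate per group
tableEnv.registerFunction("top2", new Top2Temp());
Table resultTable = sensorTable
.groupBy("id")
.flatAggregate("top2(temp) as (temp, rank)")
.select("id, temp, rank");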