FlinkSql之TableAPI详解
核心概念
Flink 的 Table API 和 SQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.
动态表和连续查询
动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。
TableAPI
首先需要导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
/**
* 使用TableAPI的基本套路:
* 1.创建表的执行环境
* 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
* 3.对动态表进行查询
* 4.把动态表转换为流
*/
1.TableAPI 中将动态表转换为流时有两种方法
DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);
toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);
toRetractStream则可以使用
2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class
代码实现:
package net.cyan.FlinkSql;
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
* 使用TableAPI的基本套路:
* 1.创建表的执行环境
* 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
* 3.对动态表进行查询
* 4.把动态表转换为流
*/
public class Demo1 {
public static void main(String[] args) {
Configuration configuration=new Configuration();
configuration.setInteger("rest.port",3333);
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
//模拟数据
DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements(
new WaterSensor("S1", 1000L, 10),
new WaterSensor("S1", 1000L, 10),
new WaterSensor("S2", 2000L, 20),
new WaterSensor("S3", 3000L, 30),
new WaterSensor("S4", 4000L, 40),
new WaterSensor("S5", 5000L, 50)
);
//创建表的执行环境
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
//创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
Table table = tableEnvironment.fromDataStream(WaterSensorSource);
//对表进行查询
//1、过时的查询书写
Table result = table
.where("id='S1'")
.select("*");
//2、不过时的书写
Table result1 = table