目录
10.1 Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数
10.2.3 聚合函数(Aggregate Functions)
10.2.4 表聚合函数(Table Aggregate Functions)
Table API 和 Flink SQL
1 Table API 和 Flink SQL 是什么
Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者 批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将 查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义 的,具有 IDE 支持如:自动完成和语法检测。
Flink 对批处理和流处理,提供了统一的上层 API
Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直 观的方式组合来自一些关系运算符的查询
Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite

写一个小示例来入门TableAPI和Flink SQL
首先要引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>代码示例:
package com.dongda.tableapi;
import com.dongda.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,将全局并行度设置为1
//从文件里面读取数据
DataStream<String> inputStream = env.readTextFile("/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt");
//转换成POJO
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//3.创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//4.基于数据流创建一张表
Table dataTable = tableEnv.fromDataStream(dataStream);
//5.调用table API进行转换操作
Table resultTable = dataTable.select("id,temperature")
.where("id='sensor_1'");
//6.执行SQL,我们必须专门注册一张表
tableEnv.createTemporaryView("sensor",dataTable);
String sql="select id,temperature from sensor where id='sensor_1'";
Table resultSqlTable = tableEnv.sqlQuery(sql);
//我们可以看到tableAPI和直接写SQL是等价的
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env.execute();
}
}
2 基本程序结构

3 创建 TableEnvironment
创建表的执行环境,需要将 flink 流处理的执行环境传入
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有对 表的操作都基于 TableEnvironment
– 注册 Catalog
– 在 Catalog 中注册表
– 执行 SQL 查询
– 注册用户自定义函数(UDF)
配置 TableEnvironment
package com.dongda.tableapi;
import com.dongda.beans.SensorReading;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,将全局并行度设置为1
//1.1 基于老版本planner的流处理
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);
//1.2 基于老版本planner的批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);
//1.3 基于Blink的流处理
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);
//1.4 基于Blink的批处理
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment blinkBatchtableEnv = TableEnvironment.create(blinkBatchSettings);
}
}
4 表(Table)
TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表
表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成: Catalog名、数据库(database)名和对象名
表可以是常规的,也可以是虚拟的(视图,View)
常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来
视图(View)可以从现有的表中创建,通常是 table API 或者 SQL 查询的 一个结果集
4.1 创建表
TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表

package com.dongda.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.表的创建:连接外部系统,读取数据
//2.1 读取文件
String filePath="/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt";
//读取文件 组册表
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
//3.查询转换
//3.1 Table API
//简单转换
Table resultTable = inputTable.select("id,temperature")
.filter("id ==='sensor_6'");
//聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id ,id.count as count, temperature.avg as avgTemp");
//3.2 SQL
tableEnv.sqlQuery("select id, temperature from inputTable where id = 'sensor_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");
//打印输出
tableEnv.toAppendStream(resultTable,Row.class).print("result");
tableEnv.toRetractStream(aggTable,Row.class).print("agg");
tableEnv.toRetractStream(sqlAggTable,Row.class).print("sqlagg");
env.execute();
}
}
4.2 输出到文件
package com.dongda.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.表的创建:连接外部系统,读取数据
//2.1 读取文件
String filePath="/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt";
//读取文件 组册表
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
//3.查询转换
//3.1 Table API
//简单转换
Table resultTable = inputTable.select("id,temperature")
.filter("id ==='sensor_6'");
//聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id ,id.count as count, temperature.avg as avgTemp");
//3.2 SQL
tableEnv.sqlQuery("select id, temperature from inputTable where id = 'sensor_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");
//4.输出到文件
//连接外部文件注册输出表
String outputPath="/Users/haitaoyou/developer/flink/src/main/resources/out.txt";
tableEnv.connect(new FileSystem().path(outputPath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
resultTable.insertInto("outputTable");
env.execute();
}
}
4.3 更新模式
• 对于流式查询,需要声明如何在表和外部连接器之间执行转换
• 与外部系统交换的消息类型,由更新模式(Update Mode)指定
➢ 追加(Append)模式
– 表只做插入操作,和外部连接器只交换插入(Insert)消息
➢ 撤回(Retract)模式
– 表和外部连接器交换添加(Add)和撤回(Retract)消息
– 插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update) 编码为上一条的 Retract 和下一条的 Add 消息
➢ 更新插入(Upsert)模式
– 更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息
4.4 输出到 Kafka
可以创建 Table 来描述 kafka 中的数据,作为输入或输出的 TableSink
package com.dongda.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
public class Example {
public static void main(String[] args) throws Exception {
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.连接kafka,读取数据
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect","localhost:2181")
.property("bootstrap.servers","localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temp",DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
//3.简单转换
Table sensorTable = tableEnv.from("inputTable");
//3.1 Table API
//简单转换
Table resultTable = sensorTable.select("id,temperature")
.filter("id ==='sensor_6'");
//聚合统计
Table aggTable = sensorTable.groupBy("id")
.select("id ,id.count as count, temperature.avg as avgTemp");
//4.kafka进kafka出,建立kafka连接,输出到不同的topic下
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sinkTest")
.property("zookeeper.connect","localhost:2181")
.property("bootstrap.servers","localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id",DataTypes.STRING())
// .field("timestamp",DataTypes.BIGINT())
.field("temp",DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
resultTable.insertInto("outputTable");
env.execute();
}
}
4.5 输出到 ES
可以创建 Table 来描述 ES 中的数据,作为输出的 TableSink

4.6 输出到 MySql
可以创建 Table 来描述 MySql 中的数据,作为输入和输出

5 将 Table 转换成 DataStream
表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就 可以继续在 Table API 或 SQL 查询的结果上运行了
将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将 表的每一行转换成的数据类型
表作为流式查询的结果,是动态更新的
转换有两种转换模式:追加(Append)模式和撤回(Retract)模式
5.1 将 Table 转换成 DataStream
➢ 追加模式(Append Mode)
– 用于表只会被插入(Insert)操作更改的场景
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
➢ 撤回模式(Retract Mode)
– 用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。
– 得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是 新增的数据(Insert),还是被删除的数据(Delete)
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv .toRetractStream(aggResultTable , Row.class);
5.2 将 DataStream 转换成表
• 对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API 做转换操作
DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream);
• 默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来
DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");
5.3 创建临时视图(Temporary View)
• 基于 DataStream 创建临时视图
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView",
dataStream, "id, temperature, timestamp as ts");
• 基于 Table 创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable);
6 查看执行计划
Table API 提供了一种机制来解释计算表的逻辑和优化查询计划
查看执行计划,可以通过 TableEnvironment.explain(table) 方法或 TableEnvironment.explain() 方法完成,返回一个字符串,描述三个计划
➢ 优化的逻辑查询计划
➢ 优化后的逻辑查询计划
➢ 实际执行计划
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);
7 流处理和关系代数的区别

7.1 动态表(Dynamic Tables)
• 动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念
• 与表示批处理数据的静态表不同,动态表是随时间变化的
➢ 持续查询(Continuous Query)
动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
连续查询永远不会终止,并会生成另一个动态表
查询会不断更新其动态结果表,以反映其动态输入表上的更改
7.2 动态表和持续查询

➢ 流式表查询的处理过程:
1. 流被转换为动态表
2. 对动态表计算连续查询,生成新的动态表
3. 生成的动态表被转换回流
7.3
7.3.1 将流转换成动态表
• 为了处理带有关系查询的流,必须先将其转换为表
• 从概念上讲,流的每个数据记录,都被解释为对结果表的插入 (Insert)修改操作
7.3.2 持续查询
• 持续查询会在动态表上做计算处理,并作为结果生成新的动态表

7.3.3 将动态表转换成 DataStream
与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改
将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码
➢ 仅追加(Append-only)流
– 仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流
➢ 撤回(Retract)流
– 撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息
➢ Upsert(更新插入)流
– Upsert 流也包含两种类型的消息:Upsert 消息和删除(Delete)消息。

8 时间特性(Time Attributes)
基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时 间语义和时间数据来源的信息
Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳
时间属性,可以是每个表schema的一部分。一旦定义了时间属性,它就可以 作为一个字段引用,并且可以在基于时间的操作中使用
时间属性的行为类似于常规时间戳,可以访问,并且进行计算
8.1 定义处理时间(Processing Time)
• 处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间 的最简单概念。它既不需要提取时间戳,也不需要生成 watermark
➢ 由 DataStream 转换成表时指定
• 在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段
• 这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只 能在schema定义的末尾定义它
Table sensorTable = tableEnv.fromDataStream(dataStream,"id, temperature, timestamp, pt.proctime");
➢ 定义 Table Schema 时指定

➢ 在创建表的 DDL 中定义

8.2 定义事件时间(Event Time)
事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样 即使在有乱序事件或者延迟事件时,也可以获得正确的结果。
为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中, 提取时间戳,并用来推进事件时间的进展
定义事件时间,同样有三种方法:
➢ 由 DataStream 转换成表时指定
➢ 定义 Table Schema 时指定
➢ 在创建表的 DDL 中定义
➢ 由 DataStream 转换成表时指定
• 在 DataStream 转换成 Table,使用 .rowtime 可以定义事件时间属性

➢ 定义 Table Schema 时指定

➢ 在创建表的 DDL 中定义

9 窗口
• 时间语义,要配合窗口操作才能发挥作用
• 在 Table API 和 SQL 中,主要有两种窗口
➢ Group Windows(分组窗口)
– 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
➢ Over Windows
– 针对每个输入行,计算相邻行范围内的聚合
9.1 Group Windows
Group Windows 是使用 window(w:GroupWindow)子句定义的,并且 必须由as子句指定一个别名。
为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用

Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换 为底层 DataStream 或 DataSet 的窗口操作
9.1.1 滚动窗口(Tumbling windows)
滚动窗口要用 Tumble 类来定义

9.1.2 滑动窗口(Sliding windows)
滑动窗口要用 Slide 类来定义

9.1.3 会话窗口(Session windows)
会话窗口要用 Session 类来定义

9.2 SQL 中的 Group Windows
• Group Windows 定义在 SQL 查询的 Group By 子句中
➢ TUMBLE(time_attr, interval)
• 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
➢ HOP(time_attr, interval, interval)
• 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是 窗口长度
➢ SESSION(time_attr, interval)
• 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔
9.3 Over Windows
Over window 聚合是标准 SQL 中已有的(over 子句),可以在查询的 SELECT 子句中定义
Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合
Over windows 使用 window(w:overwindows*)子句定义,并在 select ()方法中通过别名来引用

Table API 提供了 Over 类,来配置 Over 窗口的属性
9.3.1 无界 Over Windows
• 可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows
• 无界的 over window 是使用常量指定的

9.3.2 有界 Over Windows
有界的 over window 是用间隔的大小指定的

9.4 SQL 中的 Over Windows
• 用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是 相同的分区、排序和范围
• 目前仅支持在当前行范围之前的窗口
• ORDER BY 必须在单一的时间属性上指定

10 函数(Functions)
10.1 Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数
SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现


10.2 用户自定义函数(UDF)
• 用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力
• 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
• 函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当 用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中, 这样Table API 或 SQL 解析器就可以识别并正确地解释它
10.2.1 标量函数(Scalar Functions)
1)用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值 (一进一出)
2)为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function,并实现(一个或多个)求值(eval)方法
3)标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval

package com.dongda;
import com.dongda.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
public class UdfTest1_ScalarFunction {
public static void main(String[] args) throws Exception {
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,
//2.创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//从文件里面读取数据
DataStream<String> inputStream = env.readTextFile("/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt");
//lamda表达式写法
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//3.将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature as temp");
//4.自定义标量函数,实现求id的hash值
//4.1 table API
HashCode hashCode = new HashCode(23);
//需要在环境中注册UDF
tableEnv.registerFunction("hashCode",hashCode);
Table resultTable = sensorTable.select("id,ts,hashCode(id)");
//4.2 SQL
tableEnv.createTemporaryView("sensor",sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id,ts,hashCode(id) from sensor");
//打印输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env.execute();
}
public static class HashCode extends ScalarFunction{
private int factor = 13;
public HashCode(int factor) {
this.factor = factor;
}
public int eval(String str){
return str.hashCode()*factor;
}
}
}
10.2.2 表函数(Table Functions)
1)用户定义的表函数,也可以将0、1或多个标量值作为输入参数;与标量函数不同 的是,它可以返回任意数量的行作为输出,而不是单个值 (一进多出)
2)为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFunction 并实现(一个或多个)求值方法
3)表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval

10.2.3 聚合函数(Aggregate Functions)
• 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs) 可以把一个表中的数据,聚合成一个标量值 (多进一出)
• 用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的

• AggregationFunction要求必须实现的方法:
– createAccumulator()
– accumulate() – getValue()
• AggregateFunction 的工作原理如下:
– 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用 createAccumulator() 方法创建空累加器
– 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器
– 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果
package com.dongda;
import com.dongda.beans.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
public class UdfTest1_ScalarFunction {
public static void main(String[] args) throws Exception {
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//为了方便观察打印出来的结果,
//2.创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//从文件里面读取数据
DataStream<String> inputStream = env.readTextFile("/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt");
//lamda表达式写法
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//3.将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature as temp");
//4.自定义聚合函数,求当前传感器的平均温度值
//4.1 table API
AvgTemp avgTemp = new AvgTemp();
//需要在环境中注册UDF
tableEnv.registerFunction("avgTemp", avgTemp);
Table resultTable = sensorTable
.groupBy("id")
.aggregate("avgTemp(temp) as avgTemp")
.select("id,avgTemp");
//4.2 SQL
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id,avgTemp(temp) from sensor group by id");
//打印输出
tableEnv.toRetractStream(resultTable, Row.class).print("result");
tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");
env.execute();
}
//实现自定义的AggregateFunction
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {
@Override
public Double getValue(Tuple2<Double, Integer> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0, 0);
}
//必须实现一个accumulate方法,来数据之后更新状态
public void accumulate(Tuple2<Double, Integer> accumulator, Double temp) { //必须前面是状态,后面是传进来的值
accumulator.f0 += temp;
accumulator.f1 += 1; //当前对于状态的改变
}
}
}

10.2.4 表聚合函数(Table Aggregate Functions)
• 用户定义的表聚合函数(User-Defined Table Aggregate Functions, UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表
• 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的

表聚合函数(Table Aggregate Functions)
AggregationFunction 要求必须实现的方法:
– createAccumulator()
– accumulate()
– emitValue()
TableAggregateFunction 的工作原理如下:
– 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据 结构。通过调用 createAccumulator() 方法可以创建空累加器。
– 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
– 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。
