概念与通用的API
Table API和SQL集成在同一套API中。这套API的核心概念式Table,用作查询的输入和输出。本文介绍了Table API和SQL程序的通用结构、如何注册Table、如何查询Table以及如何输出Table。
两种计划器的主要区别
1.Blinlk将批处理作业看做是流处理的一个特例。严格来说,Table和DataSet之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。
2.Blink计划器不支持BatchTableSource,而是使用有界的StreamTableSource来代替。
3.旧计划器和Blink计划器中FilterableTableSource的实现是不兼容的。旧计划器会将PlannerExpression下推至FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
4.基于字符串的键值配置选项仅在 Blink 计划器中使用。
5.PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
6.Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
7.旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
Table API 和 SQL 程序的结构
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...
创建TableEnvironment:
TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
- 在内部的 catalog 中注册 Table
- 注册外部的 catalog
- 加载可插拔模块
- 执行 SQL 查询
- 注册自定义函数 (scalar、table 或 aggregation)
- 将 DataStream 或 DataSet 转换成 Table
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Table总是与特定的TableEnvironment绑定,不能在同一条查询语句中使用不同的TableEnvironment中的表。例如:join或者union的操作。
TableEnvironment可以通过静态的BatchTableEnvironment.create()或者StreamTableEnvironmetn.create()在StreamExecutionenvironment或者ExecutionEnvironment中创建,TableConfig是可选项。TableConfig可用于配置TableEvironment或定制的查询优化和转换过程。
如果两种计划器的 jar 包都在 classpath 中(默认行为),你应该明确地设置要在当前程序中使用的计划器。
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
注意: 如果/lib目录中只有一种计划器的 jar 包,则可以使用useAnyPlanner(python 使用 use any_u_planner)创建 EnvironmentSettings。
在Catlaog中创建表
TableEnvironment维护着一个标识符(identifier)创建表的catalog的映射。标识符有三个部分组成:catlog名称,数据库名称以及对象名称。如果catalog或者数据库没有指明,那么使用当前默认值。
Table可以是虚拟的(视图Views)也可以是常规的(Tables)。视图Views可以从已经存在的表中创建,一般是Table API或者是SQL查询结果。表Tables描述的是外部数据,比如:数据库表、文件或者消息队列的。
临时表(Temporary Table)和永久表(Permanent Table)
- 表可以是临时的,并与单个Flink会话(session)相关,也可以是永久的,并且多个Flink会话和集群(cluster)可见。
- 永久表需要catalog(Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到catalog的Flink会话可见及持续的存在,直到被明确的删除。
- 另一方面临时表保存在内存中,并且仅存在创建Flink会话期间持续存在。这些表对于其他会话是不可见的。它们不与任何catalog或者数据库绑定但可以在同一个命名空间(namespace)中创建。即使它们的数据库被删除,它们也不会被删除。
屏蔽(Shadowing)
可以使用与已存在的永久表相同的标识符去注册临时表。临时表会屏蔽永久表,并且临时表存在永久表就无法访问。所有使用该标识符的查询都会作用于这个临时表。
这可能对实验(experimentation)有用。它允许先对临时表进行完全相同的查询,例如只有一个子集的数据,或者数据是不确定的。一旦验证了查询的准确性,就可以对实际生产表进行查询。
创建表
虚拟表
在SQL的术语中,Table API的对象对应于视图(虚拟表)。他封装了一个逻辑查询计划。可以通过以下方法在catalog中创建
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
注意:
从传统数据库角度来看,Table对象和view视图非常像。也就是定义了Table的查询是没有被优化的,而且会被内嵌到另一个引用了这个注册了的Table的查询中。如果多个查询引用到了同一个注册了的Table,那么它会被内嵌到每个查询中并执行多次,也就是注册了的Table的结果不会被共享。(注:Blink计划器会优化成只执行一次)
Cannextor Tables
另外一种方式创建TABLE是通过connector声明。Connector描述了存储表数据的外部系统。存储系统包括Kafka或者常规的外部存储文件系统通过这种方式来声明。
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
扩展标识符
表是通过三元标识符注册,包括catalog名、数据库和表名。
用户可以指定一个catalog和数据库作为当前catalog和当前数据库。有了这些,那么刚刚提到的三元标识符的前两部分就可以被省略了。如果前两部分的标识符没有指定,那么会使用当前的catalog和当前数据库。用户可以通过Table API或SQL切换当前的catalog和当前的数据库。
标识符遵循SQL标准,因此使用时需要使用反引号进行转义
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
查询表
Table API是关于Scala和Java的集成语言式查询api。与SQL相反,Table api的查询不是有字符串指定,而是在宿主语言逐步构建。
Table api是基于table类的,该类表示一个表(流或批处理),并提供使用操作关系的方法。这些方法返回一个新的table对象,该对象表示对输入的table关系进行关系操作的结果。一些关系操作有多个方法调用组成,例如table.groupby().select(),其中groupby指定table的分组,而select()在分组上投影。
文档table api说明了所有流处理和批处理支持的api算子。
以下展示了一个简单的table api聚合查询:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName")
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
// emit or convert Table
// execute query
SQL
Flink SQL是基于实现SQL标准的Apache calcite的。SQL的查询有常规字符串指定。
文档SQL描述了对流处理和批处理表的支持。
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
混用table api和SQL
table api和SQL查询的混用非常简单因为他们都返回table对象:
- 可以在查询SQL返回的table对象上定义table api查询。
- 在TableEnvironmnet中注册表的结果表中可以在SQL查询的from语句中引用。通过这种方法就可以在table api查询的结果上定义SQL的查询。
输出表
Table可以通过写入Table sink输出。table sink是一个通用的接口,用于支持多种文件格式(如csv、parquet、avro)、存储系统(如jdbc、hbase、Cassandra、Elasticsearch)或消息队列系统(apache Kafka、rabbitmq)。
批处理table只能写入batchtable sink,而流处理table需要指定写入AppendStreamTableSink,RetractStreamTableSink或者UpsertStreamTableSink。
请参考文档table source & sinks以获取更多关于可用sink的信息以及如何自定义table sink。
方法table.executeInsert(string tablename)将table发送至已注册table sink。该方法通过名称在catalog中查找table sink并确认table schema和tablesink schema一致。
下面的示例演示如何输出table:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT());
tableEnv.connect(new FileSystem().path("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
翻译与执行查询
两种计划器翻译和执行查询的方式是不同的
不论输入数据源是流式还是批式的,table api和SQL查询都会被转换成datastream程序。查询在内部表示为逻辑拆查询计划,并被翻译成两段:
1.优化逻辑执行查询
2.翻译成DataStream程序
Table API或者SQL查询在下列情况下不会被翻译:
- 当TableEnvironment.executeSql()被调用时,该方法是用来执行一个SQL语句,一旦方法被调用SQL语句立即被翻译。
- 当Table.executeInsert()被调用时。该方法是用来将一个表的内容插入到目标表中,一旦方法被调用,table api程序立即被翻译。
- 当table.execute()被调用时。该方法是将一个表的内容收集到本地,一旦方法被调用,table api程序立即被翻译。
- 当StaementSet.execute()被调用时。Table(通过StatementSet.addInsert()输出给某个sink)和insert语句(通过调用statmentSet.addInsertSql())会被先缓存到StatementSet中,StatementSet.execute()方法被调用时,所有的sink都会被优化成一张有向无环图。
当Table被转换成DataStream时。转换完成后,他就成为一个普通的DataStream程序,并会在调用StreamExecutionEnvirenment.execute()时被执行。
注意:从1.11版本开始,sqlUpdate方法和insertInto方法被废弃,从这两个方法构建的Table程序必须通过StreamTableEnvironment.execute()方法执行,而不能通过StreamTableEnvironment.execute()方法执行。
与DataStream和DataSetAPI结合
在流处理方面两种计划器都可以与DataStream API结合。只有旧计划器可以与DataSet结合。Blink计划器不能同两种计划器中的任何一个结合。
- 注意:下文讨论的DataSet API只与旧计划有关。
Table API和SQL可以被很容易的集成并嵌入到DataStream和DataSet程序中。例如:可以查询外部表(例如RDBMS),进行一些预处理,例如过滤、投影,聚合或元数据join,然后使用DataStrem或者DataSet API(以及这些api之上构建的任何库,例如cep或Gelly)。相反,也可以将Table API或SQL查询应用于DataStream或DataSet程序的结果。
这种交互可以通过DataStream或DataSet与table的相互转化实现。本节我们会介绍这些转化是如何实现的。
Scala隐式转换
Scala Table API含有对DataSet、DataStream和table类的隐式转换。通过为Scala DataStreamAPI导入org.apache.flink.table.api.bridge.scala.包以及org.apache.flink.api.scala. 包,可以启用这些转换。
通过 DataSet 或 DataStream 创建视图
Scala Table API含有对DataSet、DataStream注册成视图。结果视图的schema取决于注册的DataStream或者DataSet的数据类型。
通过DataStream和DataSet注册的视图只能是临时视图。
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
将DataStream或DataSet转换成表
与在TableEnvironment注册成DataStream或者DataSet不同,DataStream和DataSet可以直接转换成table
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
将表转换成DataStream或DataSet
Table可以被转换成DataStream和DataSet。通过这种方式,定制的DataStream或者DataSet就可以在Table api或者SQL的查询结果上运行了。
将将table转换成DataStream或者DataSet你要指定转换的数据类型,即,table的每行数据要转换成的数据类型。通常最方便的选择是转换成Row。
-Row: 字段按位置映射,字段数量任意,支持null值,无类型安全(type-safe)检查。
- POJO:字段名称映射(POJO类必须按照Table中字段名称命名),字段数量任意,可以为null,无类型安全检查。
- Case Class:按字段位置映射,不支持null,有类型安全检查。
- Tuple:字段按位置映射,字段数量少于22(scala)或者25(java),不支持null值,无类型安全检查。
- Atomic Type:Table必须有一个值,不支持null值,有类型安全检查。
将表转换成DataStream
流式查询结果会动态更新,即,当新纪录到达查询的输入流时,查询结果会改变。因此,像这样动态的查询结果转换成DataStream需要对表的更新方式编码。
将Table转换成DataStream有两种模式:
1.Append Mode:仅当动态Table仅通过Insert更改进行修改时,才可以使用此模式,即,它仅是追加操作,并且之前输出的结果永远不会更新。
2.Retract Mode:任何情形都可以使用此模式。它使用boolean或者inset或者delete操作的数据进行标记。
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
Table table = ...
// 通过指定类将表转换为行的追加数据流
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 通过指定类将表转换为行的追加数据流 Tuple2<String, Integer>
// 通过类型信息
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 返回将表转换为行的数据流
// 返回的是一个DataStream<Tuple2<Boolean, X>>.
// 布尔字段标记更改标记的类型
// True是insert,False是delete
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
注意: 文档动态表给出了有关动态表及其属性的详细讨论。
注意:一旦Table被转换为DataStream,必须使用StreamExecutionEnvironment的execute方法执行该DataStream作业。
将表转换成DataSet如下
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
注意:一旦 Table 被转化为 DataSet,必须使用 ExecutionEnvironment 的 execute 方法执行该 DataSet 作业。
数据类型到Table Schema的映射:
Flink的DataStream和DataSet APIs支持多样的数据类型。例如:Tuple(Scala内置以及Flink Java tuple)、pojo类、scala的case class类型以及flink的row类型等允许嵌套多个可在表的表达式中访问字段的符合数据类型。其他类型被视为原子类型。
我们讨论Table API如何将这些数据类型转换为内部的row表示形式,并将提供DataStream转换成table的样例。
数据类型到table schema的映射有两种方式:基于字段位置或基于字段名称。
基于位置的映射
基于位置的映射可以保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于具有特定的字段顺序的复合数据类型及原子类型。如:tuple、row以及case class这些复合的数据类型都有这样的字段顺序。然而,pojo类型的字段顺序必须通过名称映射。可以将字段投影出来,但不能使用as重命名。
定义基于位置的映射时,输入类型中一定不能存在指定的名称,否则API会假定应该基于字段名称映射。如果未指定任何字段的名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用f0表示原子类型。
// 获取流式运行环境,同时也能运行批处理环境
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section;
DataStream<Tuple2<Long, Integer>> stream = ...
// 将数据流转换为具有默认字段名f或f1的表
Table table = tableEnv.fromDataStream(stream);
// 将数据流转换为仅包含字段“myLong”的表
Table table = tableEnv.fromDataStream(stream, $("myLong"));
// 将数据流转换为字段名为“myLong”和“myInt”的表
Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));
基于名称的映射
基于名称的映射适用于任何数据类型或者pojo类型。这是定义table schema映射灵活的方式。映射中所有的字段均按名称引用,并且可以通过as重命名。字段可以被重新排序和映射。
若如果没有任何指定字段名称,则使用默认的字段名称和符合数据类型的字段顺序,或者使用f0表示原子类型。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 将数据流转换为具有交换字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// 将数据流转换为具有交换字段和字段名“myInt”和“myLong”的表
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
原子类型
Flink将基础的数据类型(Integer、Double、String)或者通用数据类型(不可拆分的数据类型)视为原子类型。原子类型的DataStream或者DataSet会被转换成只有一条属性的table。属性的数据类型可以由原子类型推断出,还可以重新命名属性。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Long> stream = ...
// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, $("myLong"));
Tuple类型(Scala 和 Java)和 Case Class类型(仅 Scala)
Flink支持scala内置的tuple类型和并给Java提供自己的tuple类型。两种tuple的DataStream和DataSet都被转成表。可以通过提供所有字段名称重命名字段(基于位置映射)。如果没有任何字段名称,则会使用默认的字段名称。如果引用了字段名称(对于Flink tuple为f0、f1…,对于Scala的tuple为_1,_2…),则api会假定映射是基于名称而不是基于位置的。基于名称的映射可以通过as字段和投影进行重新排序。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// 将数据流转换为具有默认字段名“f0”、“f1”的表
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
// 将数据流转换为具有重命名字段名“myLong”、“myString”(基于位置)的表
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// 将数据流转换为带有映射字段“f1”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 将数据流转换为具有重新排序和别名字段“myString”、“myLong”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as("myLong"));
POJO 类型 (Java 和 Scala)
Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录在here.
在不指定字段名称的情况下将DataStream或者DataSet转换成Table时,将使用原始pojo类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有as的关键字)来重命名,重新排序和投影。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Person是带有“name”和“age”字段的POJO
DataStream<Person> stream = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);
// 将数据流转换为默认字段名为“age”、“name”的表(字段按名称排序!)
Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));
// 将数据流转换为具有投影字段“name”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("name"));
// 将数据流转换为带有投影和重命名字段“myName”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));
Row类型
Row类型支持任意数量的字段及具有null值的字段。字段名称可以通过RowTypeInfo指定,也可以将在Row的DataStream或者DataSet转换为Table时指定。Row类型的字段映射支持基于名称和基于位置两种方式,字段可以通过提供所有字段的名称的方式重新命名(基于位置得映射)或者分别选择进行投影/排序/重命名(基于名称的映射)。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// 在`RowTypeInfo中指定了两个字段“name”和“age”的行的数据流`
DataStream<Row> stream = ...
// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);
// 将数据流转换为具有重命名字段名“myName”、“myAge”(基于位置)的表
Table table = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));
// 将数据流转换为具有重命名字段“myName”、“myAge”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("name").as("myName"), $("age").as("myAge"));
// 将数据流转换为具有投影字段“name”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("name"));
// 将数据流转换为带有投影和重命名字段“myName”(基于名称)的表
Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));
查询优化
Apache Flink使用并扩展了Apache Calcite来进行复杂的查询优化,这包括一系列基于规则和成本的优化,例如:
- 基于Apache Calcite的子查询解相关
- 投影剪裁
- 分区剪裁
- 过滤器下推
- 子查询消除重复数据以避免重复计算
- 特殊子查询重写,包括两部分:
1.将in和exists转换为left semi-joins
2.将not in和not exists转换为left anti-join- 可选join重新排序
通过table.optimizer.join-reorder-enable启用
注意:当前仅在子查询重写集合的条件下支持in/exists/not in/not exists
优化器不仅基于计划,而且还基于可从数据源获得丰富得统计信息以及每个算子(例如io、cpu、网络和内存)的细粒度成本来做出明智的决策。
高级用户可以通过CalciteConfig对象提供自定义的优化,可以通过调用TableEnvironment#getConfig#setPlannerConfig将其提供TableEnvironment。
解释表
Table API提供了一种机制来解释计算Table的逻辑和优化查询计划。这是通过Table.explain()方法或者StatmentSet.explain()方法来完成的。Table.explain()返回多个sink计划的结果。它返回一个描述三种计划的字符串:
1.关系查询的抽象语法树(the abstract syntax tree),即未优化的查询计划。
2.优化的逻辑查询计划,以及
3.物理执行计划
可以用TableEnvironment.explain()方法和TableEnvironment.executeSql()方法支持执行一个explain获取逻辑和优化的查询计划,EXPLAIN
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1
.where($("word").like("F%"))
.unionAll(table2);
System.out.println(table.explain());
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
以下代码展示了一个示例以及使用 StatementSet.explain() 的多 sink 计划的相应输出:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
final Schema schema = new Schema()
.field("count", DataTypes.INT())
.field("word", DataTypes.STRING());
tEnv.connect(new FileSystem().path("/source/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource1");
tEnv.connect(new FileSystem().path("/source/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource2");
tEnv.connect(new FileSystem().path("/sink/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink1");
tEnv.connect(new FileSystem().path("/sink/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink2");
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.addInsert("MySink1", table1);
Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.addInsert("MySink2", table2);
String explanation = stmtSet.explain();
System.out.println(explanation);
多 sink 计划的结果是:
== Abstract Syntax Tree ==
LogicalLegacySink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalLegacySink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])
LegacySink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 3 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
ship_strategy : FORWARD
Stage 5 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Stage 9 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 10 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 12 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
Stage 14 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD