Flink个人学习整理-SQL篇(十四)
一、查询
1.1、未注册表 demo
public class Flink_SQL_NoTable {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Parse "id,ts,vc" CSV lines from the socket into WaterSensor POJOs.
        // BUG FIX: the original declared the stream as WaterSensor but the mapper
        // returned Sensor — the types are now consistent (WaterSensor throughout).
        SingleOutputStreamOperator<WaterSensor> localhost = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new WaterSensor(
                                fields[0],
                                Long.parseLong(fields[1]),
                                Integer.parseInt(fields[2])
                        );
                    }
                });

        // For tuple streams, field names can be declared from the second argument
        // onward: fromDataStream(stream, $("id"), $("ts"), $("vc"), ...)
        Table table = tableEnvironment.fromDataStream(localhost);

        // Query the unregistered (inline) table by concatenating it into the SQL text
        Table result = tableEnvironment.sqlQuery("select id,ts,vc from " + table + " where id = 'ws_001' ");

        // A simple filter query is append-only, so an append stream is sufficient
        tableEnvironment.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
1.2、注册表 demo
public class Flink_SQLAgg_Table {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Parse "id,ts,vc" CSV lines from the socket into Sensor POJOs.
        // BUG FIX: the original declared MapFunction<String, WaterSensor> while the
        // stream and the map() override used Sensor — Sensor is now used throughout.
        SingleOutputStreamOperator<Sensor> localhost = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Sensor>() {
                    @Override
                    public Sensor map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Sensor(
                                fields[0],
                                Long.parseLong(fields[1]),
                                Integer.parseInt(fields[2])
                        );
                    }
                });

        // Register the stream as a named temporary view so SQL can reference it by name
        tableEnvironment.createTemporaryView("sensor", localhost);

        // Grouped aggregation over the registered view
        Table result = tableEnvironment.sqlQuery("select id,count(distinct ts),sum(vc) from sensor group by id ");

        // Aggregation results update over time, so a retract stream is required
        // (an append stream would reject the updating result)
        tableEnvironment.toRetractStream(result, Row.class).print();
        env.execute();
    }
}
1.3、直接查询输出
Kafka To Kafka
public class Flink_SQL_KafkaToKafka {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // DDL: register the Kafka source table (legacy "connector.*" property style).
        // IMPROVEMENT: the TableResult return values were stored in unused locals
        // (tableSource/tableSink); the dead assignments are removed.
        tableEnvironment.executeSql("create table sourcesql(id string,ts bigint,vc int) with (" +
                " 'connector.type' = 'kafka', " +
                " 'connector.version' = 'universal', " +
                " 'connector.topic' = 'sourcesql', " +
                " 'connector.properties.bootstrap.servers' = 'aaaaaa:9092,bbbbbb:9092,cccccc:9092', " +
                " 'connector.properties.group.id' = 'testGroup', " +
                // startup-mode: earliest-offset = read from earliest, latest-offset = from latest
                " 'connector.startup-mode' = 'latest-offset', " +
                " 'format.type' = 'csv' )");

        // DDL: register the Kafka sink table (JSON-encoded output)
        tableEnvironment.executeSql("create table sinksql(id string,ts bigint,vc int) with (" +
                " 'connector.type' = 'kafka', " +
                " 'connector.version' = 'universal', " +
                " 'connector.topic' = 'sinksql', " +
                " 'connector.properties.bootstrap.servers' = 'aaaaaa:9092,bbbbbb:9092,cccccc:9092', " +
                " 'format.type' = 'json' )");

        // Run the INSERT query. executeSql submits the job itself, so a pure-SQL
        // pipeline does not need env.execute().
        tableEnvironment.executeSql("insert into sinksql select * from sourcesql where id = 'ws_001' ");
    }
}
Kafka To MySQL
public class Flink_SQL_KafkaToMySQL {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // DDL: register the Kafka source table (legacy "connector.*" property style).
        // IMPROVEMENT: the unused TableResult locals and the commented-out Table API
        // alternative (tableEnvironment.from(...).executeInsert(...)) are removed.
        // NOTE(review): source uses the legacy property style while the JDBC sink
        // below uses the new option style — works, but mixing styles is worth
        // unifying; verify against the Flink version in use.
        tableEnvironment.executeSql("create table sourcesql(id string,ts bigint,vc int) with (" +
                " 'connector.type' = 'kafka', " +
                " 'connector.version' = 'universal', " +
                " 'connector.topic' = 'sourcesql', " +
                " 'connector.properties.bootstrap.servers' = 'aaaaaa:9092,bbbbbb:9092,cccccc:9092', " +
                " 'connector.properties.group.id' = 'testGroup', " +
                // startup-mode: earliest-offset = read from earliest, latest-offset = from latest
                " 'connector.startup-mode' = 'latest-offset', " +
                " 'format.type' = 'csv' )");

        // DDL: register the JDBC (MySQL) sink. The target table is NOT created
        // automatically — create it in MySQL beforehand, and make sure the Flink
        // cluster can reach the database over the network.
        tableEnvironment.executeSql("create table sinksql(id string,ts bigint,vc int) with (" +
                " 'connector' = 'jdbc', " +
                " 'url' = 'jdbc:mysql://aaaaaa:3306/test', " +
                " 'table-name' = 'sinksql', " +
                " 'username' = '______', " +
                " 'password' = '______' ) " );

        // Run the INSERT query. executeSql submits the job itself, so a pure-SQL
        // pipeline does not need env.execute().
        tableEnvironment.executeSql("insert into sinksql select * from sourcesql where id = 'ws_001' ");
    }
}
ProcessTime
public class FlinkSQL_SQL_ProcessTime_StreamToTable {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Parse "id,ts,vc" CSV lines from the file into Sensor POJOs.
        // BUG FIX: the original declared the stream as WaterSensor while the mapper
        // produced Sensor — Sensor is now used consistently.
        SingleOutputStreamOperator<Sensor> map = env.readTextFile("input/serson.txt")
                .map(new MapFunction<String, Sensor>() {
                    @Override
                    public Sensor map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Sensor(
                                fields[0],
                                Long.parseLong(fields[1]),
                                Integer.parseInt(fields[2])
                        );
                    }
                });

        // Stream -> table with a processing-time attribute. When appending a new
        // field, all existing fields must be listed explicitly; proctime() marks
        // "pt" as the processing-time attribute column.
        Table table = tableEnvironment.fromDataStream(map, $("id"), $("ts"), $("vc"), $("pt").proctime());

        // Print the resulting schema (id, ts, vc, pt)
        table.printSchema();
        env.execute();
    }
}
EventTime
public class FlinkSQL_SQL_EventTime_StreamToTable {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Read "id,ts,vc" CSV lines, map to Sensor, and assign event-time watermarks
        // (bounded out-of-orderness of 2 seconds; ts is in seconds, hence * 1000L).
        // BUG FIX: the original declared the stream as WaterSensor while the mapper
        // and the watermark strategy used Sensor — Sensor is now used consistently.
        SingleOutputStreamOperator<Sensor> waterSensorDS = env.readTextFile("input/serson.txt")
                .map(new MapFunction<String, Sensor>() {
                    @Override
                    public Sensor map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Sensor(
                                fields[0],
                                Long.parseLong(fields[1]),
                                Integer.parseInt(fields[2])
                        );
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Sensor>() {
                            @Override
                            public long extractTimestamp(Sensor element, long recordTimestamp) {
                                return element.getTs() * 1000L;
                            }
                        }));

        // Stream -> table with an event-time attribute: rowtime() marks "rt" as the
        // event-time column derived from the stream's timestamps/watermarks.
        Table table = tableEnvironment.fromDataStream(waterSensorDS, $("id"), $("ts"), $("vc"), $("rt").rowtime());

        // Print the resulting schema (id, ts, vc, rt)
        table.printSchema();
        env.execute();
    }
}
二、Function
/*
注意:如果使用的是 1.12 之前版本的 Flink,使用 ROW_NUMBER() 这类函数时需要特别注意:
部分函数仅在 Blink 执行计划器(Blink planner)下可用,需如下例所示显式启用 Blink planner。
*/
public class FlinkSQL_Function {
    public static void main(String[] args) throws Exception {
        // Set up the streaming runtime environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Some SQL functions are only available on the Blink planner, so the table
        // environment is created with Blink explicitly enabled in streaming mode.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        env.execute();
    }
}
版权声明:本文为weixin_44560999原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。