1. Preface
This post records problems encountered while learning Apache Flink, focusing on the basic steps for querying MySQL through Flink's SQL API.
Main content:
- Using Flink's SQL API to access a MySQL database
The pom.xml dependencies, where ${flink.version} is 1.13.0:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<!-- Add connector dependencies here. They must be in the default scope
(compile). -->
<!-- Flink-to-Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Provides the Table API (Java version) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Enables running locally -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- JDBC connector, for connecting to databases -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
2. Preparation
1. The JDBC connector that lets Flink connect to MySQL (add the Maven dependency manually): see the official documentation
2. Basic usage: see the official documentation
3. Prepare a database named flink_test and create a table named t_user in it
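A minimal MySQL DDL for that table can be sketched from the schema used later in the demo (column names and types are inferred from the Flink CREATE TABLE statement; the primary key on id is an assumption, adjust as needed):

```sql
-- Hypothetical DDL for flink_test.t_user, inferred from the demo's schema
CREATE DATABASE IF NOT EXISTS flink_test;
USE flink_test;
CREATE TABLE t_user (
    id INT PRIMARY KEY,   -- assumed primary key, not stated in the original
    name VARCHAR(50),
    age INT,
    score DOUBLE,
    className VARCHAR(50)
);
```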
3. Main demo
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
 *
 * @author hy
 * @createTime 2021-06-12 13:56:26
 * @description Tests the basic operations of the Table API.
 *              Note: the corresponding Maven dependencies (see the official
 *              documentation) must be imported, otherwise compilation fails.
 *
 */
public class TableApiTest {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Fails to start with useAnyPlanner:
        // EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
        // Fails to start with useOldPlanner:
        // EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        // Starts with useBlinkPlanner, but reports: MiniCluster is not yet running or has already been shut down.
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
        // Register t_user as a Flink table backed by the JDBC connector
        tableEnv.executeSql("CREATE TABLE t_user (\n" +
                "id INT,\n" +
                "name VARCHAR(50),\n" +
                "age INT,\n" +
                "score DOUBLE,\n" +
                "className VARCHAR(50)\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'driver'='com.mysql.cj.jdbc.Driver',\n" +
                " 'url'='jdbc:mysql://localhost:3306/flink_test?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8',\n" +
                " 'table-name'='t_user',\n" +
                " 'username'='root',\n" +
                " 'password'='root'\n" +
                ")");
        Table sqlQuery = tableEnv.sqlQuery("select * from t_user where name='张三'");
        TableResult tableResult = sqlQuery.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while (collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }
    }
}
Execution result:
Although an error is reported, the data can still be queried, so the query does execute.
The key point is that the CREATE TABLE statement must include a WITH clause specifying the connector type and its connection properties.
4. Main errors encountered
1. Partial inserts are not supported: only a full-row insert such as insert into t_user values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202') works; an insert with an explicit column list, such as insert into t_user(id,name,age,score,className) values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202'), fails. In other words, you cannot manually choose which columns to insert into.
2. Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath: this occurs when the job cannot run in local mode; switching to useBlinkPlanner() resolves it.
The current Flink can connect locally and execute SQL API operations with useBlinkPlanner, but useAnyPlanner and useOldPlanner both fail with error 2 above.
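Formatted for readability, the contrast from error 1 looks like this (statements taken from the examples above; whether an explicit column list is accepted may vary by Flink version):

```sql
-- Works: full-row insert with no column list
INSERT INTO t_user VALUES
    (1, '张三', 18, 55.5, '201'),
    (2, '李四', 22, 59.5, '202');

-- Fails with "Partial inserts are not supported": explicit column list
INSERT INTO t_user (id, name, age, score, className) VALUES
    (1, '张三', 18, 55.5, '201'),
    (2, '李四', 22, 59.5, '202');
```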
5. Summary
1. Although Flink provides SQL API connectors for operating on various databases, local tests only execute with the BlinkPlanner.
2. The SQL API still requires a schema; without one, no operation can be executed.
Copyright notice: this is an original article by weixin_45492007, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.