StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
String createTable1 = String.format(
//"CREATE TABLE UserScores (type STRING, orderNo STRING,productName STRING,money FLOAT,name STRING,zoneCode STRING,zoneName STRING)\n" +
"CREATE TABLE new_table_copy1 (type STRING, order_no STRING,product_name STRING,money FLOAT,name STRING,zone_Code STRING,zone_name STRING)\n"
+
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://192.168.120.160:3306/flink?useSSL=false',\n" +
" 'table-name' = 'new_table_copy1',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root'\n" +
")");
TableResult tableResult1 = tEnv.executeSql(createTable1);
Table table1 = tEnv.sqlQuery("SELECT * FROM new_table_copy1 ");
DataStream<Info> infoDataStream = tEnv.toAppendStream(table1, Info.class);
infoDataStream.print();
版权声明:本文为weixin_43943806原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。