flink连接mysql

StreamExecutionEnvironment  env = StreamExecutionEnvironment.getExecutionEnvironment();

 StreamTableEnvironment       tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        );
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
String createTable1 = String.format(
                //"CREATE TABLE UserScores (type STRING, orderNo STRING,productName STRING,money FLOAT,name STRING,zoneCode STRING,zoneName STRING)\n" +
                "CREATE TABLE new_table_copy1 (type STRING, order_no STRING,product_name STRING,money FLOAT,name STRING,zone_Code STRING,zone_name STRING)\n"
                        +
                        "WITH (\n" +
                        "  'connector' = 'jdbc',\n" +
                        "  'url' = 'jdbc:mysql://192.168.120.160:3306/flink?useSSL=false',\n" +
                        "  'table-name' = 'new_table_copy1',\n" +
                        "  'username' = 'root',\n" +
                        "  'password' = 'root'\n" +
                        ")");
        TableResult tableResult1 = tEnv.executeSql(createTable1);
        Table table1 = tEnv.sqlQuery("SELECT * FROM new_table_copy1 ");
        DataStream<Info> infoDataStream = tEnv.toAppendStream(table1, Info.class);
        infoDataStream.print();

版权声明:本文为weixin_43943806原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。