Flink异步查询MySQL使用线程池创建多链接实现多请求

1、异步查询MySQL使用线程池创建多链接实现多请求

2、代码实现

/**
 * 通过线程池创建多线程,创建多个链接,执行多个查询,实现异步效果
 * 适用于没有异步客户端的数据库
 */
public class SyncMySqlQueryWithThreadPool {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> line = env.socketTextStream("localhost", 8888);

        AsyncDataStream.orderedWait(line,new MySqlAsyncFunction(20),30000, TimeUnit.MILLISECONDS,20).print();

        env.execute();
    }
}

/**
 * <!-- 连接池依赖https://mvnrepository.com/artifact/com.alibaba/druid -->
 *         <dependency>
 *             <groupId>com.alibaba</groupId>
 *             <artifactId>druid</artifactId>
 *             <version>1.2.8</version>
 *         </dependency>
 */
class MySqlAsyncFunction extends RichAsyncFunction<String, Tuple2<String,String>>
{
    //不能加transient
    private int maxConnTotal;
    private transient ExecutorService executorService;
    private DruidDataSource dataSource;

    public MySqlAsyncFunction(int maxConnTotal) {
        this.maxConnTotal = maxConnTotal;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //创建一个线程池,实现并发请求
         executorService = Executors.newFixedThreadPool(maxConnTotal);
        //创建连接池(异步IO 一个请求就是一个线程,一个请求对应一个连接)
         dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        dataSource.setUrl("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8");
        dataSource.setMaxActive(maxConnTotal);
    }

    @Override
    public void close() throws Exception {
        executorService.shutdown();
        dataSource.close();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        //将查询请求放入线程池
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return queryFromMySql(input);
            }
        });

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return future.get();
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String result) -> {
            resultFuture.complete(Collections.singleton(Tuple2.of(input, result)));
        });
    }
    private String queryFromMySql(String param) throws SQLException {
        String sql = "SELECT id, name FROM tb_user WHERE id = ?";
        String result = null;
        Connection connection = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            connection = dataSource.getConnection();
            stmt = connection.prepareStatement(sql);
            stmt.setString(1, param);
            rs = stmt.executeQuery();
            while (rs.next()) {
                result = rs.getString("name");
            }
        } finally {
            if (rs != null) {
                rs.close();
            }
            if (stmt != null) {
                stmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return result;
    }
}

版权声明:本文为m0_50186249原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。