Flink之异步I/O案例(二)异步查询MySQL数据库

一、需求思考

1、通过异步 I/O 查询 MySQL 中的以下数据:

(此处原为待查询数据表的截图,图片缺失)

2、思考

MySQL 的 JDBC 驱动是阻塞式的,不支持异步查询,那该怎么办呢?

  • 创建线程池和数据库连接池,把阻塞的查询交给线程池执行,从而实现异步的并发查询。
  • 这样在异步查询中,每个请求占用线程池中的一个线程,并使用连接池中的一个连接。

二、代码实现

1、添加依赖
  • 使用的是阿里的 Druid 数据库连接池(代码中使用了 com.mysql.jdbc.Driver,因此还需要引入 MySQL 的 JDBC 驱动依赖)
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.20</version>
        </dependency>
2、主线代码
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * Entry point of the demo job: reads lines from a socket text stream and passes each one
 * through {@link MySQLAsyncFunction} using Flink's ordered async wait.
 *
 * @date: 2020/3/11 22:51
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 */
public class AsyncQueryFromMySQL {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // One value is used for both the function's pool size and the async operator capacity.
        final int capacity = 20;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Input stream: text lines from the socket at linux01:8888.
        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        // Function instance that performs the actual asynchronous lookup.
        MySQLAsyncFunction lookup = new MySQLAsyncFunction(capacity);

        SingleOutputStreamOperator<Tuple2<String, String>> result = AsyncDataStream.orderedWait(
                lines,                  // stream to process
                lookup,                 // async lookup function
                3000,                   // timeout value
                TimeUnit.MILLISECONDS,  // timeout unit
                capacity);              // maximum number of concurrent async requests

        result.print();
        env.execute("AsyncQueryFromMySQL");
    }
}

3、异步查询MySQL的Function
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.newFixedThreadPool;

/**
 * Flink async function that looks up the {@code name} column for an incoming id in MySQL
 * and emits an {@code (id, name)} tuple.
 *
 * <p>The JDBC calls are blocking, so every lookup runs on a worker thread of a fixed-size
 * pool and borrows a connection from a Druid pool. Both pools are sized with
 * {@code maxConn}.
 *
 * @date: 2020/3/11 22:57
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 */
public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
    // transient: both pools are created in open() and must not be serialized with the function.
    private transient ExecutorService executorService = null;
    private transient DruidDataSource dataSource = null;

    // Size of the worker thread pool and upper bound of active pooled connections.
    private final int maxConn;

    /**
     * @param maxConn number of worker threads and maximum number of active connections
     */
    public MySQLAsyncFunction(int maxConn) {
        this.maxConn = maxConn;
    }

    /**
     * Creates the worker thread pool and the MySQL connection pool.
     *
     * @param parameters configuration passed in by the runtime (unused here)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // One worker thread per in-flight request.
        executorService = newFixedThreadPool(maxConn);

        // One pooled connection per in-flight request.
        // NOTE(review): connection settings are hard-coded demo values; externalize them
        // (and the password in particular) before using this outside a tutorial.
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/day01?characterEncoding=utf8");
        dataSource.setMaxActive(maxConn);
    }

    /**
     * Schedules the lookup for {@code id} on the worker pool and completes
     * {@code resultFuture} from that worker thread.
     *
     * <p>On success a single {@code (id, name)} tuple is emitted; {@code name} is
     * {@code null} when no row matches. On failure the error is handed to
     * {@code resultFuture.completeExceptionally} instead of being swallowed and turned
     * into a {@code null} result.
     *
     * @param id           the key to look up
     * @param resultFuture callback that receives the lookup result or the failure
     * @throws Exception declared by the overridden method
     */
    @Override
    public void asyncInvoke(String id, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // Run the blocking query directly on our own pool; no second thread is parked
        // just to wait for the first one.
        executorService.execute(() -> {
            try {
                String name = queryFromMySql(id);
                resultFuture.complete(Collections.singleton(Tuple2.of(id, name)));
            } catch (Exception e) {
                // Boundary of the worker task: report every failure to the result future.
                resultFuture.completeExceptionally(e);
            }
        });
    }

    /**
     * Runs the blocking lookup query for one id.
     *
     * @param param value bound to the {@code id} placeholder of the query
     * @return the {@code name} of the last matching row, or {@code null} if no row matches
     * @throws SQLException if obtaining the connection or running the query fails
     */
    private String queryFromMySql(String param) throws SQLException {
        String sql = "SELECT id, name FROM orde WHERE id = ?";
        String result = null;
        // try-with-resources closes result set, statement and connection in reverse order,
        // even if one of the close() calls itself throws.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setString(1, param);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    result = rs.getString("name");
                }
            }
        }
        return result;
    }

    /**
     * Releases the worker pool and the connection pool.
     *
     * <p>Both fields are null-checked because this may run after {@code open} failed
     * part-way. The executor is shut down first so no new lookup is accepted while the
     * connection pool is being closed; lookups already running are not awaited.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void close() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }
}

三、技术点

  • Flink的异步I/O
  • Druid 连接池的使用
  • 从MySQL中查询数据

版权声明:本文为u010271601原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。