一、需求思考
1、通过异步查询Mysql中的以下数据:

2、思考
MySQL 的 JDBC 查询是阻塞调用,本身不支持异步查询,那该怎么办呢?
- 创建线程池和数据库连接池,来实现异步的并发查询。
- 这样在异步查询中,每个请求占用线程池中的一个线程,并对应连接池中的一个连接。
二、代码实现
1、添加依赖
- 使用的是阿里的 druid 数据库连接池
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.20</version>
</dependency>
2、主线代码
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
/**
* @date: 2020/3/11 22:51
* @site: www.ianlou.cn
* @author: lekko 六水
* @qq: 496208110
*/
/**
 * Job entry point: reads ids line by line from a socket, looks each one up
 * through {@link MySQLAsyncFunction} using Flink's async I/O operator, and
 * prints the resulting (id, name) pairs.
 */
public class AsyncQueryFromMySQL {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: one id per text line.
        final DataStreamSource<String> idStream = env.socketTextStream("linux01", 8888);

        // The same number is used for the function's pool size and for the
        // operator's limit on concurrent in-flight requests.
        final int maxInFlight = 20;
        final long timeoutMillis = 3000;
        final MySQLAsyncFunction lookup = new MySQLAsyncFunction(maxInFlight);

        // orderedWait: results are emitted in the order of the input records.
        final SingleOutputStreamOperator<Tuple2<String, String>> enriched =
                AsyncDataStream.orderedWait(
                        idStream,               // stream to enrich
                        lookup,                 // async lookup function instance
                        timeoutMillis,          // timeout for a single request
                        TimeUnit.MILLISECONDS,  // unit of the timeout
                        maxInFlight);           // max concurrent async requests

        enriched.print();

        env.execute("AsyncQueryFromMySQL");
    }
}
3、异步查询 MySQL 的 Function
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.newFixedThreadPool;
/**
* @date: 2020/3/11 22:57
* @site: www.ianlou.cn
* @author: lekko 六水
* @qq: 496208110
*/
/**
 * Flink async I/O function that looks up the {@code name} column in MySQL for
 * each incoming id and emits an (id, name) pair.
 *
 * <p>Each lookup runs on a worker thread of a fixed-size pool and borrows one
 * connection from a Druid pool of the same size ({@code maxConn}), so at most
 * {@code maxConn} queries execute concurrently per function instance.
 *
 * <p>A lookup that matches no row emits {@code (id, null)}. A lookup that
 * fails (for example with a {@link SQLException}) completes the
 * {@link ResultFuture} exceptionally instead of being reported as
 * {@code null}.
 */
public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    // transient: both pools are created in open() and must not be serialized
    // together with the function instance.
    private transient ExecutorService executorService;
    private transient DruidDataSource dataSource;

    // Size of both the thread pool and the connection pool.
    private final int maxConn;

    /**
     * @param maxConn number of worker threads and maximum number of pooled
     *                connections; must be positive
     * @throws IllegalArgumentException if {@code maxConn} is not positive
     */
    public MySQLAsyncFunction(int maxConn) {
        if (maxConn <= 0) {
            // Fail at construction time rather than later inside open(),
            // where newFixedThreadPool would reject the value anyway.
            throw new IllegalArgumentException("maxConn must be positive, got " + maxConn);
        }
        this.maxConn = maxConn;
    }

    /**
     * Creates the worker thread pool and the MySQL connection pool.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Thread pool that executes the blocking JDBC calls.
        executorService = newFixedThreadPool(maxConn);

        // Connection pool: one in-flight request uses one thread and one connection.
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/day01?characterEncoding=utf8");
        dataSource.setMaxActive(maxConn);
    }

    /**
     * Schedules the lookup for {@code id} on the worker pool and returns
     * immediately; the worker completes {@code resultFuture} when done.
     *
     * @param id           key to look up
     * @param resultFuture receives a single (id, name) pair, or the failure
     */
    @Override
    public void asyncInvoke(String id, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // Run the query and complete the future on the same worker thread.
        // No second thread is parked waiting on an intermediate Future.
        executorService.execute(() -> {
            try {
                String name = queryFromMySql(id);
                resultFuture.complete(Collections.singleton(Tuple2.of(id, name)));
            } catch (Throwable t) {
                // Boundary catch: the future must always be completed,
                // otherwise the request would only end via the operator timeout.
                resultFuture.completeExceptionally(t);
            }
        });
    }

    /**
     * Runs the blocking lookup for a single id.
     *
     * @param param id to search for
     * @return the {@code name} of the matching row (the last one if several
     *         rows match), or {@code null} if no row matches
     * @throws SQLException if obtaining a connection or running the query fails
     */
    private String queryFromMySql(String param) throws SQLException {
        String sql = "SELECT id, name FROM orde WHERE id = ?";
        String result = null;
        // try-with-resources closes the result set, statement and connection in
        // reverse order even when one of the close() calls throws, so the
        // pooled connection is always returned.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setString(1, param);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    result = rs.getString("name");
                }
            }
        }
        return result;
    }

    /**
     * Releases the thread pool and the connection pool. Safe to call even if
     * {@link #open(Configuration)} failed before both were created.
     */
    @Override
    public void close() throws Exception {
        try {
            if (executorService != null) {
                executorService.shutdown();
            }
        } finally {
            if (dataSource != null) {
                dataSource.close();
            }
        }
    }
}
三、技术点
- Flink的异步I/O
- Druid 连接池的使用
- 从Mysql中查询数据
版权声明:本文为u010271601原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。