1、异步查询MySQL:使用线程池创建多个连接,实现并发请求
2、代码实现
/**
 * Demo job: uses a thread pool to run several queries concurrently, each on its own
 * connection, to obtain an asynchronous effect.
 *
 * <p>Suitable for databases that have no asynchronous client.
 */
public class SyncMySqlQueryWithThreadPool {

    /**
     * Reads text lines from a socket on localhost:8888, passes each one through
     * {@link MySqlAsyncFunction} using ordered async I/O, and prints the results.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        // The same number is used for the function's constructor argument and for the
        // operator's capacity (maximum number of in-flight async requests).
        final int capacity = 20;
        // Timeout applied to each async request, in milliseconds.
        final long timeoutMillis = 30000;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketLines = env.socketTextStream("localhost", 8888);
        MySqlAsyncFunction lookup = new MySqlAsyncFunction(capacity);

        // orderedWait: results are emitted in the same order as the incoming elements.
        AsyncDataStream
                .orderedWait(socketLines, lookup, timeoutMillis, TimeUnit.MILLISECONDS, capacity)
                .print();

        env.execute();
    }
}
/**
 * Async I/O function that looks up {@code name} in table {@code tb_user} for every input
 * element (used as the {@code id}) and emits {@code Tuple2(input, name)}.
 *
 * <p>JDBC calls block, so each lookup is submitted to a fixed-size thread pool and runs on a
 * connection borrowed from a Druid connection pool. Both pools are sized by
 * {@code maxConnTotal}.
 *
 * <p>Connection pool dependency (https://mvnrepository.com/artifact/com.alibaba/druid):
 * <pre>{@code
 * <dependency>
 *     <groupId>com.alibaba</groupId>
 *     <artifactId>druid</artifactId>
 *     <version>1.2.8</version>
 * </dependency>
 * }</pre>
 */
class MySqlAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** Lookup statement; the single parameter is the user id. */
    private static final String QUERY_SQL = "SELECT id, name FROM tb_user WHERE id = ?";

    // Must NOT be transient: it is set in the constructor and read later in open(),
    // so it has to survive serialization of the function instance.
    private final int maxConnTotal;

    // Runtime-only resources created in open(); they are never meant to be serialized.
    private transient ExecutorService executorService;
    private transient DruidDataSource dataSource;

    /**
     * @param maxConnTotal size of the worker thread pool and maximum number of active
     *                     connections in the connection pool; must be positive
     * @throws IllegalArgumentException if {@code maxConnTotal} is not positive
     */
    public MySqlAsyncFunction(int maxConnTotal) {
        // Fail fast here rather than later inside open(), where
        // Executors.newFixedThreadPool would reject the same value.
        if (maxConnTotal <= 0) {
            throw new IllegalArgumentException("maxConnTotal must be positive: " + maxConnTotal);
        }
        this.maxConnTotal = maxConnTotal;
    }

    /**
     * Creates the worker thread pool and the connection pool.
     *
     * @param parameters configuration passed in by the runtime (not used)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Thread pool that executes the blocking queries concurrently.
        executorService = Executors.newFixedThreadPool(maxConnTotal);

        // Connection pool: one request runs on one thread and uses one connection,
        // so the pool is capped at the same size as the thread pool.
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        dataSource.setUrl("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8");
        dataSource.setMaxActive(maxConnTotal);
    }

    /**
     * Releases the thread pool and the connection pool. Safe to call when {@link #open}
     * did not complete: resources that were never created are skipped.
     */
    @Override
    public void close() throws Exception {
        try {
            if (executorService != null) {
                executorService.shutdown();
            }
        } finally {
            // Always close the connection pool, even if shutting down the executor failed.
            if (dataSource != null) {
                dataSource.close();
            }
        }
    }

    /**
     * Schedules the lookup for {@code input} on the worker pool and completes
     * {@code resultFuture} from the worker thread.
     *
     * @param input        value bound to the {@code id} parameter of the query
     * @param resultFuture completed with a single {@code Tuple2(input, name)} on success
     *                     ({@code name} is {@code null} when no row matches), or completed
     *                     exceptionally when the query fails
     */
    @Override
    public void asyncInvoke(final String input, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // Run the blocking query on our own pool and complete the future directly from
        // that task, so no additional thread has to sit blocked waiting for the result.
        executorService.execute(() -> {
            try {
                String name = queryFromMySql(input);
                resultFuture.complete(Collections.singleton(Tuple2.of(input, name)));
            } catch (Throwable t) {
                // Report the failure instead of emitting (input, null), which would be
                // indistinguishable from "no matching row". Throwable is caught so the
                // future is always completed rather than left pending until the timeout.
                resultFuture.completeExceptionally(t);
            }
        });
    }

    /**
     * Runs the lookup query for one id.
     *
     * @param param value bound to the {@code id} parameter
     * @return the {@code name} column of the last row returned, or {@code null} if the
     *         query returned no rows
     * @throws SQLException if obtaining a connection or executing the query fails
     */
    private String queryFromMySql(String param) throws SQLException {
        String result = null;
        // try-with-resources closes the result set, statement and connection in reverse
        // order, and still closes the remaining ones if an earlier close() throws.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(QUERY_SQL)) {
            stmt.setString(1, param);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    result = rs.getString("name");
                }
            }
        }
        return result;
    }
}
版权声明:本文为m0_50186249原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。