import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DimUtil;
import com.atguigu.utils.DruidDSUtil;
import com.atguigu.utils.JedisPoolUtil;
import com.atguigu.utils.ThreadPoolUtil;
import lombok.SneakyThrows;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Asynchronous dimension-join function: for every input record it looks up one
 * dimension row via {@code DimUtil.getDimInfo} on a worker thread and lets the
 * subclass merge that row into the record.
 *
 * <p>Subclasses provide {@code getKey} (which lookup key to use for a record) and
 * {@code join} (how to copy dimension fields into the record); both come from
 * {@link AsyncJoinFunction}.
 *
 * @param <T> type of the record being enriched; the same instance is emitted downstream
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements AsyncJoinFunction<T> {

    // Connection sources and the executor are obtained in open(), not in the constructor.
    private JedisPool jedisPool;
    private DruidDataSource druidDataSource;
    private ThreadPoolExecutor threadPoolExecutor;

    // Name of the dimension table to query; supplied by the caller.
    private final String tableName;

    /**
     * @param tableName name of the dimension table this function looks records up in
     */
    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    /**
     * Initializes the Redis pool, the JDBC data source and the worker thread pool
     * through the project's utility classes.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        jedisPool = JedisPoolUtil.getJedisPool();
        druidDataSource = DruidDSUtil.createDataSource();
        threadPoolExecutor = ThreadPoolUtil.getThreadPoolExecutor();
    }

    /**
     * Releases the data source created in {@link #open(Configuration)}.
     *
     * <p>The Jedis pool and the executor are obtained through getters and are
     * presumably shared, so they are left untouched here.
     * NOTE(review): confirm DruidDSUtil.createDataSource() returns a fresh instance per call.
     */
    @Override
    public void close() throws Exception {
        try {
            if (druidDataSource != null) {
                druidDataSource.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Submits the lookup-and-join for {@code input} to the thread pool and completes
     * {@code resultFuture} with the enriched record, or exceptionally if the lookup fails.
     */
    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    lookupAndJoin(input);
                } catch (Exception e) {
                    // Hand the failure to Flink instead of letting it die silently in the
                    // worker thread, which would leave the future uncompleted.
                    resultFuture.completeExceptionally(e);
                    return;
                }
                // Emit the record once both connections have been returned.
                resultFuture.complete(Collections.singletonList(input));
            }
        });
    }

    /**
     * Fetches the dimension row for {@code input} and, when one is found, merges it
     * into the record via {@code join}. Both connections are always returned to their
     * pools, even when the lookup throws.
     */
    private void lookupAndJoin(T input) throws Exception {
        try (Jedis jedis = jedisPool.getResource();
             DruidPooledConnection connection = druidDataSource.getConnection()) {
            // The subclass knows the concrete record type and therefore which field is the key.
            String key = getKey(input);
            JSONObject dimInfo = DimUtil.getDimInfo(jedis, connection, tableName, key);
            if (dimInfo != null) {
                join(input, dimInfo);
            }
        }
    }

    /**
     * Called when the lookup does not finish in time: logs the record and still emits
     * it, un-joined, so that it is not lost and the future is always completed.
     */
    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        System.out.println("TimeOut:" + input);
        resultFuture.complete(Collections.singletonList(input));
    }
}
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.sql.Connection;
import java.util.List;
/**
 * Dimension-table lookup helper that reads through a Redis cache before falling
 * back to a JDBC query.
 */
public class DimUtil {

    // Cached rows expire after one day so rarely used keys do not stay cached forever.
    private static final int CACHE_TTL_SECONDS = 24 * 60 * 60;

    /**
     * Returns the dimension row identified by {@code tableName} and {@code key}.
     *
     * <p>The Redis cache is consulted first; a hit refreshes the entry's expiry. On a
     * miss the row is queried through {@code JdbcUtil.queryList} and, if found, written
     * to Redis with the same expiry.
     *
     * @param jedis      Redis connection used for the cache
     * @param connection JDBC connection used on a cache miss
     * @param tableName  dimension table name
     * @param key        value matched against the table's {@code id} column
     * @return the row as a JSON object, or {@code null} when no row matches
     * @throws Exception if the cache or the query fails
     */
    public static JSONObject getDimInfo(Jedis jedis, Connection connection, String tableName, String key) throws Exception {
        String redisKey = buildRedisKey(tableName, key);

        String cached = jedis.get(redisKey);
        if (cached != null) {
            jedis.expire(redisKey, CACHE_TTL_SECONDS);
            return JSON.parseObject(cached);
        }

        // NOTE(review): tableName and key are concatenated into the SQL text. If key can
        // carry untrusted data this is injectable; a parameterized variant of
        // JdbcUtil.queryList would be the proper fix.
        String querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id='" + key + "'";
        List<JSONObject> rows = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);

        // No matching row: report "not found" instead of failing on get(0).
        if (rows == null || rows.isEmpty()) {
            return null;
        }

        JSONObject dimInfo = rows.get(0);
        // SETEX writes value and expiry in one command, so a failure between two separate
        // calls can no longer leave a key without a TTL.
        jedis.setex(redisKey, CACHE_TTL_SECONDS, dimInfo.toJSONString());
        return dimInfo;
    }

    /**
     * Removes the cached copy of a dimension row; intended to be called when the row
     * has been updated so stale data is not served from Redis.
     *
     * @param jedis     Redis connection used for the cache
     * @param tableName dimension table name
     * @param key       row identifier
     */
    public static void delDimInfo(Jedis jedis, String tableName, String key) {
        jedis.del(buildRedisKey(tableName, key));
    }

    /**
     * Builds the cache key: a "DIM" prefix (the Redis instance is shared with other
     * data), then table name and row key, which together identify one row.
     */
    private static String buildRedisKey(String tableName, String key) {
        return "DIM:" + tableName + ":" + key;
    }

    /**
     * Manual smoke test: looks the same row up twice and prints each result followed
     * by the two elapsed times in milliseconds.
     *
     * <p>Timings noted by the original author: roughly 200 ms without any cache, about
     * 58 ms for the first call with Redis, and 0-1 ms for the second call.
     */
    public static void main(String[] args) throws Exception {
        DruidDataSource dataSource = DruidDSUtil.createDataSource();
        try {
            JedisPool jedisPool = JedisPoolUtil.getJedisPool();
            try (DruidPooledConnection connection = dataSource.getConnection();
                 Jedis jedis = jedisPool.getResource()) {
                long start = System.currentTimeMillis();
                System.out.println(getDimInfo(jedis, connection, "DIM_BASE_TRADEMARK", "15"));
                long end = System.currentTimeMillis();
                System.out.println(getDimInfo(jedis, connection, "DIM_BASE_TRADEMARK", "15"));
                long end2 = System.currentTimeMillis();
                System.out.println(end - start);
                System.out.println(end2 - end);
            }
        } finally {
            dataSource.close();
        }
    }
}
import com.alibaba.fastjson.JSONObject;
public interface AsyncJoinFunction<T> {
//抽出来,方便其他需求使用,直接实现接口即可
String getKey(T input);
void join(T input, JSONObject dimInfo);
}版权声明:本文为qq_50408152原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。