在flink使用过程中,虽然flink有state(状态)可以用来存储很多的缓存数据,但是不可避免的要与其他如redis,hbase 发生交互,除了这些数据库外 更常见的是可能业务需要去调个api填充信息,默认情况下,在MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间 。
Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
也就是要介绍的AsyncDataStream
首先AsyncDataStream 有两个方法1.unorderedWait表示数据不需要关注顺序,处理完立即发送 方法需要传入的第一个参数是一个datastream 这也是和其他的算子不太一样的地方 这里的两个方法需要用AsyncDataStream调用把上游datastream作为参数传递 而不是datastream直接调用 第二个参数 就是自己实现的类。需要继承 RichAsyncFunction或者 AsyncFunction 当然 因为基础方法 没有生命周期函数 但是这个方法 主要用来和外部系统交互。所以 一般都会去继承RichAsyncFunction第三个参数 是最长等待时间 第四个参数 是一个TimeUnit 可以传入 入 TimeUnit.SECONDS这种 最后一个参数Capacity表示同时最多有多少个异步请求在处理,异步IO的方式会导致更高的吞吐量,但是对于实时应用来说该操作也是一个瓶颈。限制并发请求数,算子不会积压过多的未处理请求,但是一旦超过容量的显示会触发背压。
该参数可以不配置,但是默认是100。
2.orderedWait表示数据需要关注顺序 传入参数基本相同
以下例子简单的结合了cache 连接redis的方法
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.MessageType;
import com.RedisUtils;
import com.Constants;
import com.CustomerUtils;
import com.IdentityUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class WeiboProfileAsynFunction extends RichAsyncFunction<Tuple2<Map<String, Object>,Map<String, Object>>, Tuple4<Boolean, String, String,String>> {
private Cache<String,String> cache ;
@Override
public void open(Configuration parameters) throws Exception {
this.cache = CacheBuilder.newBuilder()
.maximumWeight(10000)
.expireAfterAccess(60*3600L, TimeUnit.SECONDS)
.build();
super.open(parameters);
}
//逻辑处理的主体函数 通过ResultFuture的对象 调用complete达到发送数据到下游的效果 complete方法可以简单类似成collector的collect方法
@Override
public void asyncInvoke(Tuple2<Map<String, Object>, Map<String, Object>> value, ResultFuture<Tuple4<Boolean, String, String, String>> resultFuture) throws Exception {
Gson gson = new Gson();
Map<String,Object> eventMap = value.f0;
Map<String,Object> profileMap = value.f1;
String appid = (String) eventMap.get("app_id");
String time = (String) eventMap.get("svr_time");
String key = appid+"_"+time;
String cacheUserInfo = cache.getIfPresent(key);
boolean isValid = true;
if(!StringUtils.isEmpty(cacheUserInfo)){
isValid = false;
} else {
cache.put(key,key);
}
RedisUtils redisUtils = new RedisUtils(Constants.REDIS_URI);
IdentityUtils identityUtils = new IdentityUtils(redisUtils);
String api_id = identityUtils.getApiId(eventMap);
eventMap.put("api_id",api_id);
eventMap.put("customer",profileMap);
String sEvt = gson.toJson(eventMap,Map.class);
String customer = CustomerUtils.getCustomerProfile(eventMap, profileMap,identityUtils , MessageType.WbEvt);
Tuple4<Boolean,String,String,String> line = new Tuple4<>(isValid,appid,sEvt,customer);
resultFuture.complete(Collections.singleton(line));
}
}
注意:
使用Async I/O,需要外部存储有支持异步请求的客户端
使用Async I/O,继承RichAsyncFunction(接口AsyncFunction<IN, OUT>的抽象类),重写或实现open(建立连接)、close(关闭连接)、asyncInvoke(异步调用)3个方法即可。
使用Async I/O, 最好结合缓存一起使用,可减少请求外部存储的次数,提高效率。
Async I/O 提供了Timeout参数来控制请求最长等待时间。默认,异步I/O请求超时时,会引发异常并重启或停止作业。 如果要处理超时,可以重写AsyncFunction#timeout方法。
Async I/O 提供了Capacity参数控制请求并发数,一旦Capacity被耗尽,会触发反压机制来抑制上游数据的摄入。
Async I/O 输出提供乱序和顺序两种模式。