Flink之AsyncDataStream

在flink使用过程中,虽然flink有state(状态)可以用来存储很多的缓存数据,但是不可避免的要与其他如redis,hbase 发生交互,除了这些数据库外 更常见的是可能业务需要去调个api填充信息,默认情况下,在MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间 。

Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。

也就是要介绍的AsyncDataStream

首先AsyncDataStream 有两个方法1.unorderedWait表示数据不需要关注顺序,处理完立即发送 方法需要传入的第一个参数是一个datastream 这也是和其他的算子不太一样的地方 这里的两个方法需要用AsyncDataStream调用把上游datastream作为参数传递  而不是datastream直接调用 第二个参数 就是自己实现的类。需要继承 RichAsyncFunction或者 AsyncFunction 当然 因为基础方法 没有生命周期函数 但是这个方法 主要用来和外部系统交互。所以 一般都会去继承RichAsyncFunction第三个参数 是最长等待时间  第四个参数 是一个TimeUnit  可以传入 入  TimeUnit.SECONDS这种 最后一个参数Capacity表示同时最多有多少个异步请求在处理,异步IO的方式会导致更高的吞吐量,但是对于实时应用来说该操作也是一个瓶颈。限制并发请求数,算子不会积压过多的未处理请求,但是一旦超过容量的显示会触发背压。
该参数可以不配置,但是默认是100。

2.orderedWait表示数据需要关注顺序  传入参数基本相同

以下例子简单的结合了cache 连接redis的方法 

 

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.MessageType;
import com.RedisUtils;
import com.Constants;
import com.CustomerUtils;
import com.IdentityUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class WeiboProfileAsynFunction extends RichAsyncFunction<Tuple2<Map<String, Object>,Map<String, Object>>, Tuple4<Boolean, String, String,String>> {

    private Cache<String,String> cache ;
    @Override
    public void open(Configuration parameters) throws Exception {
        this.cache = CacheBuilder.newBuilder()
                .maximumWeight(10000)
                .expireAfterAccess(60*3600L, TimeUnit.SECONDS)
                .build();
        super.open(parameters);
    }

//逻辑处理的主体函数 通过ResultFuture的对象 调用complete达到发送数据到下游的效果 complete方法可以简单类似成collector的collect方法
    @Override
    public void asyncInvoke(Tuple2<Map<String, Object>, Map<String, Object>> value, ResultFuture<Tuple4<Boolean, String, String, String>> resultFuture) throws Exception {
        Gson gson = new Gson();
        Map<String,Object> eventMap = value.f0;
        Map<String,Object> profileMap = value.f1;
    
        String appid = (String) eventMap.get("app_id");
        String time = (String) eventMap.get("svr_time");
        String key = appid+"_"+time;
        String cacheUserInfo = cache.getIfPresent(key);
        boolean isValid = true;
        if(!StringUtils.isEmpty(cacheUserInfo)){
            isValid = false;
        } else {
            cache.put(key,key);
        }


        RedisUtils redisUtils = new RedisUtils(Constants.REDIS_URI);
        IdentityUtils identityUtils = new IdentityUtils(redisUtils);
        String  api_id = identityUtils.getApiId(eventMap);

        eventMap.put("api_id",api_id);
      

        eventMap.put("customer",profileMap);

        String sEvt = gson.toJson(eventMap,Map.class);
        String customer = CustomerUtils.getCustomerProfile(eventMap, profileMap,identityUtils , MessageType.WbEvt);

        Tuple4<Boolean,String,String,String> line = new Tuple4<>(isValid,appid,sEvt,customer);


        resultFuture.complete(Collections.singleton(line));
    }
}

 

注意:

使用Async I/O,需要外部存储有支持异步请求的客户端

使用Async I/O,继承RichAsyncFunction(接口AsyncFunction<IN, OUT>的抽象类),重写或实现open(建立连接)close(关闭连接)asyncInvoke(异步调用)3个方法即可。

使用Async I/O, 最好结合缓存一起使用,可减少请求外部存储的次数,提高效率。

Async I/O 提供了Timeout参数来控制请求最长等待时间。默认,异步I/O请求超时时,会引发异常并重启或停止作业。 如果要处理超时,可以重写AsyncFunction#timeout方法。

Async I/O 提供了Capacity参数控制请求并发数,一旦Capacity被耗尽,会触发反压机制来抑制上游数据的摄入。

Async I/O 输出提供乱序和顺序两种模式。

 

 

 

 

 

 


版权声明:本文为a724952091原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。