RichFunction
一般情况下我们使用一个算子,如果自己写类继承接口的话,只用重写一个算子逻辑方法即可;但是有时我们需要进行一些初始化工作,或者获得上下文信息时,只用普通的算子非常的不方便;
所以Flink提供了富函数这样的同算子一样的操作方法的算子;
它的使用方法同原算子类似,但是可以多重写两个方法,这里以map举例
map 的富函数为RichMapFunction,我们可以多重写两个方法,
在使用富函数的时候,我们可以对函数的生命周期实现两个额外的方法:
open():
是富函数的初始方法。它在每个任务首次调用转换方法(如map,flter等算子)前调用一次。Open方法通常只用于那些只需要进行一次的设置工作
close():
函数的终止方法,会在每个任务最后一次调用转换方法后调用一次。通常用于清理和释放资源。
此外,还可以使用getRuntimeContext()方法来访问函数的RuntimeContext,从RuntimeContext中获取一些信息,例如函数的并行度,访问分区状态的方法等… …
不光是多加了两个方法,使用富函数还可以获取上下文信息,
上代码
public static class MyRichMap extends RichMapFunction<String, WaterSensor>{
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
System.out.println(getRuntimeContext().getTaskName()+"-------"+getRuntimeContext().getTaskNameWithSubtasks());
return new WaterSensor(split[0],Long.valueOf(split[1]),Integer.valueOf(split[2]));
}
//声明周期
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open........");
}
@Override
public void close() throws Exception {
System.out.println("close........");
}
}
输出结果如下
可以看见我们获得了算子前后的信息,并且我们在flink环境中设定了2个并行度,所以map会有两个并行度,那么我们的富函数的open和关闭方法就被调用了两次
注:只是open和close方法被调用两次,但是富函数中的函数逻辑被调用多次,这也是单独抽出两个open和close方法的原因,避免多次重复浪费资源。
简单滚动聚合算子 RollAgg
滚动聚合算子只能使用在重新分区的算子(KeyBy,shuffle,reblance,rescale)之后,如使用在keyBy之后,就发现可以通过.方法点出来如min,max 等滚动聚合算子
他们的作用为返回这个分区内的特定值,如我们的元素为某个对象,有a,b,c三个参数,那我们分区后,key就是a;那么我们滚动聚合算子针对的分区的a的值都相等
当我们使用min,max时,返回的值a不用说,肯定这个分区a的值都是一样的,那么c的值,如果用max则是这个分区的最大值,注意的是b的值,它是这个分区第一个出现的值
举例:我们先输入为1,1,1;那么再输入1,2,2;同一个分区内,a为1,c为最大值2,b会取进入的第一个值也就是1,
那么返回对象为1,1,2;
如果此时再传来一个对象为1,3,3;则同理返回1,1,3
当我们使用minBy,maxBy这种滚动聚合算子时,会多加一个参数,默认时true
当为true时,如果输入参数有相同的max或min,则b会取第一个输入
举例:
我们使用maxby算子,我们先输入为1,1,1;那么再输入1,2,2;此时会返回1,2,2;如果再输入数据1,3,2;
由于c的值相等,我们会取第一次输入且c最大的数据,也就是返回1,2,2;如果此时再输入1,4,4;由于此时最大的c为4,所以会返回1,4,4;
false则与上面相反,相关参数相同时,无关参数选择最新的一组元素