【Flink】在算子中使用Tuple出现的bug

在使用map,flatMap 算子的过程中想把pojo 转换成Tuple 类型进行输出,在编码的过程中正常,运行是出现异常

flink version 1.13

scala 2.12

jdk 1.8

1、问题

1.1、 报错关键日志

org.apache.flink.api.common.functions.InvalidTypesException: 
The return type of function 'main(Stream.java:287)' could not 
be determined automatically, due to type erasure. 
You can give type information hints by using the returns(...) 
method on the result of the transformation call, 
or by letting your function implement the 
'ResultTypeQueryable' interface.




org.apache.flink.api.common.functions.InvalidTypesException: 
The generic type parameters of 'Tuple24' are missing. 
In many cases lambda methods don't provide enough information 
for automatic type extraction when Java generics are involved. 
An easy workaround is to use an (anonymous) class instead that 
implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information

1.2、 报错源码

resultStream.map( v ->
					new Tuple24<>(v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,
							v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a))
					.setParallelism(35).print("结果:").setParallelism(1);

2、原因

报错中提示出现的是无效的类型异常,在这里我们只是使用了map 进行转tuple 类型,tuple 中默认返回的是一个object,log 中其实也提示了我们可以怎么解决这个问题,需要告知程序显式的返回数据类型:

You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface

3、解决思路

既然知道了出现的问题,解决起来就简单了,我们就在返回的时候显式的添加上数据类型,也就是加上returns方法,最终代码如下:

3.1、 添加Tuple 类型

resultStream.map( v ->
					new Tuple24<>(v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,
							v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a,v.a))
					.returns(Types.TUPLE(Types.STRING,Types.LONG,Types.LONG,Types.LONG,
							Types.LONG,Types.LONG,Types.LONG,Types.LONG,Types.LONG,
							Types.LONG,Types.LONG,Types.LONG,Types.LONG,
							Types.DOUBLE,Types.LONG,Types.LONG,
							Types.DOUBLE,Types.LONG,Types.LONG,
							Types.DOUBLE,Types.LONG,Types.LONG,
							Types.DOUBLE,Types.STRING))
					.setParallelism(35).print("结果:").setParallelism(1);

3.2 、继承Tuple

具体的代码我就不再写一遍了,大家可以参考我圈出来的部分

4、完美解决及拓展

程序立马可以运行起来了,完美解决了问题,想学习这方面更多的解决办法也可以参考 官网学习起来:Java Lambda Expressions | Apache Flink


版权声明:本文为Zsigner原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。