问题
Error:(54, 16) type mismatch;
found : org.apache.flink.streaming.api.scala.OutputTag[String]
required: org.apache.flink.util.OutputTag[Any]
Note: String <: Any, but Java-defined class OutputTag is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
ctx.output(tag, value.tempature)
解决
OutputTag的类型应该和侧输出流中的类型保持一致,
若侧输出流输出类型为Double,则OutputTag中的类型均为Double
如下:
lazy val tag: OutputTag[Double] = new OutputTag[Double]("high")
processStream.getSideOutput(new OutputTag[Double]("high"))
class myProcess extends ProcessFunction[SensorReading,SensorReading]
//[SensorReading,SensorReading] 类型为输入和主路输出类型
lazy val tag: OutputTag[String] = new OutputTag[String]("high")
//[String]类型为侧输出流中的类型
//?从此处可以设定多个不同类型的侧输出流
正确代码
package xx.xx
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object outTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sourceStream = env.readTextFile("E:\\softData\\workspace_xj\\flinkkk\\src\\main\\resources\\sensor")
.map(f=>{
val arr = f.split(",")
SensorReading(arr(0).trim,arr(1).trim.toLong,arr(2).trim.toDouble)
})
/*
side output
*/
val processStream = sourceStream.process(new myProcess())
//getSideOutput(org.apache.flink.streaming.api.scala)
processStream.getSideOutput(new OutputTag[Double]("high"))
.print()
env.execute("out test")
}
}
class myProcess extends ProcessFunction[SensorReading,SensorReading] {
lazy val tag: OutputTag[Double] = new OutputTag[Double]("high")
override def processElement(value: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
//旁路
if(value.tempature>35) {
//output(org.apache.flink.util)
ctx.output(tag, value.tempature)
}
//主路
out.collect(value)
}
}
版权声明:本文为u014657468原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。