Resolving the type mismatch error in side outputs

Problem

Error:(54, 16) type mismatch;
 found   : org.apache.flink.streaming.api.scala.OutputTag[String]
 required: org.apache.flink.util.OutputTag[Any]
Note: String <: Any, but Java-defined class OutputTag is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
    ctx.output(tag, value.tempature)
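
The compiler note about invariance can be reproduced without Flink. The following is a minimal sketch, not Flink code: `Tag` and `output` are hypothetical stand-ins that only mimic the shape of the Java-defined `OutputTag[T]` and of `ctx.output`, where a single type parameter is shared by the tag and the value.

```scala
// Hypothetical stand-in for the Java-defined OutputTag: invariant in T (no +T).
final class Tag[T](val id: String)

object InvarianceSketch {
  // Same shape as ctx.output: one type parameter X is shared by the tag and the value.
  def output[X](tag: Tag[X], value: X): String = s"${tag.id} -> $value"

  def main(args: Array[String]): Unit = {
    val doubleTag = new Tag[Double]("high")
    val stringTag = new Tag[String]("high")

    // Compiles: X = Double fits both the tag and the value.
    println(output(doubleTag, 36.5))

    // Does not compile: no single X fits both Tag[String] and a Double value,
    // and because Tag is invariant, Tag[String] is not accepted as a Tag[Any].
    // output(stringTag, 36.5)

    // A String tag needs a String value.
    println(output(stringTag, 36.5.toString))
  }
}
```

Under these assumptions, the commented-out call fails for the same reason as the error above: the tag type and the value type have to agree exactly.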

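Solution

The type parameter of the OutputTag must match the type of the elements emitted to the side output.
In the failing code the tag was an OutputTag[String], while value.tempature is a Double.
If the side output emits Double values, every OutputTag used for it must be an OutputTag[Double],
as follows: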

lazy val tag: OutputTag[Double] = new OutputTag[Double]("high")
processStream.getSideOutput(new OutputTag[Double]("high"))
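class myProcess extends ProcessFunction[SensorReading,SensorReading]
// [SensorReading,SensorReading]: the input type and the main output type
lazy val tag: OutputTag[String] = new OutputTag[String]("high")
// [String]: the element type of the side output
// (?) Presumably, several side outputs of different types can be defined this way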
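
The question raised in the last comment can be checked with a sketch like the one below. It assumes the Flink Scala DataStream API on the classpath; `SideOutputTags`, `SplitProcess`, the "alert" tag, the sample records, and the `id` and `timestamp` field names of `SensorReading` are all made up for illustration and are not from the original code.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Assumed shape of SensorReading; only the tempature field name comes from the article.
case class SensorReading(id: String, timestamp: Long, tempature: Double)

object SideOutputTags {
  // One tag per side output; the type parameter is the element type of that output.
  val highTemp: OutputTag[Double] = new OutputTag[Double]("high")
  val alertMsg: OutputTag[String] = new OutputTag[String]("alert")
}

class SplitProcess extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              out: Collector[SensorReading]): Unit = {
    if (value.tempature > 35) {
      // Each call pairs a tag with a value of exactly the tag's type.
      ctx.output(SideOutputTags.highTemp, value.tempature)
      ctx.output(SideOutputTags.alertMsg, s"${value.id} is too hot")
    }
    // Main output: every record passes through unchanged.
    out.collect(value)
  }
}

object MultiSideOutputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val processed = env
      .fromElements(SensorReading("sensor_1", 1L, 36.5), SensorReading("sensor_2", 2L, 20.0))
      .process(new SplitProcess)

    // Each side output is read back with a tag of the same id and type.
    processed.getSideOutput(SideOutputTags.highTemp).print()
    processed.getSideOutput(SideOutputTags.alertMsg).print()
    env.execute("multi side output sketch")
  }
}
```

Keeping the tags in one shared object is a design choice of this sketch: the emitting function and the `getSideOutput` calls then cannot drift apart in id or type.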

Correct code

package xx.xx

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object outTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val sourceStream = env.readTextFile("E:\\softData\\workspace_xj\\flinkkk\\src\\main\\resources\\sensor")
      .map(f=>{
        val arr = f.split(",")
        SensorReading(arr(0).trim,arr(1).trim.toLong,arr(2).trim.toDouble)
      })
    // Side output
    val processStream = sourceStream.process(new myProcess())
    // getSideOutput takes the Scala OutputTag (org.apache.flink.streaming.api.scala)
    processStream.getSideOutput(new OutputTag[Double]("high"))
      .print()
    env.execute("out test")
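  }
}

// Not shown in the original post. Assumed shape, inferred from the map call above;
// the names id and timestamp are guesses. Drop this if SensorReading already exists in the package.
case class SensorReading(id: String, timestamp: Long, tempature: Double)

class myProcess extends ProcessFunction[SensorReading,SensorReading] {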
  lazy val tag: OutputTag[Double] = new OutputTag[Double]("high")
  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              out: Collector[SensorReading]): Unit = {
    // Side output path
    if (value.tempature > 35) {
      // ctx.output expects org.apache.flink.util.OutputTag, which the Scala OutputTag extends
      ctx.output(tag, value.tempature)
    }
    // Main output path
    out.collect(value)
  }
}

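Copyright notice: This is an original article by u014657468, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.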