需求:如果温度值小于32F,就将报警信息输出到侧输出流中
package com.run.wc
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
* @author 霄嵩
*/
/**
* SideOutput侧输出流
* 需求:如果温度值小于32F,就将报警信息输出到侧输出流中
*/
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataStream = env.socketTextStream("master", 9999)
val mapStream = dataStream.map(x => {
val line = x.split(",")
Sensor(line(0).trim.toString, line(1).trim.toLong, line(2).trim.toDouble)
})
val stream = mapStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Sensor](Time.seconds(1)) {
override def extractTimestamp(element: Sensor): Long = {
element.timestamp * 1000
}
})
val processStream = stream.process(new SideOutputAlert())
mapStream.print("map data")
processStream.print("process data")
//通过getSideOutput获取侧输出流,并打印输出
processStream.getSideOutput(new OutputTag[String]("freezingAlert")).print("side output data")
env.execute("side output test")
}
}
//温度值小于32F,就将报警信息输出到侧输出流中
class SideOutputAlert() extends ProcessFunction[Sensor, Sensor] {
//定义一个侧输出流标签
lazy val freezingAlert: OutputTag[String] = new OutputTag[String]("freezingAlert")
override def processElement(value: Sensor,
ctx: ProcessFunction[Sensor, Sensor]#Context,
out: Collector[Sensor]): Unit = {
if (value.temperature < 32) {
ctx.output(freezingAlert, s"${value.id}的温度值小于32F")
} else {
out.collect(value)
}
}
}
Result:
process data> Sensor(sensor_1,1547718207,37.0)
map data> Sensor(sensor_1,1547718207,37.0)
process data> Sensor(sensor_1,1547718207,36.0)
map data> Sensor(sensor_1,1547718207,36.0)
side output data> sensor_1的温度值小于32F
map data> Sensor(sensor_1,1547718207,31.0)
process data> Sensor(sensor_1,1547718207,36.0)
map data> Sensor(sensor_1,1547718207,36.0)
side output data> sensor_1的温度值小于32F
map data> Sensor(sensor_1,1547718207,20.0)
版权声明:本文为accptanggang原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。