[Scala] Flink项目小彩蛋(六)

传送区

[Scala] Flink项目实例系列(零)
[Scala] Flink项目实时热门商品统计(一)
[Scala] Flink项目实时流量统计(二)
[Scala] Flink项目恶意登录监控(三)
[Scala] Flink项目订单支付失效监控(四)
[Scala] Flink项目订单支付实时对账(五)
[Scala] Flink项目小彩蛋(六)
本项目的代码及文件见这这这,友情码是:3n9z。

Join

Join官方传送

戳我

Tumbling Window Join

图好话少

滚动

Sliding Window Join

滑动

Session Window Join

会话

Interval Join

间隙

示例代码

数据源结构

orderIdeventTypetxIdtimestamp
34729paysd76f87d61558430844
txIdpayChanneleventTime
ewr342as4wechat1558430845
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object TxMatchByJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 读取订单事件流
    val resource = getClass.getResource("/OrderLog.csv")
    val orderEventStream = env.readTextFile(resource.getPath)
      //    val orderEventStream = env.socketTextStream("localhost", 7777)
      .map(data => {
      val dataArray = data.split(",")
      OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
    })
      .filter(_.txId != "")
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // 读取支付到账事件流
    val receiptResource = getClass.getResource("/ReceiptLog.csv")
        val receiptEventStream = env.readTextFile(receiptResource.getPath)
//    val receiptEventStream = env.socketTextStream("localhost", 8888)
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // intervalJoin的概念见链接
    // https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html#interval-join
    val processedStream = orderEventStream.intervalJoin(receiptEventStream)
      .between(Time.seconds(-5), Time.seconds(5))
      .process(new TxPayMatchByJoin())

    processedStream.print()

    env.execute("tx pay match by join job")
  }
}

class TxPayMatchByJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
  override def processElement(left: OrderEvent, right: ReceiptEvent, ctx: ProcessJoinFunction[OrderEvent,
    ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    out.collect((left, right))
  }
}

版权声明:本文为chike0039原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。