Using Flink to Join a Kafka Stream with a Hive Dimension Table

This post can serve as a reference exercise for getting started with Flink. Feedback and corrections from experienced readers are welcome!

Flink version: 1.10.1

Kafka version: 0.10.0

Hive version: 1.2.1

Dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
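
Note: the code below uses FlinkKafkaConsumer010, which none of the dependencies above provide. Presumably the matching Kafka connector artifact is also on the classpath; a likely addition (adjust the Scala suffix and version to your setup):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.10.1</version>
</dependency>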

Code:

package joinhive

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row

// Case class modeling a Kafka record
case class UserInfo(userid: String, ulac: String, usell: String)

object KafkaCombineHive {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val paramTool: ParameterTool = ParameterTool.fromArgs(args)

    val host: String = paramTool.get("host")
    val port: Int = paramTool.getInt("port")
    val group: String = paramTool.get("group")

    val properties = new Properties()
    properties.setProperty("group.id", group)
    properties.setProperty("bootstrap.servers", host + ":" + port)

    val simpleStringSchema = new SimpleStringSchema()
    val kafka: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String]("f_input01", simpleStringSchema, properties)

    kafka.setStartFromLatest()
    kafka.setCommitOffsetsOnCheckpoints(true)
    val KafkaInStream: DataStream[String] = env.addSource(kafka)
    // Transform the raw Kafka strings into the UserInfo case class
    val kafkaUserStream: DataStream[UserInfo] = KafkaInStream.map(a => {
      val tem = a.split(",")
      UserInfo(tem(0), tem(1), tem(2))
    })
    // Create a table from the Kafka stream
    val blinkStreamSettings_k: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val blinkStreamTableEnv_k: StreamTableEnvironment = StreamTableEnvironment.create(env, blinkStreamSettings_k)
    val kafka_tab: Table = blinkStreamTableEnv_k.fromDataStream(kafkaUserStream)

    blinkStreamTableEnv_k.createTemporaryView("ktable", kafka_tab)
    //    val sql2 = "select * from ktable where userid = 's101'"
    val sql2 = "select * from ktable"
    val result_k: Table = blinkStreamTableEnv_k.sqlQuery(sql2)

    /**
     * Hive handling starts here
     */

    // Directory containing hive-site.xml; on this Windows setup it is D:\dp_maintenance
    val hiveConfDir = "D:\\dp_maintenance"
    // Hive version is 1.2.1
    val hiveVersion = "1.2.1"
    // Create the HiveCatalog
    val hive: HiveCatalog = new HiveCatalog("hive", "default", hiveConfDir, hiveVersion)
    //    hive.getHiveConf
    // Register the Hive catalog
    blinkStreamTableEnv_k.registerCatalog("hive", hive)
    // Use the registered Hive catalog
    blinkStreamTableEnv_k.useCatalog("hive")
    // In the SQL, ods22 is the Hive database and lacci is a dimension table in it,
    // i.e. the table whose data is joined against the Kafka stream
    val sql_select1 = "select * from ods22.lacci"
    val resultselect: Table = blinkStreamTableEnv_k.sqlQuery(sql_select1)
    //    blinkStreamTableEnv_k.createTemporaryView("hive1", resultselect)
    //    val sql3 = "select * from hive1 "

    // Inner join the Kafka table with the Hive dimension table on lacci.id = ktable.userid
    result_k.join(resultselect, "id = userid").toAppendStream[Row].print()

    env.execute("kafka join hive")

  }
}
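
The Kafka connection comes from the command-line arguments parsed with ParameterTool, so launching the job needs something like --host <broker-host> --port 9092 --group <consumer-group> (the values here are placeholders, not from the original post).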

Sample Hive dimension table, input, and output: (the original screenshots are not reproduced here)
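
From the code one can infer the shapes involved (values would be hypothetical, not the original screenshots): Kafka input records are comma-separated userid,ulac,usell strings; the Hive table ods22.lacci must expose an id column, since the join predicate is id = userid; and each output Row combines the UserInfo fields with the matched lacci columns.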

Additional notes:

This is the simplest possible join scenario: the joined result is only printed, with no sink to Kafka or HDFS.
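
If a sink were wanted, a minimal sketch of writing the joined rows back to Kafka could look like the following. The output topic f_output01 is hypothetical; the snippet reuses properties and the imports from the job above, and additionally needs org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 from the same connector:

    // Hedged sketch, not from the original post: serialize each joined Row to a
    // CSV string and write it to a hypothetical output topic, reusing the same
    // connection Properties that the consumer was built with.
    val joined: DataStream[Row] = result_k.join(resultselect, "id = userid").toAppendStream[Row]
    joined
      .map(row => (0 until row.getArity).map(row.getField).mkString(","))
      .addSink(new FlinkKafkaProducer010[String]("f_output01", new SimpleStringSchema(), properties))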

The join used here is an inner join, not a left join, so a stream record that finds no match in the dimension table does not appear in the final table; see the left-join sketch below for the alternative.
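
A minimal sketch of that left-join variant, assuming the same tables as above. A streaming outer join emits retractions, so the result must be converted with toRetractStream rather than toAppendStream:

    // Hedged sketch: keep Kafka records even when lacci has no matching id.
    // The Boolean in each emitted tuple flags an insertion (true) or a retraction (false).
    result_k
      .leftOuterJoin(resultselect, "id = userid")
      .toRetractStream[Row]
      .print()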

Many approaches circulating online can "dynamically" load an updated Hive dimension table; for example, Flink 1.11 and later support TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED = true, which enables per-query table hints (sketched below).
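
A hedged sketch of that 1.11+ approach, usable only after upgrading the job; the option names follow the Flink 1.11 Hive connector documentation and are not verified against this 1.10.1 setup:

    // Enable per-query table hints (Flink 1.11+ only).
    blinkStreamTableEnv_k.getConfig.getConfiguration
      .setString("table.dynamic-table-options.enabled", "true")
    // Ask the Hive connector to keep monitoring the table for new data;
    // the monitor interval value here is an arbitrary example.
    val hintedSql =
      """select * from ods22.lacci
        |/*+ OPTIONS('streaming-source.enable'='true',
        |            'streaming-source.monitor-interval'='1 min') */""".stripMargin
    val dynamicDim: Table = blinkStreamTableEnv_k.sqlQuery(hintedSql)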


Copyright notice: this is an original article by ImStarBoy, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.