Flink实时读取Mysql增量日志数据并写入GreenPlum/Mysql

FlinkStreamETL

0.功能说明

概括:利用Flink实时统计Mysql数据库BinLog日志数据,并将流式数据注册为流表,利用Flink SQL将流表与Mysql的维表进行JOIN,最后将计算结果实时写入Greenplum/Mysql。

1.需求分析

1.1需求

实时统计各个地区会议室的空置率,预定率,并在前端看板上实时展示。源系统的数据库是Mysql,它有三张表,分别是:t_meeting_info(会议室预定信息表)、t_meeting_location(属地表,维度表)、t_meeting_address(会议室属地表,维度表)。

1.2说明

t_meeting_info表中的数据每时每刻都在更新数据,若通过JDBC方式定时查询Mysql,会给源系统数据库造成大量无形的压力,甚至会影响正常业务的使用,并且时效性也不高。需要在基本不影响Mysql正常使用的情况下完成对增量数据的处理。

上面三张表的DDL语句如下:

  • t_meeting_info(会议室预定信息表,这张表数据会实时更新)
CREATE TABLE `t_meeting_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `meeting_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '会议业务唯一编号',
  `msite` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议名称',
  `mcontent` varchar(4096) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议内容',
  `attend_count` int(5) DEFAULT NULL COMMENT '参会人数',
  `type` int(5) DEFAULT NULL COMMENT '会议类型 1 普通会议 2 融合会议 3 视频会议 4 电话会议',
  `status` int(255) DEFAULT NULL COMMENT '会议状态 ',
  `address_id` int(11) DEFAULT NULL COMMENT '会议室id',
  `email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人邮箱',
  `contact_tel` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '联系电话',
  `create_user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人姓名',
  `create_user_id` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人工号',
  `creator_org` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人组织',
  `mstart_date` datetime DEFAULT NULL COMMENT '会议开始时间',
  `mend_date` datetime DEFAULT NULL COMMENT '会议结束时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  `company` int(10) DEFAULT NULL COMMENT '会议所在属地code',
  `sign_status` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '预留字段',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `t_meeting_info_meeting_code_index` (`meeting_code`) USING BTREE,
  KEY `t_meeting_info_address_id_index` (`address_id`) USING BTREE,
  KEY `t_meeting_info_create_user_id_index` (`create_user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=65216 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会议主表';
  • t_meeting_location(属地表,地区维表)

    CREATE TABLE `t_meeting_location` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `short_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地简称',
      `full_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地全称',
      `code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '属地code',
      `region_id` int(11) DEFAULT NULL COMMENT '地区id',
      `create_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人',
      `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      PRIMARY KEY (`id`) USING BTREE,
      UNIQUE KEY `t_meeting_location_code_uindex` (`code`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='属地表';
    
  • t_meeting_address(会议室属地表,会议室维表)

    CREATE TABLE `t_meeting_address` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
      `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室名称',
      `location` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '所在属地',
      `shared` int(3) DEFAULT NULL COMMENT '是否共享 0 默认不共享 1 全部共享 2 选择性共享',
      `cost` int(10) DEFAULT NULL COMMENT '每小时成本',
      `size` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室容量大小',
      `bvm_ip` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'BVM IP',
      `type` int(2) DEFAULT NULL COMMENT '会议室类型 1 普通会议室  2 视频会议室',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `create_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
      `status` int(2) DEFAULT NULL COMMENT '是否启用 ,0 未启用 1已启用 2已删除',
      `order` int(5) DEFAULT NULL COMMENT '排序',
      `approve` int(2) DEFAULT NULL COMMENT '是否审批 0 不审批 1 审批',
      PRIMARY KEY (`id`) USING BTREE,
      KEY `t_meeting_address_location_index` (`location`) USING BTREE,
      KEY `order` (`order`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=554 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会议室表';
    

2.实现方案

方案如下图所示:

  • 利用Canal监听Mysql数据库的增量BinLog日志数据(JSON格式
  • 将增量日志数据作为Kafka的生产者,Flink解析KafkaTopic 中的数据并消费
  • 将计算后的流式数据(Stream)注册为Flink 中的表(Table)
  • 最后利用Flink与t_meeting_location、t_meeting_address维表进行JOIN,将最终的结果写入数据库。
    在这里插入图片描述

需要服务器:CentOS7,JDK8、Scala 2.12.6、Mysql、Canal、Flink1.9、Zookkeeper、Kafka

3.可视化方案

  • Tableau实时刷新Greenplum,FineBI也可以(秒级)
  • DataV也可以每几秒刷新一次
  • Flink计算后的结果,写入到缓存,前端开发可视化组件进行展示(实时展示)。

4.项目地址

由于CSDN不方便粘贴图片,详细内容请见:
FlinkStreamETL

https://github.com/liwei199411/FlinkStreamETL/tree/master

5.参考目录

[1].基于Spark Streaming + Canal + Kafka对Mysql增量数据实时进行监测分析

[2].Canal

[3].Canal 的 .NET 客户端

[4].如何基于MYSQL做实时计算?

[5].基于Canal与Flink实现数据实时增量同步(一)

[6].美团DB数据同步到数据仓库的架构与实践

[7].处理JSON格式的日志数据,然后进行流式Join

[8].Flink继续实践:从日志清洗到实时统计内容PV等多个指标

[9].实时数据架构体系建设思路

[10].Flink` 流与维表的关联

[11].Flink DataStream流表与维表Join(Async` I/O)

12. `flink 流表join mysql表

作者:岳过山丘
链接:https://www.jianshu.com/p/44583b98ecbb

13. `flink1.9 使用LookupableTableSource实现异步维表关联

作者:todd5167
链接:https://www.jianshu.com/p/7ebe1ec8aa7c

14. Flink异步之矛盾-锋利的Async I/O

作者:王知无
链接:https://www.jianshu.com/p/85ee258aa41f

15.Flink 的时间属性及原理解析

https://blog.csdn.net/zhengzhaoyang122/article/details/107352934?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-3

16.大屏数据可视化
https://yyhsong.github.io/iDataV/xiang.top/2020/03/05/基于Canal与Flink实现数据实时增量同步-一/)


版权声明:本文为leaeason原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。