FlinkStreamETL
0.功能说明
概括:利用Flink实时统计Mysql数据库BinLog日志数据,并将流式数据注册为流表,利用Flink SQL将流表与Mysql的维表进行JOIN,最后将计算结果实时写入Greenplum/Mysql。
1.需求分析
1.1需求
实时统计各个地区会议室的空置率,预定率,并在前端看板上实时展示。源系统的数据库是Mysql,它有三张表,分别是:t_meeting_info(会议室预定信息表)、t_meeting_location(属地表,维度表)、t_meeting_address(会议室属地表,维度表)。
1.2说明
t_meeting_info表中的数据每时每刻都在更新数据,若通过JDBC方式定时查询Mysql,会给源系统数据库造成大量无形的压力,甚至会影响正常业务的使用,并且时效性也不高。需要在基本不影响Mysql正常使用的情况下完成对增量数据的处理。
上面三张表的DDL语句如下:
- t_meeting_info(会议室预定信息表,这张表数据会实时更新)
CREATE TABLE `t_meeting_info` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`meeting_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '会议业务唯一编号',
`msite` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议名称',
`mcontent` varchar(4096) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议内容',
`attend_count` int(5) DEFAULT NULL COMMENT '参会人数',
`type` int(5) DEFAULT NULL COMMENT '会议类型 1 普通会议 2 融合会议 3 视频会议 4 电话会议',
`status` int(255) DEFAULT NULL COMMENT '会议状态 ',
`address_id` int(11) DEFAULT NULL COMMENT '会议室id',
`email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人邮箱',
`contact_tel` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '联系电话',
`create_user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人姓名',
`create_user_id` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人工号',
`creator_org` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人组织',
`mstart_date` datetime DEFAULT NULL COMMENT '会议开始时间',
`mend_date` datetime DEFAULT NULL COMMENT '会议结束时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
`company` int(10) DEFAULT NULL COMMENT '会议所在属地code',
`sign_status` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '预留字段',
PRIMARY KEY (`id`) USING BTREE,
KEY `t_meeting_info_meeting_code_index` (`meeting_code`) USING BTREE,
KEY `t_meeting_info_address_id_index` (`address_id`) USING BTREE,
KEY `t_meeting_info_create_user_id_index` (`create_user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=65216 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会议主表';
t_meeting_location(属地表,地区维表)
CREATE TABLE `t_meeting_location` ( `id` int(11) NOT NULL AUTO_INCREMENT, `short_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地简称', `full_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地全称', `code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '属地code', `region_id` int(11) DEFAULT NULL COMMENT '地区id', `create_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人', `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `t_meeting_location_code_uindex` (`code`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='属地表';t_meeting_address(会议室属地表,会议室维表)
CREATE TABLE `t_meeting_address` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id', `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室名称', `location` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '所在属地', `shared` int(3) DEFAULT NULL COMMENT '是否共享 0 默认不共享 1 全部共享 2 选择性共享', `cost` int(10) DEFAULT NULL COMMENT '每小时成本', `size` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室容量大小', `bvm_ip` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'BVM IP', `type` int(2) DEFAULT NULL COMMENT '会议室类型 1 普通会议室 2 视频会议室', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `create_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人', `update_time` datetime DEFAULT NULL COMMENT '更新时间', `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人', `status` int(2) DEFAULT NULL COMMENT '是否启用 ,0 未启用 1已启用 2已删除', `order` int(5) DEFAULT NULL COMMENT '排序', `approve` int(2) DEFAULT NULL COMMENT '是否审批 0 不审批 1 审批', PRIMARY KEY (`id`) USING BTREE, KEY `t_meeting_address_location_index` (`location`) USING BTREE, KEY `order` (`order`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=554 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会议室表';
2.实现方案
方案如下图所示:
- 利用Canal监听
Mysql数据库的增量BinLog日志数据(JSON格式) - 将增量日志数据作为Kafka的生产者,Flink解析Kafka的
Topic中的数据并消费 - 将计算后的流式数据(Stream)注册为Flink 中的表(Table)
- 最后利用Flink与t_meeting_location、t_meeting_address维表进行JOIN,将最终的结果写入数据库。

需要服务器:CentOS7,JDK8、Scala 2.12.6、Mysql、Canal、Flink1.9、Zookkeeper、Kafka
3.可视化方案
- Tableau实时刷新Greenplum,FineBI也可以(秒级)
- DataV也可以每几秒刷新一次
- Flink计算后的结果,写入到缓存,前端开发可视化组件进行展示(实时展示)。
4.项目地址
由于CSDN不方便粘贴图片,详细内容请见:
FlinkStreamETL
https://github.com/liwei199411/FlinkStreamETL/tree/master
5.参考目录
[1].基于Spark Streaming + Canal + Kafka对Mysql增量数据实时进行监测分析
[2].Canal
[3].Canal 的 .NET 客户端
[4].如何基于MYSQL做实时计算?
[5].基于Canal与Flink实现数据实时增量同步(一)
[8].Flink继续实践:从日志清洗到实时统计内容PV等多个指标
[9].实时数据架构体系建设思路
[10].Flink` 流与维表的关联
[11].Flink DataStream流表与维表Join(Async` I/O)
12. `flink 流表join mysql表
作者:岳过山丘
链接:https://www.jianshu.com/p/44583b98ecbb
13. `flink1.9 使用LookupableTableSource实现异步维表关联
作者:todd5167
链接:https://www.jianshu.com/p/7ebe1ec8aa7c
14. Flink异步之矛盾-锋利的Async I/O
作者:王知无
链接:https://www.jianshu.com/p/85ee258aa41f
15.Flink 的时间属性及原理解析
https://blog.csdn.net/zhengzhaoyang122/article/details/107352934?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-3
16.大屏数据可视化
https://yyhsong.github.io/iDataV/xiang.top/2020/03/05/基于Canal与Flink实现数据实时增量同步-一/)