阿里开源项目——Canal
1.Canal是什么?
canal是阿里巴巴旗下的一款开源项目,使用纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
2.Canal工作原理

原理相对比较简单:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
3.Mysql开启binlog模式
canal是基于mysql的主从模式实现的,所以必须先开启binlog模式.
(1) 修改/etc/my.cnf 需要开启主 从模式,开启binlog模式。
执行如下命令,编辑mysql配置文件
命令行如下:
docker exec -it mysql /bin/bash
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf
修改mysqld.cnf配置文件,添加如下配置:
上图配置如下:
log-bin=/var/lib/mysql/mysql-bin
server-id=12345
(2) 创建账号 用于测试使用,
使用root账号创建用户并授予权限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
(3)重启mysql容器
docker restart mysql
4.spring boot项目如何使用Canal?
(1)Canal的starter放入maven库
Canal是阿里的产品,中央仓库是没有的,所以需要手动放入自己的maven仓库。
下面 是从阿里下载的statrer:
地址栏输入cmd进入命令行模式。
输入“mvn install”命令进行安装。安装路径根据maven安装路径下的cong下的setting.xml配置地址下载。
(2)引入pom
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
(3)配置文件配置
#canal配置
canal:
client:
instances:
example:
host: 192.168.200.128
port: 11111
(4)监听创建
创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:
package com.example.canatest.config;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;
/**
* @author gzy
* @date 2020/6/7
*/
@CanalEventListener
public class MyEventListener {
//监听添加操作
@InsertListenPoint
public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
//监听修改操作
@UpdateListenPoint
public void onEvent1(CanalEntry.RowData rowData) {
System.err.println("UpdateListenPoint");
rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
//监听删除操作
@DeleteListenPoint
public void onEvent3(CanalEntry.EventType eventType) {
System.err.println("DeleteListenPoint");
}
//自定义监听
// canal名称,数据库名,表名,监听的操作
@ListenPoint(destination = "example", schema = "canal-test", table = {"t_user", "test_table"}, eventType = CanalEntry.EventType.UPDATE)
public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("DeleteListenPoint");
rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
}
(5)启动类创建
创建启动类,代码如下:
/**
* 引导类 启动类
*/
@SpringBootApplication
@EnableCanalClient
public class CanaTestApplication {
public static void main(String[] args) {
SpringApplication.run(CanaTestApplication.class, args);
}
}
(6)测试
启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出如下:
版权声明:本文为weixin_45362084原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。