阿里开源项目——Canal

阿里开源项目——Canal

1.Canal是什么?

canal是阿里巴巴旗下的一款开源项目,使用纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

2.Canal工作原理

canal原理图
原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

3.Mysql开启binlog模式

canal是基于mysql的主从模式实现的,所以必须先开启binlog模式.

(1) 修改/etc/my.cnf 需要开启主 从模式,开启binlog模式。

执行如下命令,编辑mysql配置文件
在这里插入图片描述
命令行如下:

docker exec -it mysql /bin/bash
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf

修改mysqld.cnf配置文件,添加如下配置:
在这里插入图片描述
上图配置如下:

log-bin=/var/lib/mysql/mysql-bin
server-id=12345

(2) 创建账号 用于测试使用,

使用root账号创建用户并授予权限

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(3)重启mysql容器

docker restart mysql

4.spring boot项目如何使用Canal?

(1)Canal的starter放入maven库

Canal是阿里的产品,中央仓库是没有的,所以需要手动放入自己的maven仓库。
下面 是从阿里下载的statrer:
在这里插入图片描述
地址栏输入cmd进入命令行模式。
输入“mvn install”命令进行安装。安装路径根据maven安装路径下的cong下的setting.xml配置地址下载。
在这里插入图片描述

(2)引入pom
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
(3)配置文件配置
#canal配置
canal:
  client:
    instances:
      example:
        host: 192.168.200.128
        port: 11111
(4)监听创建

创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:

package com.example.canatest.config;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;

/**
 * @author gzy
 * @date 2020/6/7
 */
@CanalEventListener
public class MyEventListener {
  //监听添加操作
    @InsertListenPoint
    public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }
    
  //监听修改操作
    @UpdateListenPoint
    public void onEvent1(CanalEntry.RowData rowData) {
        System.err.println("UpdateListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }
    
  //监听删除操作
    @DeleteListenPoint
    public void onEvent3(CanalEntry.EventType eventType) {
        System.err.println("DeleteListenPoint");
    }
    
   //自定义监听
  // canal名称,数据库名,表名,监听的操作
    @ListenPoint(destination = "example", schema = "canal-test", table = {"t_user", "test_table"}, eventType = CanalEntry.EventType.UPDATE)
    public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.err.println("DeleteListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }
}
(5)启动类创建

创建启动类,代码如下:

/**
 * 引导类  启动类
 */
@SpringBootApplication
@EnableCanalClient
public class CanaTestApplication {

	public static void main(String[] args) {
		SpringApplication.run(CanaTestApplication.class, args);
	}
}
(6)测试

启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出如下:
在这里插入图片描述


版权声明:本文为weixin_45362084原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。