Canal-从零开始,一步一步教你,包教包会

简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

从0开始配置Canal

1.首先查看mysql是否开启了bin_log日志:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format%'; 

 

如果Value显示为OFF,则修改mysql的配置文件:my.ini/my.cnf

在末未追加:

server_id=1       #mysql实例id,不能和canal的salveId重复
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式

之后充气mysql,再查看:SHOW VARIABLES LIKE 'log_bin'; Value为ON的时候,表示bin_log日志开启了

2.在mysql里面添加以下的相关用户和权限

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3.下载安装Canal服务

  • 解压文件,修改conf/example下的instance.properties配置文件

# 改成自己需要关联数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 改成自己刚刚创建数据的canal的用户名和密码
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 匹配的正则,关联所有,监听某个库的某个表,用逗号分隔可监听多个:database1.tableName1,database2.tableName2
canal.instance.filter.regex=.*\\..*
  • 进入bin目录下启动

    windows点击:startup.bat

    linux和mac:startup.sh

  • 查看日志:logs/canal/canal.log最后显示

logs/example/example.log

 

不报错表示启动成功

4.新建IDEA项目

  • pom

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
  • 新建CanalClient

@Component
public class CanalClient implements DisposableBean {
    private CanalConnector canalConnector;
​
    @Bean
    public CanalConnector getCanalConnector() {
        canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "canal", "canal");
        canalConnector.connect();
        // 指定filter,指定{database}.{table},canalConnector.subscribe("database.table,database2.table2");
        canalConnector.subscribe(".*\\..*");
        // 回滚寻找上次中断的位置
        canalConnector.rollback();
        return canalConnector;
    }
​
    @Override
    public void destroy() throws Exception {
        if (canalConnector != null) {
            canalConnector.disconnect();
        }
    }
}

新建业务逻辑处理类:CanalScheduling

@Component
public class CanalScheduling implements Runnable {
    @Resource
    private CanalConnector canalConnector;
    @Autowired
    private VideoDao videoDao;
​
    @Override
    @Scheduled(fixedDelay = 100)
    public void run() {
        long batchId = -1;
        try {
            int batchSize = 1000;
            Message message = canalConnector.getWithoutAck(batchSize);
            batchId = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId != -1 && entries.size() > 0) {
                entries.forEach(entry -> {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        // 解析处理
                        publishCanalEvent(entry);
                    }
                });
            }
            canalConnector.ack(batchId);
        } catch (Exception e) {
            e.printStackTrace();
            canalConnector.rollback(batchId);
        }
    }
​
    /**
     * 处理解析
     *
     * @param entry
     */
    private void publishCanalEvent(CanalEntry.Entry entry) {
        CanalEntry.EventType entryType = entry.getHeader().getEventType();
        String database = entry.getHeader().getSchemaName();
        String table = entry.getHeader().getTableName();
        CanalEntry.RowChange change = null;
        try {
            change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
​
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return;
        }
        change.getRowDatasList().forEach(rowData -> {
            List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
            String primaryKey = "id";
            CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
                    && primaryKey.equals(column.getName())).findFirst().orElse(null);
​
            Map<String, Object> dataMap = parseColumnsToMap(columns);
            importToVod(dataMap, database, table);
        });
    }
​
    /**
     * 导入更新数据的逻辑,可以导入oracle,elasticsearch,redis,MongoDB等
     * 这边我同步到另一个mysql的数据库
     *
     * @param map
     * @param database
     * @param table
     */
    private void importToVod(Map<String, Object> map, String database, String table) {
        System.out.println(map + database + table);
        if (StringUtils.equals("test", database)) {
            if (StringUtils.equals("vod_videos", table)) {
                // 更新的是vod_videos
                String videoId = map.get("video_id").toString();
                String title = map.get("title").toString();
                videoDao.updateVideo(videoId, title);
            }
        }
​
    }
​
    /**
     * 将columns转map
     *
     * @param columns
     * @return
     */
    Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> jsonMap = Maps.newHashMap();
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            jsonMap.put(column.getName(), column.getValue());
        });
        return jsonMap;
    }
}

在启动类加上开启定时任务:@EnableScheduling

Dao类:VideoDao

@Mapper
public interface VideoDao {
    /**
     * 更新视频标题
     *
     * @param videoId
     * @param title
     */
    @Update("UPDATE vod_videos SET title = #{title} WHERE video_id = #{videoId}")
    void updateVideo(@Param("videoId") String videoId, @Param("title") String title);
}

启动程序,修改数据库title字段,加上66666:

会发现收到修改信息,控制台打印日志:

再看目标库,数据也对应的修改了:


版权声明:本文为hualingkai11原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。