Canal 与 mysql 实现消息同步(采用的是docker部署)

1. 遇到的问题

 canal 1.1.6 与MYSQL 8 的版本的整合  我是没有成功 ,在网上也查了很多的资料 ,没有成功,可能是mysql 8 与canal v1.1.6 的兼容问题吧。
如果你也是采用的是mysql 8 与canal v1.1.6 ,你如果成功了可以 私信我 ,如果你也找了很久 也没有成功的化,就改用mysql 5.7 和canal v:1.1.4 版本吧

2. mysql 5.7 整合canal v:1.1.4 

这里是采用docker  拉取好镜像文件

 1. 安装 mysql 5.7 镜像与 canal:v1.1.4 镜像

docker pull mysql:5.7

docker pull docker.io/canal/canal-server:v1.1.4

2. 运行相应的容器
 

运行数据库 镜像  由于mysql 5采用的密码验证方式与canal v1.1.4的 验证方式相同不需要进行更改

docker run \
-p 3306:3306 \
--name mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql

运行canal容器

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server:v1.1.4

3. 进入容器,更改mysql的相关的配置文件

# 进入数据库容器
docker exec -it mysql /bin/bash

#登录数据库

mysql -uroot -p123456

# 创建canal 用户 设置秘密为 canal 并赋予 相应的权限  这里的账号秘密 与instance.properties对应

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

# 进入数据库,查看密码的加密方式

use mysql;

select user ,host ,plugin from user;

exit;


# 进入etc文件夹下面 查找配置文件

cd /etc





 

 

# 退出容器

exit;
# 通过docker 中的cp 命令将 容器中的配置文件复制到本地 进行更改 ,更改之后在复制回去

docker cp mysql:/etc/my.cnf  /home/demo


cd /home/demo

#这是会出现my.cnf文件 

vi my.cnf

#添加上这三句 开启数据库的binlog 日志模式

log-bin=mysql-bin
binlog-format=ROW
server_id=1
 
docker cp /home/demo/my.cnf  mysql:/etc/my.cnf
#重启数据库
docker restart mysql
# 进入到数据库,查看 是否开启binlog模式

show variables like 'log_bin';

更改配置文件  开启binlog 模式

 

 4. 进入canal 更改canal的相关配置

 docker exec -it canal /bin/bash

 cd canal-server/conf/

 vi canal.properties

# 添加上
canal.id=1001 

cd example/
#修改 instance.properties中的配置连接数据库的地址

vi instance.properties

 

 

 

# 然后退出容器
# 重启容器
docker restart mysql;
docker restart canal;
# 进入到canal容器中查看日志文件 ,这个是看canal是否与mysql连接成功的.
cd canal-server/logs/example/

cat example.log
# 连接成功

 

下面是 springcloud监听中的代码 可以去搜 黑马的畅购商城去找 

注意将下面的ip 更改为自己的ip

 

package com.changgou.canal.listener;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;
@CanalEventListener
public class CanalDataEventListener {

    /***
     * 增加数据监听
     * @param eventType
     * @param rowData
     */
    @InsertListenPoint
    public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }

    /***
     * 修改数据监听
     * @param rowData
     */
    @UpdateListenPoint
    public void onEventUpdate(CanalEntry.RowData rowData) {
        System.out.println("UpdateListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }

    /***
     * 删除数据监听
     * @param eventType
     */
    @DeleteListenPoint
    public void onEventDelete(CanalEntry.EventType eventType) {
        System.out.println("DeleteListenPoint");
    }

    /***
     * 自定义数据修改监听
     * @param eventType
     * @param rowData
     */
    @ListenPoint(destination = "example", schema = "changgou_content", table = {"tb_content_category", "tb_content"}, eventType = CanalEntry.EventType.UPDATE)
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.err.println("DeleteListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }
}
package com.changgou.canal;

import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
@EnableEurekaClient
@EnableCanalClient
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}
server:
  port: 18083
spring:
  application:
    name: canal
  redis:
    host: 192.168.200.130
    port: 6379
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
          #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE
#canal配置
canal:
  client:
    instances:
      # exmaple
      example:
        host: 192.168.200.130
        port: 11111

监听到的结果

 5 下面是我整合 mysql 8 与canal :v1.1.6 遇到的问题

  1. mysql8 与 canal:v1.1.6 密码验证方式不同 造成的 认证错误

更改 数据库中密码的验证方式就行了. 设置对应的权限

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

ALTER USER 'root'@'%' IDENTIFIED BY '123456' PASSWORD EXPIRE NEVER;
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456'; 

ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; 

2.  查看日志文件的时候读取 不了 .. ..表 或者 ...表不存在 rrorNumber=1146, fieldCount=-1, message=Table 'mysql.base table' doesn't exist, sqlState=42S02, sqlStateMarker=#]  我搜索到得到解决办法是:

进入到instance.properties文件中  修改这个为false 还有 下面监听的表 以及对黑名单的操作

然后就会出现第三个错误

 

 

 

 

 更改监听的库表 和忽略监听的库表

3. 然后就是这个错误 日志上面一直不停的报这个错误. Unknow form  日志文件

 这个问题一致没有解决  不知道如何解决的  可能是 数据库和canal的兼容问题


版权声明:本文为m0_57780081原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。