canal-adapter: syncing MySQL to ES [TCP mode]

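Setting up the canal environment

  1. Download canal from the official releases page: https://github.com/alibaba/canal/tags
  2. Extract each of the three tar.gz packages (adapter | admin | deployer) into its own directory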

canal-server

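  1. canal-server poses as a MySQL slave node in order to subscribe to MySQL binlog changes
  2. Configure MySQL and enable the binlog (in my.cnf, under [mysqld])
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId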
  3. Configure canal.properties
canal.id = 101
canal.register.ip = 127.0.0.1
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# Instances deployed on this server; each one needs a folder of the same name under conf (copy the example folder and edit the files)
canal.destinations = example,testusers
  4. Configure the example instance (conf/example/instance.properties)
# must not be the same as MySQL's server_id
canal.instance.mysql.slaveId=103
canal.instance.master.address=<mysql-host>:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=xxxxxx
canal.instance.connectionCharset = UTF-8
# databases and tables to include
canal.instance.filter.regex=canal_manager\\..*
# databases and tables to exclude
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=xxx
  5. Notes on the filter regular expressions
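The two filter settings above are comma-separated regular expressions matched against names of the form schema.table. The sketch below illustrates that idea with Python's `re` module. It is not canal's own matcher (canal does the matching in Java), and the helper name `is_captured` as well as the rule that the exclude list wins over the include list are assumptions made for illustration. Note that the doubled backslash in the properties file becomes a single backslash in the actual pattern.

```python
import re


def is_captured(schema: str, table: str, include: str, exclude: str = "") -> bool:
    """Return True if schema.table matches the include list and not the exclude list."""
    name = f"{schema}.{table}"

    def matches(patterns: str) -> bool:
        # Each list is a comma-separated set of regexes; any full match counts.
        return any(
            re.fullmatch(p.strip(), name)
            for p in patterns.split(",")
            if p.strip()
        )

    return matches(include) and not matches(exclude)


# Patterns as they look after the properties file has been parsed.
include = r"canal_manager\..*"
exclude = r"mysql\.slave_.*"

print(is_captured("canal_manager", "users", include, exclude))      # True
print(is_captured("mysql", "slave_master_info", include, exclude))  # False
print(is_captured("other_db", "users", include, exclude))           # False
```

With these patterns only tables in the canal_manager schema are picked up; a second pattern can be appended after a comma to widen the filter.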

  6. Start canal

bin/startup.sh
Check canal.log: cat logs/canal/canal.log
Check example.log: cat logs/example/example.log

canal-admin

  1. canal-admin provides a web UI for configuring clusters, nodes and instances
  2. Initialize the database with conf/canal_manager.sql
  3. Configure application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456
  4. Start
bin/startup.sh
View the log: cat logs/admin.log
  5. WebUI
Open http://localhost:8089/ in a browser; username/password: admin/123456
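The canal.admin.passwd value in canal.properties is not the plain password: it appears to be the hash of the adminPasswd set in application.yml, in the style of MySQL's legacy PASSWORD() function (SHA-1 applied twice, upper-case hex, without the leading `*`). The sketch below reproduces the value used above from 123456; the function name `mysql_password_hash` is made up for this example.

```python
import hashlib


def mysql_password_hash(plain: str) -> str:
    """SHA-1 of the SHA-1 digest, as upper-case hex (legacy MySQL PASSWORD() without '*')."""
    inner = hashlib.sha1(plain.encode("utf-8")).digest()
    return hashlib.sha1(inner).hexdigest().upper()


print(mysql_password_hash("123456"))
# 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
```

If adminPasswd is changed in application.yml, the value in canal.properties presumably has to be regenerated the same way so that the two stay in step.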

canal-adapter

  1. canal-adapter can sync database changes to MQ, ES, a DB or a LOGGER; edit conf/application.yml to fit your actual business needs
  2. Create the table
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
  `id` varchar(64) NOT NULL,
  `username` varchar(20) NOT NULL COMMENT 'username',
  `phone` varchar(64) NOT NULL COMMENT 'phone number',
  `nickname` varchar(20) NOT NULL COMMENT 'nickname',
  `last_modified` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'last modified time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`),
  UNIQUE KEY `username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

BEGIN;
INSERT INTO `users` VALUES ('1', '郑强', '13181838112', '爸爸', '2022-01-24 17:13:00');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
  3. Create the index
PUT /mytest_user?include_type_name=true
{
  "mappings": {
    "_doc":{
      "properties": {
        "id": {
          "type": "keyword"
        },
        "username": {
          "type": "text"
        },
        "phone": {
          "type": "text"
        },
        "nickname": {
          "type": "text"
        },
        "last_modified":{
          "type": "date"
        }
      }
    }
  }
}
  4. Edit the canal-adapter configuration (conf/application.yml)
server:
  port: 8088
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.22.77.76:3306/canal_manager?useUnicode=true
      username: root
      password: rc2021!
  canalAdapters:
  - instance: testusers # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: es-01
  5. Edit conf/es7/mytest_user.yml
dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources above
destination: testusers # canal instance name, or the MQ topic
groupId: g1 # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user # index name
  _type: _doc
  _id: _id # ES _id; if omitted, pk below must be set and ES assigns _id itself. The name must match the "as _id" alias in the SQL below
  upsert: true
  #pk: id
  sql: "SELECT id as _id,username as username,phone as phone,nickname as nickname,last_modified as last_modified FROM users"
#  objFields:
#  #    _labels: array:;
#  # etlCondition: "where a.c_time>={}"
#    commitBatch: 3000 # commit batch size
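To make the role of the `as _id` alias concrete, here is a rough sketch of how one row returned by the SQL above could be turned into an ES upsert: the aliased column becomes the document id and every other column becomes a field. This only illustrates the mapping idea and is not the adapter's actual code; `row_to_es_action` and the shape of the returned dict are assumptions made for the example.

```python
def row_to_es_action(row: dict, index: str, id_field: str = "_id", upsert: bool = True) -> dict:
    """Split a SQL result row into a document id and a document body."""
    if id_field not in row:
        # Mirrors the config comment: the alias in the SQL must match _id.
        raise KeyError(f"SQL result has no column aliased as {id_field!r}")
    doc = {col: value for col, value in row.items() if col != id_field}
    action = {"_index": index, "_id": str(row[id_field]), "doc": doc}
    if upsert:
        # Insert the document if it does not exist yet, otherwise update it.
        action["doc_as_upsert"] = True
    return action


row = {
    "_id": "1",
    "username": "郑强",
    "phone": "13181838112",
    "nickname": "爸爸",
    "last_modified": "2022-01-24 17:13:00",
}
action = row_to_es_action(row, index="mytest_user")
print(action["_id"], sorted(action["doc"]))
# 1 ['last_modified', 'nickname', 'phone', 'username']
```

If the SQL selected `id` without the alias, the sketch would raise a KeyError, which corresponds to the requirement stated in the config comment.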
  6. Start
bin/startup.sh
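The mapping file refers back to application.yml in three places (dataSourceKey, destination, groupId), so a typo in any of them is an easy mistake to make. The following sketch checks those cross-references on values copied by hand into plain dicts; it does not parse the YAML files, and `find_mismatches` is a hypothetical helper, not part of canal.

```python
def find_mismatches(adapter_conf: dict, mapping: dict) -> list:
    """Return human-readable problems; an empty list means the names line up."""
    problems = []
    if mapping["dataSourceKey"] not in adapter_conf["srcDataSources"]:
        problems.append(f"unknown dataSourceKey: {mapping['dataSourceKey']}")

    instances = {a["instance"]: a for a in adapter_conf["canalAdapters"]}
    instance = instances.get(mapping["destination"])
    if instance is None:
        problems.append(f"no adapter instance named: {mapping['destination']}")
    else:
        group_ids = {g["groupId"] for g in instance["groups"]}
        if mapping["groupId"] not in group_ids:
            problems.append(f"groupId not found under instance: {mapping['groupId']}")
    return problems


# Values copied from conf/application.yml and conf/es7/mytest_user.yml above.
adapter_conf = {
    "srcDataSources": {"defaultDS": {}},
    "canalAdapters": [{"instance": "testusers", "groups": [{"groupId": "g1"}]}],
}
mapping = {"dataSourceKey": "defaultDS", "destination": "testusers", "groupId": "g1"}

print(find_mismatches(adapter_conf, mapping))  # []
```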
  7. Full sync
Create a directory under the adapter root: mkdir sh && cd sh
vim users_all.sh   # contents: curl http://localhost:8088/etl/es7/mytest_user.yml -X POST
chmod 777 users_all.sh
./users_all.sh
{"succeeded":true,"resultMessage":"导入ES 数据:4 条"}   # i.e. "imported into ES: 4 rows"
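The same full-sync call can be made from Python instead of curl. The sketch below uses only the standard library and assumes the adapter is listening on localhost:8088 as configured above; the function names are invented for this example, and nothing is sent until `run_full_sync` is called.

```python
import json
import urllib.request


def build_etl_request(base_url: str = "http://localhost:8088",
                      task: str = "es7/mytest_user.yml") -> urllib.request.Request:
    """Build the POST request for the adapter's /etl endpoint without sending it."""
    return urllib.request.Request(f"{base_url}/etl/{task}", data=b"", method="POST")


def run_full_sync(request: urllib.request.Request, timeout: float = 60.0) -> dict:
    """Send the request and decode the JSON body returned by the adapter."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage (requires a running adapter):
#   result = run_full_sync(build_etl_request())
#   print(result["succeeded"], result["resultMessage"])
```

Keeping request construction separate from sending makes it easy to point the same code at another host or mapping file.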
  8. Incremental sync
    Incremental sync is already running once the service has started; follow the log to see it: tail -f logs/adapter/adapter.log

  9. Troubleshooting
    (1) druid type conversion error

   1> Download the source package: https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz
   2> In the pom.xml of the client-adapter.escore module, change the druid dependency to

   	<dependency>
   	    <groupId>com.alibaba</groupId>
   	    <artifactId>druid</artifactId>
   	    <scope>provided</scope>
   	</dependency>
   	
   3> mvn clean package
   4> Go to canal-canal-1.1.5/client-adapter/es7x/target and use the newly built client-adapter.es7x-1.1.5-jar-with-dependencies.jar to replace the original one under canal-adapter/plugin
   5> Restart the adapter
   


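Copyright notice: this is an original article by CMLXZL, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.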