canal环境搭建
- canal官网下载 https://github.com/alibaba/canal/tags
- 分别将三个tar.gz包解压到指定的包下(adapter|admin|deployer)
canal-server
- 将自己伪装成mysql的slave节点,来订阅mysql binlog的变更
- 配置mysql,开启binlog
log-bin=mysql-bin # 开启
binlog binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 配置canal.properties
canal.id = 101
canal.register.ip = 127.0.0.1
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# 当前server上部署的instance列表,对应conf目录下创建对应的文件夹,文件copy example中做修改即可
canal.destinations = example,testusers
- 配置example.properties
canal.instance.mysql.slaveId=103 # 不能和mysql的server_id重复
canal.instance.master.address=mysql地址:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=xxxxxx
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=canal_manager\..* # 配置包含数据库和表
canal.instance.filter.black.regex=mysql\\.slave_.* # 配置不包含数据库和表
canal.mq.topic=xxx
配置过滤正则表达式说明

启动canal
/bin/startup.sh
查看canal.log,cat logs/canal/canal.log
查看example.log,cat logs/example/example.log
canal-admin
- 提供了WebUI操作界面用来配置集群、节点、实例
- 初始化数据库:conf/canal_manager.sql
- 配置application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456
- 启动
bin/startup.sh
查看日志:cat logs/admin.log
- WebUI
浏览器访问:http://localhost:8089/ 用户名/密码:admin/123456
canal-adapter
- 可以将数据库变更同步给MQ、ES、DB、LOGGER,需要我们根据实际业务需要去修改配置文件conf/application.yml
- 创建表
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
`id` varchar(64) NOT NULL,
`username` varchar(20) NOT NULL COMMENT '用户名',
`phone` varchar(64) NOT NULL COMMENT '手机号',
`nickname` varchar(20) NOT NULL COMMENT '昵称',
`last_modified` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `id` (`id`),
UNIQUE KEY `username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
BEGIN;
INSERT INTO `users` VALUES ('1', '郑强', '13181838112', '爸爸', '2022-01-24 17:13:00');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
- 创建索引
PUT /mytest_user/
{
"mappings": {
"_doc":{
"properties": {
"id": {
"type": "keyword"
},
"username": {
"type": "text"
},
"phone": {
"type": "text"
},
"nickname": {
"type": "text"
},
"last_modified":{
"type": "date"
}
}
}
}
}
- 修改canal-adapter配置
server:
port: 8088
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
url: jdbc:mysql://172.22.77.76:3306/canal_manager?useUnicode=true
username: root
password: rc2021!
canalAdapters:
- instance: testusers # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7
hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
properties:
mode: transport # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: es-01
- 修改conf/es7/mytest_user.yml
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: testusers # cannal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: mytest_user # 索引名称
_type: _doc
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配, **_id要与下面sql语句字段 as _id一样**
upsert: true
#pk: id
sql: "SELECT id as _id,username as username,phone as phone,nickname as nickname,last_modified as last_modified FROM users"
# objFields:
# # _labels: array:;
# # etlCondition: "where a.c_time>={}"
# commitBatch: 3000 # 提交批大小
- 启动
bin/startup.sh
- 全量同步
adapter根目录下创建目录 mkdir sh
vim users_all.sh curl http://localhost:8088/etl/es7/mytest_user.yml -X POST
chmod vim users_all.sh 777
./vim users_all.sh
{"succeeded":true,"resultMessage":"导入ES 数据:4 条"}
增量同步
启动服务之后就已经增量同步,打开日志即可看到tail -f logs/adapter/adapter.log
解决问题
(1) druid类型转换错误
\
1> 下载源码包:[https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz](https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz)
2> 定位到 client-adapter.escore 模块的 pom.xml 的 druid 更新为
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<scope>provided</scope>
</dependency>
3> mvn clean package
4> 到canal-canal-1.1.5/client-adapter/es7x/target 下 将打包好的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 替换掉 canal-adapter/plugin 下原来的
5> 重启adapter
版权声明:本文为CMLXZL原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。