参考地址
https://github.com/alibaba/canal/wiki/AdminGuide
概念
canal-admin:控制台,可配置server和instance。
canal-server:代表一个canal运行实例,对应于一个jvm;内部包含多个instance,每个instance都会伪装成一个mysql实例的slave
instance:对应于一个数据队列
架构

instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
环境准备
数据库环境准备
mysql需要开启binlog,修改my.cnf文件,添加如下配置:
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
ps:canal支持所有模式的增量订阅(但配合同步时,因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为ROW模式)
创建canal账户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1.1.4版本安装
安装canal admin
下载地址
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
解压到指定目录下
tar zxvf canal.admin-$version.tar.gz -C /指定目录
修改配置文件
修改文件:application vi conf/application.yml
内容如下:
server:
port: 8080
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
上述配置中,需要修改的地方有5处:
- server.port:修改为应用端口号
- spring.datasource.address:canal admin的数据库的地址
- spring.datasource.database:数据库名称,默认canal_manager不变
- spring.datasource.username:数据库用户名
- spring.datasource.password:数据库密码
- canal.adminUser:canal admin账号
- canal.adminPasswd:canal admin密码
创建上面数据表及表结构
导入初始化sql,路径在项目canal admin下:conf/canal_manager.sql
启动canal-admin
命令:sh bin/startup.sh
默认登入账号密码是:admin/123456
安装canal-server
下载地址
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解压到指定目录下
tar -zxvf canal.deployer-1.1.1.tar.gz -C /指定目录
修改配置文件
server先使用local启动,先修改local配置文件,别的配置在控制台修改
修改配置文件/conf/canal_local.properties
需要修改的地方:
- canal.admin.manager ,配置canal-admin的地址
- canal.admin.user:和admin中的配置统一,对应canal.adminUser
- canal.admin.passwd:对应admin中的canal.adminPasswd 的密文(ps:可通过sql命令获取密码:select password(123456); 去掉结果最前面的*就是密码)
- canal.admin.register.cluster:集群的名称
启动canal-server
命令:sh bin/startup.sh local
控制台配置
集群管理配置
ps:单机server无需配置集群
新建集群
1.新建集群需要zk地址:

2.修改集群的主配置
选择载入模板,需要修改模板的配置项如下:
canal.admin.manager :canal-admin的地址
canal.zkServers:zk的地址
加入配置项:canal.instance.global.spring.xml= classpath:spring/default-instance.xml,代表binlog的读取位置会记录内存和zk
canal.serverMode = RocketMQ:发送rocketmq消息
canal.mq.servers :rocketmq的地址
canal.admin.user:和admin中的配置统一,对应canal.adminUser
canal.admin.passwd:对应admin中的canal.adminPasswd 的密文
rocketmq集群配置地址时,多个地址之间用;隔开
server管理配置
新建server

- 所属集群可选择集群或者单机
- server ip为启动的的canal server的服务ip
- admin端口就11110
修改配置
当server是集群模式下时,无法修改配置,只能去对应的集群修改;集群下的server的配置相同
如果server是单机,修改配置可参考上诉集群主配置
instance管理配置
新建instance

选择载入模板
修改配置
需要修改配置项如下:
- canal.instance.master.address:数据库地址
- canal.instance.dbUsername:mysql创建的canal账户
- canal.instance.dbPassword:mysql创建的canal账户密码
- canal.instance.defaultDatabaseName:数据库名
- canal.instance.filter.regex:匹配的数据库表,正则表达式
- canal.instance.filter.query.ddl=true:忽略ddl语句(就监控dml语句,insert,update,delete)
- canal.mq.topic:mq的topic
1.1.5版本安装
和1.1.4版本的不同点
server配置或者集群下的主配置
- canal.serverMode=rocketMQ ,不是RocketMQ
- canal.instance.filter.query.ddl 可直接在server统一配置,无需在instance中单独配置
- mq配置项名称修改,比如rocketmq.namesrv.addr
rocketmq集群配置地址时,多个地址之间用;隔开
源码解读
deployer模块
该模块主要用于独立部署canal server。通过启动脚本startup.sh中的配置启动入口类(com.alibaba.otter.canal.deployer.CanalLauncher),并且设置系统变量canal.conf为指定的启动配置文件名。
CanalLauncher
启动类的源码如下:
public static void main(String[] args) {
try {
.....
// 1.获取启动配置,并且加载解析配置项
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
// 2.创建一个server启动类
final CanalStarter canalStater = new CanalStarter(properties);
// 3.如果配置了canal-admin控制台地址,就调控制台服务中获取server的接口
// (/api/{env}/config/server_polling),通过该接口,去admin对应的数据库中查询最新的server的配置
String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
if (StringUtils.isNotEmpty(managerAddress)) {
// 4.user和passwd作为server和admin通信的请求头数据,类似鉴权的作用
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
if (StringUtils.isEmpty(registerIp)) {
registerIp = AddressUtils.getHostIp();
}
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort),
autoRegister,
autoCluster,
name);
PlainCanal canalConfig = configClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
+ " can't not found config for [" + registerIp + ":" + adminPort
+ "]");
}
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
// 5.schedule线程,在admin控制台可以配置server,所以默认定时5秒去admin拉取最新的server配置
executor.scheduleWithFixedDelay(new Runnable() {
private PlainCanal lastCanalConfig;
public void run() {
try {
if (lastCanalConfig == null) {
lastCanalConfig = configClient.findServer(null);
} else {
PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
if (newCanalConfig != null) {
// 6.远程配置canal.properties修改重新加载整个应用
canalStater.stop();
Properties managerProperties = newCanalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
canalStater.setProperties(managerProperties);
// 7.查看下面CanalServer的源码
canalStater.start();
lastCanalConfig = newCanalConfig;
}
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
canalStater.setProperties(managerProperties);
} else {
// 7.如果没有配置控制台地址,启动参数指定的配置就是server的配置
canalStater.setProperties(properties);
}
// 8.启动server
canalStater.start();
runningLatch.await();
executor.shutdownNow();
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal Server:", e);
}
}
CanalStarter
CanalStarter:主要是server的启动和stop的实现
server启动的方法start()源码如下
public synchronized void start() throws Throwable {
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
// 1.
if (!"tcp".equalsIgnoreCase(serverMode)) {
// 1.通过spi的机制来获取
ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
if (canalMQProducer != null) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
canalMQProducer.init(properties);
Thread.currentThread().setContextClassLoader(cl);
}
}
if (canalMQProducer != null) {
MQProperties mqProperties = canalMQProducer.getMqProperties();
// disable netty
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
if (mqProperties.isFlatMessage()) {
// 设置为raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
}
}
logger.info("## start the canal server.");
controller = new CanalController(properties);
controller.start();
logger.info("## the canal server is running now ......");
shutdownThread = new Thread(() -> {
try {
logger.info("## stop the canal server");
controller.stop();
CanalLauncher.runningLatch.countDown();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(destinations);
controller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
port,
user,
passwd,
ip);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
canalAdminWithNetty.setPort(Integer.parseInt(port));
canalAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;
}