canal能监控多个mysql_Canal之配置,多库监听

配置说明

介绍配置之前,先了解下canal的配置加载方式:

f2ee5bbe7136b22fe61e2715652d4e2d.png

Spring配置

spring配置的原理是将整个配置抽象为两部分:

xxxx-instance.xml (canal组件的配置定义,可以在多个instance配置中共享) xxxx.properties (每个instance通道都有各自一份定义,因为每个mysql的ip,帐号,密码等信息不会相同)

通过spring的PropertyPlaceholderConfigurer通过机制将其融合,生成一份instance实例对象,每个instance对应的组件都是相互独立的,互不影响

properties配置文件

properties配置分为两部分:

canal.properties (系统根配置文件) instance.properties (instance级别的配置文件,每个instance一份)

canal.properties介绍:

canal配置主要分为两部分定义:

1. instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)

参数名字参数说明默认值

canal.destinations

当前server上部署的instance列表

canal.conf.dir

conf/目录所在的路径

../conf

canal.auto.scan

开启instance自动扫描

如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:

a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动

b. instance目录删除:卸载对应instance配置,如已启动则进行关闭

c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作

true

canal.auto.scan.interval

instance自动扫描的间隔时间,单位秒

5

canal.instance.global.mode

全局配置加载方式

spring

canal.instance.global.lazy

全局lazy模式

false

canal.instance.global.manager.address

全局的manager配置方式的链接信息

canal.instance.global.spring.xml

全局的spring配置方式的组件文件

classpath:spring/file-instance.xml (spring目录相对于canal.conf.dir)

canal.instance.example.mode

canal.instance.example.lazy

canal.instance.example.spring.xml

…..

instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式

命名规则:canal.instance.{name}.xxx

2. common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】

参数名字参数说明默认值

canal.id

每个canal server实例的唯一标识,暂无实际意义

1

canal.ip

canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务

canal.port

canal server提供socket服务的端口

11111

canal.zkServers

canal server链接zookeeper集群的链接信息

例子:127.0.0.1:2181,127.0.0.1:2182

canal.zookeeper.flush.period

canal持久化数据到zookeeper上的更新频率,单位毫秒

1000

canal.file.data.dir

canal持久化数据到file上的目录

../conf (默认和instance.properties为同一目录,方便运维和备份)

canal.file.flush.period

canal持久化数据到file上的更新频率,单位毫秒

1000

canal.instance.memory.batch.mode

canal内存store中数据缓存模式

1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量

2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小

MEMSIZE

canal.instance.memory.buffer.size

canal内存store中可缓存buffer记录数,需要为2的指数

16384

canal.instance.memory.buffer.memunit

内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小

1024

canal.instance.transactionn.size

最大事务完整解析的长度支持超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性

1024

canal.instance.fallbackIntervalInSeconds

canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒

说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢

60

canal.instance.detecting.enable

是否开启心跳检查

false

canal.instance.detecting.sql

心跳检查sql

insert into retl.xdual values(1,now()) on duplicate key update x=now()

canal.instance.detecting.interval.time

心跳检查频率,单位秒

3

canal.instance.detecting.retry.threshold

心跳检查失败重试次数

3

canal.instance.detecting.heartbeatHaEnable

心跳检查失败后,是否开启自动mysql自动切换

说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据

false

canal.instance.network.receiveBufferSize

网络链接参数,SocketOptions.SO_RCVBUF

16384

canal.instance.network.sendBufferSize

网络链接参数,SocketOptions.SO_SNDBUF

16384

canal.instance.network.soTimeout

网络链接参数,SocketOptions.SO_TIMEOUT

30

canal.instance.filter.query.dcl

是否忽略DCL的query语句,比如grant/create user等

false

canal.instance.filter.query.dml

是否忽略DML的query语句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query记录)

false

canal.instance.filter.query.ddl

是否忽略DDL的query语句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl类型主要为table级别的操作,create databases/trigger/procedure暂时划分为dcl类型)

false

canal.instance.get.ddl.isolation

ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回.(otter ddl同步使用)

false

instance.properties介绍:

a. 在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件

比如:

canal.destinations = example1,example2

这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.

ps. canal自带了一份instance.properties demo,可直接复制conf/example目录进行配置修改

b. 如果canal.properties未定义instance列表,但开启了canal.auto.scan时

server第一次启动时,会自动扫描conf目录下,将文件名做为instance name,启动对应的instance server运行过程中,会根据canal.auto.scan.interval定义的频率,进行扫描

发现目录有新增,启动新的instance 发现目录有删除,关闭老的instance 发现对应目录的instance.properties有变化,重启instance

一个标准的conf目录结果:

数名字参数说明默认值

canal.instance.mysql.slaveId

mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一

1234

canal.instance.master.address

mysql主库链接地址

127.0.0.1:3306

canal.instance.master.journal.name

mysql主库链接时起始的binlog文件

canal.instance.master.position

mysql主库链接时起始的binlog偏移量

canal.instance.master.timestamp

mysql主库链接时起始的binlog的时间戳

canal.instance.dbUsername

mysql数据库帐号

canal

canal.instance.dbPassword

canal

canal.instance.defaultDatabaseName

mysql链接时默认schema

canal.instance.connectionCharset mysql 数据解析编码

UTF-8

canal.instance.filter.regex

mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)

常见例子:

1. 所有表:.* or .*\\..*

2. canal schema下所有表: canal\\..*

3. canal下的以canal打头的表:canal\\.canal.*

4. canal schema下的一张表:canal.test1

5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

.*\\..*

几点说明:

mysql链接时的起始位置

canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动 canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

mysql解析关注表定义

标准的Perl正则,注意转义时需要双斜杠:\\

mysql链接的编码

目前canal版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过filter.regex配置,将其拆分为多个canal instance,为每个instance指定不同的编码

instance.xml配置文件

目前默认支持的instance.xml有以下几种:

spring/memory-instance.xml spring/file-instance.xml spring/default-instance.xml spring/group-instance.xml

在介绍instance配置之前,先了解一下canal如何维护一份增量订阅&消费的关系信息:

解析位点 (parse模块会记录,上一次解析binlog到了什么位置,对应组件为:CanalLogPositionManager) 消费位点 (canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,对应的组件为:CanalMetaManager)

对应的两个位点组件,目前都有几种实现:

memory (memory-instance.xml中使用) zookeeper mixed file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上) period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)

memory-instance.xml介绍:

file-instance.xml介绍:

default-instance.xml介绍:

group-instance.xml介绍:

instance.xml设计初衷:

允许进行自定义扩展,比如实现了基于数据库的位点管理后,可以自定义一份自己的instance.xml,整个canal设计中最大的灵活性在于此

HA模式配置

机器准备

运行canal的机器: 10.20.144.22 , 10.20.144.51. zookeeper地址为10.20.144.51:2181 mysql地址:10.20.144.15:3306

按照部署和配置,在单台机器上各自完成配置,演示时instance name为example

修改canal.properties,加上zookeeper配置

创建example目录,并修改instance.properties

启动两台机器的canal

启动后,你可以查看logs/example/example.log,只会看到一台机器上出现了启动成功的日志。

比如我这里启动成功的是10.20.144.51

2013-03-19 18:18:20.590 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]

2013-03-19 18:18:20.596 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]

2013-03-19 18:18:20.831 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example

2013-03-19 18:18:20.845 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful...

查看一下zookeeper中的节点信息,也可以知道当前工作的节点为10.20.144.51:11111

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running

{"active":true,"address":"10.20.144.51:11111","cid":1}

客户端链接, 消费数据

a. 可以直接指定zookeeper地址和instance name,canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接:

CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", "example", "", "");

b. 链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等 (聪明的你,应该也可以发现,canal client也可以支持HA功能)

[zk: localhost:2181(CONNECTED) 17] get /otter/canal/destinations/example/1001/running

{"active":true,"address":"10.12.48.171:50544","clientId":1001}

c. 数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点. (下次你重启client时,会从这最后一个位点继续进行消费)

[zk: localhost:2181(CONNECTED) 16] get /otter/canal/destinations/example/1001/cursor

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"10.20.144.15","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.002253","position":2574756,"timestamp":1363688722000}}

重启一下canal server

停止正在工作的10.20.144.51的canal server

ssh 10.20.144.51

sh bin/stop.sh

这时10.20.144.22会立马启动example instance,提供新的数据服务

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running

{"active":true,"address":"10.20.144.22:11111","cid":1}

与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成

触发HA自动切换场景 (server/client HA模式都有效)

正常场景

正常关闭canal server(会释放instance的所有资源,包括删除running节点) 平滑切换(gracefully)

操作:更新对应instance的running节点内容,将”active”设置为false,对应的running节点收到消息后,会主动释放running节点,让出控制权但自己jvm不退出,gracefully.

{"active":false,"address":"10.20.144.22:11111","cid":1}

异常场景

canal server对应的jvm异常crash,running节点的释放会在对应的zookeeper session失效后,释放running节点(EPHEMERAL节点)

ps. session过期时间默认为zookeeper配置文件中定义的tickTime的20倍,如果不改动zookeeper配置,那默认就是40秒

canal server所在的网络出现闪断,导致zookeeper认为session失效,释放了running节点,此时canal server对应的jvm并未退出,(一种假死状态,非常特殊的情况)

ps. 为了保护假死状态的canal server,避免因瞬间runing失效导致instance重新分布,所以做了一个策略:canal server在收到running节点释放后,延迟一段时间抢占running,原本running节点的拥有者可以不需要等待延迟,优先取得running节点,可以保证假死状态下尽可能不无谓的释放资源。 目前延迟时间的默认值为5秒,即running节点针对假死状态的保护期为5秒.

mysql多节点解析配置(parse解析自动切换)

mysql机器准备

准备两台mysql机器,配置为M-M模式,比如ip为:10.20.144.25:3306,10.20.144.29:3306

[mysqld]

xxxxx ##其他正常master/slave配置

log_slave_updates=true ##这个配置一定要打开

canal instance配置

# position info

canal.instance.master.address = 10.20.144.25:3306

canal.instance.master.journal.name =

canal.instance.master.position =

canal.instance.master.timestamp =

canal.instance.standby.address = 10.20.144.29:3306

canal.instance.standby.journal.name =

canal.instance.standby.position =

canal.instance.standby.timestamp =

## detecing config

canal.instance.detecting.enable = true ## 需要开启心跳检查

canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() ##心跳检查sql,也可以选择类似select 1的query语句

canal.instance.detecting.interval.time = 3 ##心跳检查频率

canal.instance.detecting.retry.threshold = 3 ## 心跳检查失败次数阀值,超过该阀值后会触发mysql链接切换,比如切换到standby机器上继续消费binlog

canal.instance.detecting.heartbeatHaEnable = true ## 心跳检查超过失败次数阀值后,是否开启master/standby的切换.

注意:

填写master/standby的地址和各自的起始binlog位置,目前配置只支持一个standby配置. 发生master/standby的切换的条件:(heartbeatHaEnable = true) && (失败次数>=retry.threshold). 多引入一个heartbeatHaEnable的考虑:开启心跳sql有时候是为client检测canal server是否正常工作,如果定时收到了心跳语句,那说明整个canal server工作正常

启动 & 测试

比如关闭一台机器的mysql , /etc/init.d/mysql stop 。在经历大概 interval.time * retry.threshold时间后,就会切换到standby机器上


版权声明:本文为weixin_29132813原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。