一、简介
canal主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
可以理解为一个增量数据同步工具
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务cache刷新
- 带业务逻辑的增量数据处理
二、工作原理
mysql主从复制
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) 。
- MySQL slave 重放 relay log中事件,将数据变更反映它自己的数据。
canal工作原理:
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump协议。
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) 。
- canal 解析 binary log 对象(原始为 byte 流)。
三、开启mysqlBinlog日志(必要条件)
什么是Binlog日志
Binlog日志记录了所有DDL和DML语句,以事件的形式记录,同时还会记录所花费的时间,是mysql日志中最重要的日志。开启Binlog日志后对性能的损耗可以忽略不计,而且Binlog日志是事务安全型。
Binlog日志有两个重要的使用场景:
数据同步以及数据恢复
- 数据同步:在Master节点上开启mysqlBinlog日志,对DDL和DML语句进行记录,将Binlog二进制文件发送给Slaves节点,进行数据同步。
- 数据恢复:使用MysqlBinlog工具恢复Mysql数据。
binlog的分类设置:
MySQL的binlog的格式有三种,分别是STATEMENT、MIXED、ROW。
statement [ 语句级 ] | row [ 行级 ] | mixed [ 综合语句级和行级 ] |
---|---|---|
相对row模式节省空间,但是可能产生不一致性 | binlog会记录每次操作后每行记录的变化。 | statement的升级版,一定程度上解决了因一些情况而造成的statement模式不一致问题 |
节省空间 | 保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。 | 节省空间,同时兼顾了一定的一致性。 |
有可能造成数据不一致(语句执行时间为异步,如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同) | 占用较大空间。 | 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。 |
开启binlog日志需要修改my.cnf文件。
需配置以下内容:
[mysqld]
server-id=1 #id(只要不和下文中Canal的id相同即可)
log-bin=mysql-bin
binlog_format=row #以行级开启日志
binlog-do-db=******** #需要监听的数据库
四、安装canal
docker pull canal/canal-server
#以11111端口启动(如果是阿里云ECS服务器,记得在安全组对11111端口进行放行。)
docker run -d -p 11111:11111 --name canal canal/canal-server
注意canal.properties文件中的端口号(文件在docker容器的/home/admin/canal-server/conf目录下)。
- 注意检查第一个红框部门的端口号是否正确。
- 可以在第二个红框位置对同步策略进行更换,可以选择tcp,kafka,RocketMq,这里使用的是tcp。
接下来修改instance.properties文件(文件在docker容器中/home/admin/canal-server/conf/example目录下)。
- 打码部分更改位自己的数据库地址以及端口号。
- 红框部分为数据库账号密码。
五、在代码中整合canal(SpringBoot+canal)
1、pom文件引入
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<!--与自己的canal版本一致-->
<version>1.1.3</version>
</dependency>
2、项目配置canal
canal-monitor-mysql:
hostname: 部署canal的服务器地址
port: 11111
3、代码集成
@Slf4j
@Component
public class CanalCommandLineRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
//在canal部署的conf/canal.properties ip和端口信息
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("你的canal地址", 你的canal端口号),
"example",
"",
"");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表q
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(1);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId > 0 && size != 0) {
handleDataChange(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
//防止频繁访问数据库链接: 线程睡眠 10秒
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void handleDataChange(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
// 只解析mysql事务的操作,其他的不解析
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
// 获取当前操作所属的数据库
String dbName = entry.getHeader().getSchemaName();
// 获取当前操作所属的表
String tableName = entry.getHeader().getTableName();
// 事务提交时间
long timestamp = entry.getHeader().getExecuteTime();
log.info("Canal监测到更新:【{}】库的【{}】表", dbName, tableName);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, eventType);
log.info("-------------------------------------------------------------");
}
}
}
/**
* 解析具体一条Binlog消息的数据
* @param tableName 当前操作所属表名称
* @param eventType 当前操作类型(新增、修改、删除)
*/
private static void dataDetails(List<CanalEntry.Column> beforeColumns,
List<CanalEntry.Column> afterColumns,
String tableName,
CanalEntry.EventType eventType) {
JSONObject beforeData = new JSONObject();
for (CanalEntry.Column column : beforeColumns) {
beforeData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
for (CanalEntry.Column column : afterColumns) {
afterData.put(column.getName(), column.getValue());
}
System.out.println("tableName = " + tableName +
",eventType = " + eventType +
",beforeData = " + beforeData +
",afterData = " + afterData);
SecurityUser securityUser = JSONObject.toJavaObject(afterData, SecurityUser.class);
System.out.println("securityUser = " + securityUser);
}
}
版权声明:本文为qq_47396161原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。