简介
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
MySQL主备复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
从0开始配置Canal
1.首先查看mysql是否开启了bin_log日志:
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format%';
如果Value显示为OFF,则修改mysql的配置文件:my.ini/my.cnf
在末未追加:
server_id=1 #mysql实例id,不能和canal的salveId重复
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
之后充气mysql,再查看:SHOW VARIABLES LIKE 'log_bin'; Value为ON的时候,表示bin_log日志开启了
2.在mysql里面添加以下的相关用户和权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3.下载安装Canal服务
https://github.com/alibaba/canal/releases 我这边用的是:canal.deployer-1.1.4
解压文件,修改conf/example下的instance.properties配置文件
# 改成自己需要关联数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 改成自己刚刚创建数据的canal的用户名和密码
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 匹配的正则,关联所有,监听某个库的某个表,用逗号分隔可监听多个:database1.tableName1,database2.tableName2
canal.instance.filter.regex=.*\\..*
进入bin目录下启动
windows点击:startup.bat
linux和mac:startup.sh
查看日志:logs/canal/canal.log最后显示
logs/example/example.log
不报错表示启动成功
4.新建IDEA项目
pom
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
新建CanalClient
@Component
public class CanalClient implements DisposableBean {
private CanalConnector canalConnector;
@Bean
public CanalConnector getCanalConnector() {
canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "canal", "canal");
canalConnector.connect();
// 指定filter,指定{database}.{table},canalConnector.subscribe("database.table,database2.table2");
canalConnector.subscribe(".*\\..*");
// 回滚寻找上次中断的位置
canalConnector.rollback();
return canalConnector;
}
@Override
public void destroy() throws Exception {
if (canalConnector != null) {
canalConnector.disconnect();
}
}
}
新建业务逻辑处理类:CanalScheduling
@Component
public class CanalScheduling implements Runnable {
@Resource
private CanalConnector canalConnector;
@Autowired
private VideoDao videoDao;
@Override
@Scheduled(fixedDelay = 100)
public void run() {
long batchId = -1;
try {
int batchSize = 1000;
Message message = canalConnector.getWithoutAck(batchSize);
batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
entries.forEach(entry -> {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 解析处理
publishCanalEvent(entry);
}
});
}
canalConnector.ack(batchId);
} catch (Exception e) {
e.printStackTrace();
canalConnector.rollback(batchId);
}
}
/**
* 处理解析
*
* @param entry
*/
private void publishCanalEvent(CanalEntry.Entry entry) {
CanalEntry.EventType entryType = entry.getHeader().getEventType();
String database = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
CanalEntry.RowChange change = null;
try {
change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
return;
}
change.getRowDatasList().forEach(rowData -> {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
String primaryKey = "id";
CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
&& primaryKey.equals(column.getName())).findFirst().orElse(null);
Map<String, Object> dataMap = parseColumnsToMap(columns);
importToVod(dataMap, database, table);
});
}
/**
* 导入更新数据的逻辑,可以导入oracle,elasticsearch,redis,MongoDB等
* 这边我同步到另一个mysql的数据库
*
* @param map
* @param database
* @param table
*/
private void importToVod(Map<String, Object> map, String database, String table) {
System.out.println(map + database + table);
if (StringUtils.equals("test", database)) {
if (StringUtils.equals("vod_videos", table)) {
// 更新的是vod_videos
String videoId = map.get("video_id").toString();
String title = map.get("title").toString();
videoDao.updateVideo(videoId, title);
}
}
}
/**
* 将columns转map
*
* @param columns
* @return
*/
Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
Map<String, Object> jsonMap = Maps.newHashMap();
columns.forEach(column -> {
if (column == null) {
return;
}
jsonMap.put(column.getName(), column.getValue());
});
return jsonMap;
}
}
在启动类加上开启定时任务:@EnableScheduling
Dao类:VideoDao
@Mapper
public interface VideoDao {
/**
* 更新视频标题
*
* @param videoId
* @param title
*/
@Update("UPDATE vod_videos SET title = #{title} WHERE video_id = #{videoId}")
void updateVideo(@Param("videoId") String videoId, @Param("title") String title);
}
启动程序,修改数据库title字段,加上66666:
会发现收到修改信息,控制台打印日志:
再看目标库,数据也对应的修改了: