canal mysql 同步_canal mysql 数据同步

首先canal是什么呢?

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL;简单来说,canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析后供消费端使用。canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,然后根据这些事件的变更类型(更新、删除等类型)进行相管处理,比如将变更的数据更新到redis或者发送变更通知到第三方等。

接下来我们来简单构建一个单节点canal实例,并通过客户订阅获取变更事件进行相关业务操作。

1.从github上下载最新稳定版本canal,我下载的是canal.deployer-1.0.25,解压缩

2.创建mysql用户并授权(canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限)

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

3.修改mysql的配置文件

canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.(ps. 目前canal已经支持mixed/statement/row模式)

[mysqld]

log-bin=mysql-bin #添加这一行就ok

binlog-format=ROW #选择row模式

server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

更改配置(conf/example/instance.properties):

#################################################

## mysql serverId

canal.instance.mysql.slaveId=1234

# position info

canal.instance.master.address=10.0.2.30:3306

canal.instance.master.journal.name=

canal.instance.master.position=

canal.instance.master.timestamp=

# table meta tsdb info (暂时不了解,先关闭)

canal.instance.tsdb.enable=false

canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}

canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

canal.instance.tsdb.dbUsername=canal

canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

# username/password

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.defaultDatabaseName=xxx_test

canal.instance.connectionCharset=UTF-8

# table regex

canal.instance.filter.regex=xxx_test.tb_score_info,xxx_test.tb_company_score_info

# table black regex

canal.instance.filter.black.regex=

#################################################

以上红色标注信息需要重点关注,

canal.instance.master.address为数据库ip:port

canal.instance.dbUsername 数据库用户名canal.instance.dbPassword 数据库密码canal.instance.defaultDatabaseName 数据库名称

canal.instance.filter.regex 表示需要订阅发生变更的表名称,可以添加多个逗号隔开即可

4.启动canal服务(/bin/startup.sh),jps查看CanalLauncher进程是否存在

5.编写客户端代码

主入口代码:

/***@authorxiaofeng*@versionV1.0*@title:DataSyncHandle.java*@package:com.xx.xx.app*@description:数据同步处理*@date2018/3/5 0005下午7:47*/public classDataSyncHandle extendsBaseSpringApp {

protected static finalLogger logger= LoggerFactory.getLogger(DataSyncHandle.class);CanalClient canalClient;publicDataSyncHandle(String[] args) {

super(args,"classpath:dataSync-app-handle.xml");this.canalClient= SpringContextManager.getBean(CanalClient.class);}

public voidrun() {

canalClient.connect();}

public static voidmain(String[] args) {

intstatus = 1;try{

start(args);status = 0;} catch(Throwable e) {

logger.error("data sync exception.",e);logger.info("program exception exit! restarting......");start(args);} finally{

System.exit(status);logger.info("data sync handle exit......");}

}

private static voidstart(String[] args) {

DataSyncHandle app = newDataSyncHandle(args);app.run();}

服务连接订阅:

/***@authorxiaofeng*@versionV1.0*@title:CanalClient.java*@package:com.xx.xx.app.canal*@description:TODO*@date2018/2/11 16:13*/public classCanalClient {

staticLogger logger= LoggerFactory.getLogger(CanalClient.class);@AutowiredScoreInfoService scoreInfoService;@Value("${canal.server.host}")

String hostName= "10.0.2.23";@Value("${canal.server.port}")

intport= 11111;@Value("${canal.server.destination}")

String destination= "example";@Value("${canal.server.username}")

String username;@Value("${canal.server.password}")

String password;public voidconnect() {

CanalConnector connector = CanalConnectors.newSingleConnector(

newInetSocketAddress(hostName,port),destination,username,password);connector.connect();connector.subscribe(Constant.TABLE);while(true) {

Message message = null;try{

message = connector.getWithoutAck(100);} catch(Exception e) {

logger.error(e.getMessage(),e);}

longbatchId = message.getId();if(batchId == -1|| message.getEntries().isEmpty()) {

try{

Thread.sleep(3000);} catch(InterruptedException e) {

e.printStackTrace();}

} else{

handle(message.getEntries());connector.ack(batchId);}

}

}

private voidhandle(List entries) {

entries.stream().forEach(entry -> {

String tableName = entry.getHeader().getTableName();if(StringUtils.isNoneBlank(tableName)) {

logger.info("table name:"+ tableName);try{

switch(TableEnum.getTableName(tableName)) {

caseTB_SCORE_INFO:

scoreInfoService.userScoreReloadToCache(entry);break;default:

break;}

} catch(Exception e) {

e.printStackTrace();}

}

});}

}

注意点:connector.subscribe(Constant.TABLE);表示需要订阅发生变更的表名称,可以添加多个逗号隔开即可,需要与canal.instance.filter.regex配置中的一致,否则会被覆盖

获取变更事件具体业务处理:

/***@authorxiaofeng*@versionV1.0*@title:ScoreInfoServiceImpl.java*@package:com.xx.xx.app.service.impl*@description:积分服务*@date2018/3/6 0006下午1:36*/public classScoreInfoServiceImpl extendsBaseService implementsScoreInfoService {

Logger logger= LoggerFactory.getLogger(getClass());@AutowiredScoreInfoMapper scoreInfoMapper;@AutowiredRedisExtendClient redisExtendClient;/***用户积分重载**@paramentry*/@Overridepublic voiduserScoreReloadToCache(CanalEntry.Entry entry) {

CanalEntry.RowChange rowChange = null;try{

rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch(InvalidProtocolBufferException e) {

e.printStackTrace();}

for(CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

if(rowChange.getEventType() == CanalEntry.EventType.INSERT|| rowChange.getEventType() == CanalEntry.EventType.UPDATE) {

logger.info("[operation = insert or update] ");Map values = convertToBeanMap(rowData.getAfterColumnsList());ScoreInfo scoreInfo = JsonUtils.json2Obj(JsonUtils.obj2Json(values), newTypeReference() {

});logger.info("scoreInfo: "+ JsonUtils.obj2Json(scoreInfo));BigDecimal currentDeposit = scoreInfo.getRechargeDeposit().add(scoreInfo.getPresentDeposit()).add(scoreInfo.getProfitDeposit()).add(scoreInfo.getInviteDeposit()).subtract(scoreInfo.getBlockingPoint());BigDecimal formatValue = DataFormatUtil.defaultDecimalFormat(currentDeposit);redisExtendClient.hset(Constant.SCORE,Constant.USER_SCORE+ scoreInfo.getId(),String.valueOf(formatValue),Constant.SCORE_DBINDEX);} else if(rowChange.getEventType() == CanalEntry.EventType.DELETE) {

String columnId = getColumnId(rowData,"user_id");if(StringUtils.isBlank(columnId)) {

return;}

logger.info("[operation = delete]");logger.info("id=["+ columnId + "]");redisExtendClient.hdel(Constant.SCORE,Constant.USER_SCORE+ columnId,Constant.SCORE_DBINDEX);}

}

}

}

启动客户端,手动去数据库对订阅的表变更一个字段即可,然后查看客户端控制台发现已经获取到了变更事件信息,我的业务代码处理逻辑会把变更后的数据更新到redis中,当然你也可以有其它操作哦:

ce36e8adab366bc5c5b8623799c6397b.png

好啦,至此我们的简单版canal数据同步已经搭建成功了哦,可以开始你的数据同步之旅了哦!


版权声明:本文为weixin_36353849原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。