一、canal介绍
binlog是mysql的二进制日志,对于操作数据库的语句,都以此形式保存。Canal是阿里MySQL数据库Binlog的增量订阅&消费组件 。基于数据库Binlog可以监控数据库数据的变化进而用于数据同步等业务。
二、服务端部署
服务端链接: https://github.com/alibaba/canal/releases
解压zip,目录如下:
conf -> example -> instance.properties
日志文件名称和记录位置,如下图所示,修改数据库连接地址、日志文件、连接的用户名和密码
show master status(数据库查询日志文件命令)
(详细配置可以从官网查看,仅记录使用步骤)
三、客户端使用
1、POM文件
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
2、连接配置
canal-monitor-mysql:
hostname: localhost
port: 11111
tableName: fas4.0
具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。
需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.haiot.service.CorpsUploadService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.protocol.Message;
/**
* @program: fas-haiot-interface
* @description: 市级平台数据上传接口相关实现
* @author: liuAnmin
* @create: 2021-03-22 15:52
**/
@Component
@Slf4j
public class CanalUtil {
@Resource
CorpsUploadService corpsUploadService;
@Value("${canal-monitor-mysql.hostname}")
String canalMonitorHost;
@Value("${canal-monitor-mysql.port}")
Integer canalMonitorPort;
@Value("${canal-monitor-mysql.tableName}")
String canalMonitorTableName;
private final static int BATCH_SIZE = 10000;
/**
* 启动服务
*/
@Async("TaskPool")
public void startMonitorSQL() {
while (true) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");
try {
//打开连接
connector.connect();
log.info("数据库检测连接成功!" + canalMonitorTableName);
//订阅数据库表,全部表q
connector.subscribe(canalMonitorTableName + "\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
} else {
handleDATAChange(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
log.error("成功断开监测连接!尝试重连");
} finally {
connector.disconnect();
//防止频繁访问数据库链接: 线程睡眠 10秒
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private void handleDATAChange(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName());
switch (eventType) {
/**
* 删除操作
*/
case DELETE:
corpsUploadService.DeleteOperateToCityInterface(rowChage, entry);
break;
/**
* 添加操作
*/
case INSERT:
corpsUploadService.InsertOperateToCityInterface(rowChage, entry);
break;
/**
* 更新操作
*/
case UPDATE:
corpsUploadService.UpdateOperateToCityInterface(rowChage, entry);
break;
default:
break;
}
}
}
}
版权声明:本文为qq_45872465原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。