一、下载及安装
下载:docker pull docker.io/canal/canal-server
安装:docker run -p 11111:11111 --name 容器名(canal)-d docker.io/canal/canal-server
设置定时自启动:docker update --restart=always canal
二、配置
1、执行 docker exec -it canal /bin/bash
命令进入容器中 ,如下图:
2、执行cd canal-server/conf
进入配置文件目录,执行vi canal.properties
命令编辑系统根配置文件:
参数名称 | 参数说明 |
---|---|
canal.id | 每个canal server实例的唯一标识,默认值为1 |
canal.ip | canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 |
canal.port | canal server提供socket服务的端口,默认值11111 |
3、执行vi canal-server/conf/example/instance.properties
命令编辑instance级别的配置文件:
4、此图为配置主数据库的IP及端口,默认IP为127.0.0.1
,使用默认地址时可能会报java.io.IOException: connect /127.0.0.1:3306 failure
,这时将默认IP改为主机真实IP地址就可以解决这个错误。
5、 此图为配置主数据库分配的用户名、密码及编码
6、此图为配置匹配表的正则表达式:
canal.instance.filter.regex=.*\\..*
,其中.*
表示配置连接中所有的数据库,\\..*
表示数据库中所有的表,.*\\..*
表示所有数据库中的所有的表的变化都被监听。
canal.instance.filter.regex=test.*
表示只对test数据库中所有的表进行监听。
7、此图表示自定义微服务监听需要指定example的名称
三、启动
1、执行sh canal-server/bin/startup.sh
命令启动canal2、 执行
vi canal-server/log/canal/canal.log
查看canal启动日志信息3、执行
sh canal-server/bin/stop.sh
命令关闭canal
四、Demo
1、引入服务所依赖jar包
<description>Canal微服务,实现对数据库数据进行监控</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
2、application.yml配置文件中进行相关配置如下:
canal:
client:
instances:
example: #此处为上图2.7配置的example名称
host: 192.168.160.129 #服务IP地址
port: 11111 #服务端口
3、创建启动类并添加相关注解,并排除数据库自动加载
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableDiscoveryClient
@EnableCanalClient
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
4、创建一个类实现对mysql的增删改进行监听
package cn.changgou.listen;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.DeleteListenPoint;
import com.xpand.starter.canal.annotation.InsertListenPoint;
import com.xpand.starter.canal.annotation.UpdateListenPoint;
/**
* 实现mysql数据监听
*/
@CanalEventListener
public class CanalDataEventListen {
/**
* SQL增加监听
* @param data 发生变更的一行数据
* @param type 当前操作的类型,增加数据
*/
@InsertListenPoint
public void onEventInsert(CanalEntry.EventType type,CanalEntry.RowData data){
for (CanalEntry.Column column : data.getAfterColumnsList()) {
System.out.println("列名:" + column.getName() + "--------->数据:" + column.getValue());
}
}
/**
* SQL删除监听
*/
@DeleteListenPoint
public void onEventDel(CanalEntry.EventType type,CanalEntry.RowData data){
for (CanalEntry.Column column : data.getBeforeColumnsList()) {
System.out.println("列名:" + column.getName() + "--------->数据:" + column.getValue());
}
}
/**
* SQL修改监听
*/
@UpdateListenPoint
public void onEventUpdate(CanalEntry.EventType type,CanalEntry.RowData data){
for (CanalEntry.Column column : data.getAfterColumnsList()) {
System.out.println("列名:" + column.getName() + "--------->数据:" + column.getValue());
}
}
}
到此一个canal的配置及其小demo已经结束了,记录一下,方便查阅。