服务端:
修改example中的配置
启动服务:
bin目录下 ./startup.sh
客户端:
1、加入依赖:
<!-- Build settings and dependency versions for the canal client demo. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <otter.version>1.0.23</otter.version>
    <!-- fastjson 1.2.47 and earlier are affected by the published autoType
         deserialization vulnerabilities; 1.2.83 is the patched 1.x release. -->
    <fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencies>
    <!-- Canal client API (CanalConnector, CanalEntry, Message). -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>${otter.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- Test-only dependency. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
2、写个main函数
/**
 * Minimal canal client: connects to a canal server, polls it for batches of
 * binlog entries and prints the event type, table name and row data of each
 * row-change entry.
 *
 * <p>The server address ({@code 172.18.71.103:11111}) and the destination name
 * ({@code example1}) are hard-coded; adjust them to match the canal instance
 * being read.
 */
public class ReadBinlogTask {
    /** Logger used for the per-entry event and table-name messages. */
    public static Logger log = Logger.getLogger(ReadBinlogTask.class.getName());

    /** Maximum number of entries requested from the server per fetch. */
    public static int BACH_SIZE = 1000;

    /** Pause, in milliseconds, between polls when the server returned nothing. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /**
     * Connects to the canal server and polls it until the thread is interrupted.
     *
     * <p>Each batch is fetched without acknowledgement, processed, and then
     * acknowledged. If anything fails while fetching or processing, the failure
     * is logged and the connection is re-established and rolled back so the
     * unacknowledged batch can be fetched again.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        InetSocketAddress socketAddr = new InetSocketAddress("172.18.71.103", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(socketAddr,
                "example1", "", "");
        openSession(connector);
        while (true) {
            try {
                Message message = connector.getWithoutAck(BACH_SIZE);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    // Nothing to deliver yet: back off before polling again.
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } else {
                    System.out.println("get data size:" + entries.size());
                    for (CanalEntry.Entry entry : entries) {
                        handleEntry(entry);
                    }
                }
                connector.ack(batchId);
            } catch (InterruptedException e) {
                // Disconnect while the interrupt flag is still cleared, then
                // restore the flag for the caller and stop polling.
                connector.disconnect();
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Do not swallow the cause: record it before reconnecting.
                log.log(java.util.logging.Level.WARNING,
                        "canal fetch/processing failed, reconnecting", e);
                connector.disconnect();
                openSession(connector);
            }
        }
    }

    /** Connects, subscribes and rolls back so unacknowledged batches are fetched again. */
    private static void openSession(CanalConnector connector) {
        connector.connect();
        connector.subscribe();
        connector.rollback();
    }

    /** Logs the event type and table name of one row-data entry and prints its rows. */
    private static void handleEntry(CanalEntry.Entry entry) {
        // Only ROWDATA entries carry a RowChange payload; transaction
        // begin/end and other marker entries must not be parsed as one.
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return;
        }
        CanalEntry.Header header = entry.getHeader();
        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            // Skip the unreadable entry instead of failing the whole batch;
            // failing would roll back and redeliver the same entry forever.
            log.log(java.util.logging.Level.WARNING,
                    "failed to parse RowChange at " + header.getLogfileName()
                            + ":" + header.getLogfileOffset() + ", skipping entry", e);
            return;
        }
        CanalEntry.EventType eventType = rowChange.getEventType();
        log.info("header.getEventType()" + eventType);
        switch (eventType) {
            case INSERT:
                log.info("INSERT table");
                break;
            case UPDATE:
                log.info("UPDATE table");
                break;
            case CREATE:
                log.info("CREATE table");
                break;
            default:
                // Other event types are only reported by the generic line above.
                break;
        }
        if (!StringUtils.isEmpty(header.getTableName())) {
            log.info("表名称" + header.getTableName());
        }
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            System.out.println("rowData is :" + rowData);
        }
        System.out.println("===========================");
    }
}
3、运行测试:
表结构:
-- Demo table used for the test run.
CREATE TABLE `lyl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
-- NOTE(review): ON UPDATE CURRENT_TIMESTAMP means this column is also rewritten on every UPDATE, despite being named create_time.
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`addr` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=77 DEFAULT CHARSET=utf8;
插入数据:
-- Only name is supplied: id is AUTO_INCREMENT, create_time defaults to CURRENT_TIMESTAMP, and age/addr default to NULL.
insert INTO lyl(name) VALUES('zhangsan');
读取数据结果:
信息: INSERT table
八月 10, 2020 4:14:19 下午 org.example.ReadBinlogTask lambda$main$0
信息: 表名称lyl
rowData is :afterColumns {
index: 0
sqlType: 4
name: "id"
isKey: true
updated: true
isNull: false
value: "75"
mysqlType: "int(11)"
}
afterColumns {
index: 1
sqlType: 12
name: "name"
isKey: false
updated: true
isNull: false
value: "zhangsan"
mysqlType: "varchar(255)"
}
afterColumns {
index: 2
sqlType: 4
name: "age"
isKey: false
updated: true
isNull: true
mysqlType: "int(11)"
}
afterColumns {
index: 3
sqlType: 93
name: "create_time"
isKey: false
updated: true
isNull: false
value: "2020-08-10 16:12:56"
mysqlType: "timestamp"
}
afterColumns {
index: 4
sqlType: 12
name: "addr"
isKey: false
updated: true
isNull: true
mysqlType: "varchar(100)"
}
修改表结构:
-- Add a nullable column to the demo table. The column comment is part of the
-- column definition, so it must come before the terminating semicolon.
-- The column name is spelled "coutry" to match the captured output.
alter TABLE lyl
ADD coutry varchar(100) COMMENT '地址';
读取到log:
信息: header.getEventType()ALTER
八月 10, 2020 4:14:47 下午 org.example.ReadBinlogTask lambda$main$0
信息: 表名称lyl
再次插入数据:insert INTO lyl(name) VALUES('zhangsan');
读取到数据:
rowData is :afterColumns {
index: 0
sqlType: 4
name: "id"
isKey: true
updated: true
isNull: false
value: "76"
mysqlType: "int(11)"
}
afterColumns {
index: 1
sqlType: 12
name: "name"
isKey: false
updated: true
isNull: false
value: "zhangsan"
mysqlType: "varchar(255)"
}
afterColumns {
index: 2
sqlType: 4
name: "age"
isKey: false
updated: true
isNull: true
mysqlType: "int(11)"
}
afterColumns {
index: 3
sqlType: 93
name: "create_time"
isKey: false
updated: true
isNull: false
value: "2020-08-10 16:13:50"
mysqlType: "timestamp"
}
afterColumns {
index: 4
sqlType: 12
name: "addr"
isKey: false
updated: true
isNull: true
mysqlType: "varchar(100)"
}
afterColumns {
index: 5
sqlType: 12
name: "coutry"
isKey: false
updated: true
isNull: true
mysqlType: "varchar(100)"
}
版权声明:本文为qingwufeiyang_530原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。