canal读取binlog

服务端:

修改example中的配置

启动服务:

bin目录下 ./startup.sh

 

客户端:

1、加入依赖:

 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <otter.version>1.0.23</otter.version>
    <fastjson.version>1.2.47</fastjson.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>${otter.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

2、写个main函数

public class ReadBinlogTask {

    public static Logger log = Logger.getLogger(ReadBinlogTask.class.getName());


    public static int BACH_SIZE =1000;


    public static void main(String[] args) {
        InetSocketAddress socketAddr = new InetSocketAddress("172.18.71.103", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(socketAddr,
                "example1", "", "");
        connector.connect();
        connector.subscribe();
        connector.rollback();

        int i = 0;
        while(true){
            try{
                Message message = connector.getWithoutAck(BACH_SIZE);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if(	batchId == -1 || entries.size() == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    if(entries.size()>0){
                        System.out.println("get data size:"+entries.size());
                        entries.stream().forEach(entry -> {
                                    CanalEntry.RowChange rowChange = null;
                                    try {
                                        rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                                    } catch (InvalidProtocolBufferException e) {
                                        e.printStackTrace();
                                    }
                                    CanalEntry.EventType eventType = rowChange.getEventType();
                                    CanalEntry.Header header = entry.getHeader();
                                    log.info("header.getEventType()" + eventType);
                                    switch (eventType) {
                                        case INSERT:
                                            log.info("INSERT table");
                                            break;
                                        case UPDATE:
                                            log.info("UPDATE table");
                                            break;
                                        case DELETE:
                                            break;
                                        case QUERY:
                                            break;
                                        case ALTER:
                                            break;
                                        case CREATE:
                                            log.info("CREATE table");
                                            break;
                                        case RENAME:
                                            break;
                                        case CINDEX:
                                            break;
                                        case ERASE:
                                            break;
                                        case DINDEX:
                                            break;
                                        case TRUNCATE:
                                            break;
                                        default:
                                            break;
                                    }
                                    if(!StringUtils.isEmpty(header.getTableName())){
                                        log.info("表名称" + header.getTableName());
                                    }

                                    List<CanalEntry.RowData> list = rowChange.getRowDatasList();
                                    for(CanalEntry.RowData rowData:list){
                                        System.out.println("rowData is :"+rowData);

                                    }
                                    System.out.println("===========================");
                                }
                        );
                    }
                }
                connector.ack(batchId);
            }catch (Exception e){
                if (connector != null) {
                    connector.disconnect();
                }
                connector.connect();
                connector.subscribe();
                connector.rollback();
            }
        }
    }
}

3、运行测试:

表结构:

CREATE TABLE `lyl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `addr` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=77 DEFAULT CHARSET=utf8;

插入数据:

insert INTO lyl(name) VALUES('zhangsan');

读取数据结果:

信息: INSERT table
八月 10, 2020 4:14:19 下午 org.example.ReadBinlogTask lambda$main$0
信息: 表名称lyl
rowData is :afterColumns {
  index: 0
  sqlType: 4
  name: "id"
  isKey: true
  updated: true
  isNull: false
  value: "75"
  mysqlType: "int(11)"
}
afterColumns {
  index: 1
  sqlType: 12
  name: "name"
  isKey: false
  updated: true
  isNull: false
  value: "zhangsan"
  mysqlType: "varchar(255)"
}
afterColumns {
  index: 2
  sqlType: 4
  name: "age"
  isKey: false
  updated: true
  isNull: true
  mysqlType: "int(11)"
}
afterColumns {
  index: 3
  sqlType: 93
  name: "create_time"
  isKey: false
  updated: true
  isNull: false
  value: "2020-08-10 16:12:56"
  mysqlType: "timestamp"
}
afterColumns {
  index: 4
  sqlType: 12
  name: "addr"
  isKey: false
  updated: true
  isNull: true
  mysqlType: "varchar(100)"
}

修改表结构:

alter TABLE lyl 
ADD coutry varchar(100); `comment` '地址';

读取到log:

信息: header.getEventType()ALTER
八月 10, 2020 4:14:47 下午 org.example.ReadBinlogTask lambda$main$0
信息: 表名称lyl

再次插入数据:insert INTO lyl(name) VALUES('zhangsan');

读取到数据:

rowData is :afterColumns {
  index: 0
  sqlType: 4
  name: "id"
  isKey: true
  updated: true
  isNull: false
  value: "76"
  mysqlType: "int(11)"
}
afterColumns {
  index: 1
  sqlType: 12
  name: "name"
  isKey: false
  updated: true
  isNull: false
  value: "zhangsan"
  mysqlType: "varchar(255)"
}
afterColumns {
  index: 2
  sqlType: 4
  name: "age"
  isKey: false
  updated: true
  isNull: true
  mysqlType: "int(11)"
}
afterColumns {
  index: 3
  sqlType: 93
  name: "create_time"
  isKey: false
  updated: true
  isNull: false
  value: "2020-08-10 16:13:50"
  mysqlType: "timestamp"
}
afterColumns {
  index: 4
  sqlType: 12
  name: "addr"
  isKey: false
  updated: true
  isNull: true
  mysqlType: "varchar(100)"
}
afterColumns {
  index: 5
  sqlType: 12
  name: "coutry"
  isKey: false
  updated: true
  isNull: true
  mysqlType: "varchar(100)"
}

 


版权声明:本文为qingwufeiyang_530原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。