基于canal的实时数据同步

适用场景

使用canal做数据备份而不用mysql自带的主从备份的场景主要为:

  1. 跨数据库的数据备份,例如mysql => oracle
  2. 数据异构,即对同一份数据做不同的分库分表查询。例如卖家和买家各自分库索引

maven

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.2</version>
</dependency>

java

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.lang.StringUtils;

public class SimpleCanalClient {

    public static void main(String[] args) throws Exception {
        String destination = "example";
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), destination, "", "");

        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();

        int batchSize = 5 * 1024;

        while (true) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                // try {
                // Thread.sleep(1000);
                // } catch (InterruptedException e) {
                // }
            } else {
                synchronizedData(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }
    }

    /**
     * 同步数据
     * @param entries
     * @throws Exception
     */
    private static void synchronizedData(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            String tableName = entry.getHeader().getTableName();
            for (RowData rowData : rowChange.getRowDatasList()) {
                String sql = getSql(rowChange.getEventType(),tableName,rowData);
                System.out.println(sql);
                // TODO 执行sql语句
            }
        }
    }

    /**
     * 获取增删改的sql
     * @param eventType
     * @param tableName
     * @param rowData
     * @return
     */
    private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){
        String sql = null;
        switch (eventType) {
            case INSERT:
                sql = getInsertSql(tableName,rowData.getAfterColumnsList());
                break;
            case UPDATE:
                sql = getUpdateSql(tableName,rowData.getAfterColumnsList());
                break;
            case DELETE:
                sql = getDeleteSql(tableName,rowData.getBeforeColumnsList());
                break;
            default:
                break;
        }
        return sql;
    }

    private static String getInsertSql(String tableName,List<Column> columns){
        if(columns.size() == 0 || StringUtils.isBlank(tableName)){
            return null;
        }
        String keys = "";
        String values = "";
        for(int i=0;i<columns.size();i++){
            if(i != 0) {
                keys += ",";
                values += ",";
            }
            keys += columns.get(i).getName();
            values += getValue(columns.get(i));
        }
        String format = "INSERT INTO %s (%s) VALUES (%s)";
        return String.format(format,tableName,keys,values);
    }

    private static String getUpdateSql(String tableName,List<Column> columns){
        if(columns.size() == 0 || StringUtils.isBlank(tableName)){
            return null;
        }
        String sets = "";
        String where = "";
        for(Column column : columns){
            if(column.getIsKey()){
                where = column.getName() + "=" + getValue(column);
                continue;
            }
            if(!StringUtils.isBlank(sets)) {
                sets += ",";
            }
            sets += column.getName() + "=" + getValue(column);
        }
        String format = "UPDATE %s SET %s WHERE %s";
        return String.format(format,tableName,sets,where);
    }

    private static String getDeleteSql(String tableName,List<Column> columns){
        if(columns.size() == 0 || StringUtils.isBlank(tableName)){
            return null;
        }
        String where = "";
        for(Column column : columns){
            if(column.getIsKey()){
                where = column.getName() + "=" + getValue(column);
                continue;
            }
        }
        String format = "DELETE FROM %s WHERE %s";
        return String.format(format,tableName,where);
    }

    private static String getValue(Column column){
        if(column.getIsNull()){
            return "null";
        }
        return String.format("'%s'",column.getValue());
    }

}

数据一致性

单机单点消费mysql的log-bin后直接更新到备份数据库中,数据一致性没有问题。但是如果变成分布式环境以及消费mysql的log-bin后将更新数据推到MQ中由多节点消费更新到多个备份数据库中,则会出现数据更新时序和数据一致性的问题。

而以上代码在update sql中除了获取值变化了的字段,也反查数据库获取了未变化的字段。因此每次update的sql实际上是该条记录的全量数据。

通过在表中加上时间戳字段作为记录的版本号,用逻辑删除取代物理删除delete,修改以上代码的sql拼接,insert操作时忽略主键冲突、update操作时仅更新版本号(时间戳)旧的记录,可以极大避免数据不一致的现象,也解决了MQ重复消费的问题。

`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

再通过定时任务,每天一次增量数据更新,每周一次全量数据更新,保证数据的最终一致性。


版权声明:本文为sz85850597原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。