SpringBoot整合Debezium CDC同步数据至目标数据库

Debezium Spring Boot应用程序架构

 上图为使用嵌入式Debezium Spring Boot数据库连接器执行CDC的基本工作流架构。

SpringBoot通过pom文件引入jar

 <properties>
        <debezium.version>1.5.2.Final</debezium.version>
 </properties>


      ......  


        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

修改配置文件

#Debezium数据同步配置
timely:
  # 是否开启
  switch: true
  # 偏移量文件
  offset-file-name: /data/mysql/sync/offsets.dat
#  offset-file-name: C:\Users\cmg\Desktop\offsets.dat
  # 是否启东时清除偏移量文件
  offset-file-clean: true
  # 偏移量提交时间 单位ms
  offset-time: 1
  # 读取历史记录文件
  history-file-name: /data/mysql/own/datadir/mysql-bin.index
#  history-file-name: C:\Users\cmg\Desktop\mysql-bin.index
  # 读取的数据库信息
  offline:
    ip: 39.107.82.161
    port: 3306
    username: root
    password: 88888888Stg
    # 保证每个数据库读取的 instance-name  logic-name 不能相同
    # 实例名
    instance-name: instance-name
    # 逻辑名
    logic-name: instance-name
    # 读取的表
    include-table: vehicle
    # 读取的库
    include-db: wk_crm
    server-id: 1

代码案例

@Configuration
@Log4j2
public class ChangeEventConfig {
    private final ChangeEventHandler changeEventHandler;

    @Value("${timely.offset-file-name}")
    private String offsetFileName;
    @Value("${timely.offset-file-clean:true}")
    private Boolean offsetFileDelete;
    @Value("${timely.offset-time}")
    private String offsetTime;
    @Value("${timely.history-file-name}")
    private String historyFileName;
    @Value("${timely.offline.instance-name}")
    private String instanceName;
    @Value("${timely.offline.logic-name}")
    private String logicName;
    @Value("${timely.offline.ip}")
    private String ip;
    @Value("${timely.offline.port}")
    private String port;
    @Value("${timely.offline.username}")
    private String username;
    @Value("${timely.offline.password}")
    private String password;
    @Value("${timely.offline.include-table}")
    private String includeTable;
    @Value("${timely.offline.include-db}")
    private String includeDb;
    @Value("${timely.offline.server-id}")
    private String serverId;

    @Autowired
    public ChangeEventConfig(ChangeEventHandler changeEventHandler) {
        this.changeEventHandler = changeEventHandler;
    }

    @Bean
    public void cleanFile() {
        if (offsetFileDelete && FileUtil.exist(offsetFileName)) {
            FileUtil.del(offsetFileName);
        }
    }


    /**
     * 实例化sql server 实时同步服务类,执行任务
     *
     * @param configuration
     * @return
     */
    @Bean
    SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) {
        SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor();
        DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine
                .create(ChangeEventFormat.of(Connect.class))
                .using(configuration.asProperties())
                .notifying(changeEventHandler::handlePayload)
                .build();
        sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine);
        return sqlServerTimelyExecutor;
    }

    /**
     * @desc 同步执行服务类
     */
    @Data
    @Log4j2
    public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle {
        private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance();
        private DebeziumEngine<?> debeziumEngine;

        @Override
        public void start() {
            log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!");
            executor.execute(debeziumEngine);
        }

        @SneakyThrows
        @Override
        public void stop() {
            log.warn("debeziumEngine 监听实例关闭!");
            debeziumEngine.close();
            Thread.sleep(2000);
            log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池关闭!");
            executor.shutdown();
        }

        @Override
        public boolean isRunning() {
            return false;
        }

        @Override
        public void afterPropertiesSet() {
            Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!");
        }

        public enum ThreadPoolEnum {
            /**
             * 实例
             */
            INSTANCE;

            public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool";

            public static final String MySQL_LISTENER_POOL = "mysql-listener-pool";
            /**
             * 线程池单例
             */
            private final ExecutorService es;

            /**
             * 枚举 (构造器默认为私有)
             */
            ThreadPoolEnum() {
                final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(MySQL_LISTENER_POOL + "-%d").build();
                es = new ThreadPoolExecutor(8, 16, 60,
                        TimeUnit.SECONDS, new ArrayBlockingQueue<>(256),
                        threadFactory, new ThreadPoolExecutor.DiscardPolicy());
            }

            /**
             * 公有方法
             *
             * @return ExecutorService
             */
            public ExecutorService getInstance() {
                return es;
            }
        }
    }

}

变更数据处理实现类

@Service
@Log4j2
public class ChangeEventHandler {

    public static final String DATA = "data";
    public static final String BEFORE_DATA = "beforeData";
    public static final String EVENT_TYPE = "eventType";
    public static final String SOURCE = "source";
    public static final String TABLE = "table";

    private enum FilterJsonFieldEnum {
        /**
         * 表
         */
        table,
        /**
         * 库
         */
        db,
        /**
         * 操作时间
         */
        ts_ms,
        ;

        public static Boolean filterJsonField(String fieldName) {
            return Stream.of(values()).map(Enum::name).collect(Collectors.toSet()).contains(fieldName);
        }
    }

    /**
     * @desc 变更类型枚举
     **/
    public enum EventTypeEnum {
        /**
         * 增
         */
        CREATE(1),
        /**
         * 改
         */
        UPDATE(2),
        /**
         * 删
         */
        DELETE(3),
        ;
        @Getter
        private final int type;

        EventTypeEnum(int type) {
            this.type = type;
        }
    }

    public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                              DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
        for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
            SourceRecord sourceRecord = r.record();
            Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
            if (sourceRecordChangeValue == null) {
                continue;
            }
            // 获取变更表数据
            Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue);
            if (CollectionUtils.isEmpty(changeMap)) {
                continue;
            }
            ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap);
            if (changeListenerModel == null) {
                continue;
            }
            String db = changeListenerModel.getDb();
            String table = changeListenerModel.getTable();
            Integer eventType = changeListenerModel.getEventType();
            //wk_crm.wk_crm_customer 同步es
            if ("wk_crm".equalsIgnoreCase(db) && "wk_crm_customer".equalsIgnoreCase(table)){
                JSONObject jsonObject = JSONObject.parseObject(changeListenerModel.getData());
                String customerId = jsonObject.getString("customer_id");

                switch (eventType){
                    //增改
                    case 1:
                    case 2:
                        log.info("变更ES customerId:{}", customerId);
                        changeCustomerES(customerId);
                        break;
                    //删除
                    case 3:
                        log.info("删除ES customerId:{}", customerId);
                        deleteCustomerES(customerId);
                        break;
                    default:
                        continue;
                }

            }else {
                continue;
            }

            String jsonString = JSON.toJSONString(changeListenerModel);
            log.info("发送变更数据:{}", jsonString);
        }
        try {
            recordCommitter.markBatchFinished();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void changeCustomerES(String customerId) {
        HttpRequest post = HttpUtil.createPost("https://xxxx/crmCustomer/addalles?customerIds=" + customerId);
        HttpResponse execute = post.execute();
        log.info("同步ES返回结果:{}", execute.body());
    }
    private void deleteCustomerES(String customerId) {

    }

    private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) {
        // 操作类型过滤,只处理增删改
        Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        if (operation != Envelope.Operation.READ) {
            Integer eventType = null;
            Map<String, Object> result = new HashMap<>(4);
            if (operation == Envelope.Operation.CREATE) {
                eventType = EventTypeEnum.CREATE.getType();
                result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER));
                result.put(BEFORE_DATA, null);
            }
            // 修改需要特殊处理,拿到前后的数据
            if (operation == Envelope.Operation.UPDATE) {
                if (!changeMap.containsKey(TABLE)) {
                    return null;
                }
                eventType = EventTypeEnum.UPDATE.getType();
                String currentTableName = String.valueOf(changeMap.get(TABLE).toString());
                // 忽略非重要属性变更
                Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName);
                if (CollectionUtils.isEmpty(resultMap)) {
                    return null;
                }
                result.put(DATA, resultMap.get(AFTER));
                result.put(BEFORE_DATA, resultMap.get(BEFORE));
            }
            if (operation == Envelope.Operation.DELETE) {
                eventType = EventTypeEnum.DELETE.getType();
                result.put(DATA, getChangeData(sourceRecordChangeValue, BEFORE));
                result.put(BEFORE_DATA, getChangeData(sourceRecordChangeValue, BEFORE));
            }
            result.put(EVENT_TYPE, eventType);
            result.putAll(changeMap);
            return BeanUtil.copyProperties(result, ChangeListenerModel.class);
        }
        return null;
    }

    /**
     * 过滤非重要变更数据
     *
     * @param sourceRecordChangeValue
     * @param currentTableName
     * @return
     */
    private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) {
        Map<String, String> resultMap = new HashMap<>(4);
        Map<String, Object> afterMap = getChangeDataMap(sourceRecordChangeValue, AFTER);
        Map<String, Object> beforeMap = getChangeDataMap(sourceRecordChangeValue, BEFORE);
        //todo 根据表过滤字段
        resultMap.put(AFTER, JSON.toJSONString(afterMap));
        resultMap.put(BEFORE, JSON.toJSONString(beforeMap));
        return resultMap;
    }

    /**
     * 校验是否仅仅是非重要字段属性变更
     * @param currentTableName
     * @param afterMap
     * @param beforeMap
     * @param filterColumnList
     * @return
     */
    private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap,
                                          Map<String, Object> beforeMap, List<String> filterColumnList) {
        Map<String, Boolean> filterMap = new HashMap<>(16);
        for (String key : afterMap.keySet()) {
            Object afterValue = afterMap.get(key);
            Object beforeValue = beforeMap.get(key);
            filterMap.put(key, !Objects.equals(beforeValue, afterValue));
        }
        filterColumnList.parallelStream().forEach(filterMap::remove);
        if (filterMap.values().stream().noneMatch(x -> x)) {
            log.info("表:{}无核心资料变更,忽略此次操作!", currentTableName);
            return true;
        }
        return false;
    }


    public String getChangeData(Struct sourceRecordChangeValue, String record) {
        Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record);
        if (CollectionUtils.isEmpty(changeDataMap)) {
            return null;
        }
        return JSON.toJSONString(changeDataMap);
    }

    public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) {
        Struct struct = (Struct) sourceRecordChangeValue.get(record);
        // 将变更的行封装为Map
        Map<String, Object> changeData = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null)
                .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        if (CollectionUtils.isEmpty(changeData)) {
            return null;
        }
        return changeData;
    }

    private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) {
        Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE);
        Map<String, Object> map = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null && FilterJsonFieldEnum.filterJsonField(fieldName))
                .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        if (map.containsKey(FilterJsonFieldEnum.ts_ms.name())) {
            map.put("changeTime", map.get(FilterJsonFieldEnum.ts_ms.name()));
            map.remove(FilterJsonFieldEnum.ts_ms.name());
        }
        return map;
    }

}

部署方式

将构建好的jar包或容器,上传至 sourceTarget 目标服务器,并启动。


版权声明:本文为chen978616649原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。