Debezium Spring Boot应用程序架构
上图展示了在 Spring Boot 应用中使用嵌入式 Debezium 数据库连接器执行 CDC(Change Data Capture,变更数据捕获)的基本工作流架构。
在 Spring Boot 项目的 pom.xml 中引入 Debezium 相关依赖(jar 包)
<!-- Single place to pin the version shared by the three Debezium artifacts below. -->
<properties>
<debezium.version>1.5.2.Final</debezium.version>
</properties>
......
<!-- Debezium engine API. -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- Embedded engine, used to run a connector inside the application process. -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- MySQL connector. -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- Lombok; no <version> is given here, so it is presumably managed by a parent POM/BOM (confirm). -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
修改配置文件
# Debezium data-sync configuration.
# Nesting matters: the application reads these keys as timely.* and timely.offline.*.
timely:
  # Whether syncing is enabled
  switch: true
  # Offset storage file
  offset-file-name: /data/mysql/sync/offsets.dat
  # offset-file-name: C:\Users\cmg\Desktop\offsets.dat
  # Whether to delete the offset file on startup
  offset-file-clean: true
  # Offset commit interval, in ms
  offset-time: 1
  # History file to read
  # NOTE(review): this path looks like MySQL's own binlog index file. If it is used as
  # Debezium's database-history file it should point to a dedicated file instead — confirm.
  history-file-name: /data/mysql/own/datadir/mysql-bin.index
  # history-file-name: C:\Users\cmg\Desktop\mysql-bin.index
  # Connection details of the database to read
  offline:
    # NOTE(review): host and credentials are stored in plain text; prefer environment
    # variables or a secret store before committing this file.
    ip: 39.107.82.161
    port: 3306
    username: root
    password: 88888888Stg
    # instance-name and logic-name must not be shared between the databases being read
    # Instance name
    instance-name: instance-name
    # Logical name
    logic-name: instance-name
    # Table(s) to read
    include-table: vehicle
    # Database(s) to read
    include-db: wk_crm
    server-id: 1
代码案例
@Configuration
@Log4j2
public class ChangeEventConfig {
// Handler whose handlePayload method receives every batch of change events.
private final ChangeEventHandler changeEventHandler;
// Path of the offset file; cleanFile() deletes it when offsetFileDelete is true.
@Value("${timely.offset-file-name}")
private String offsetFileName;
// Whether the offset file is deleted at startup; defaults to true when the key is absent.
@Value("${timely.offset-file-clean:true}")
private Boolean offsetFileDelete;
// The fields below are bound from configuration but are not read in the visible part of this
// class; presumably they feed the Debezium Configuration bean defined elsewhere — confirm.
@Value("${timely.offset-time}")
private String offsetTime;
@Value("${timely.history-file-name}")
private String historyFileName;
@Value("${timely.offline.instance-name}")
private String instanceName;
@Value("${timely.offline.logic-name}")
private String logicName;
@Value("${timely.offline.ip}")
private String ip;
@Value("${timely.offline.port}")
private String port;
@Value("${timely.offline.username}")
private String username;
@Value("${timely.offline.password}")
private String password;
@Value("${timely.offline.include-table}")
private String includeTable;
@Value("${timely.offline.include-db}")
private String includeDb;
@Value("${timely.offline.server-id}")
private String serverId;
/**
 * Creates the configuration with the handler that will receive change events.
 *
 * @param changeEventHandler handler passed to the Debezium engine as its batch consumer
 */
@Autowired
public ChangeEventConfig(ChangeEventHandler changeEventHandler) {
this.changeEventHandler = changeEventHandler;
}
/**
 * Deletes the stored offset file at startup when offset cleaning is enabled.
 *
 * <p>Declared as an initialization callback instead of a {@code @Bean} factory method: the
 * method returns nothing, so there is no bean to register, and a {@code void} {@code @Bean}
 * method is a misuse of the annotation.
 */
@javax.annotation.PostConstruct
public void cleanFile() {
    // Boolean.TRUE.equals avoids an unboxing NullPointerException if the flag is ever unset.
    if (Boolean.TRUE.equals(offsetFileDelete) && FileUtil.exist(offsetFileName)) {
        FileUtil.del(offsetFileName);
    }
}
/**
 * Builds the Debezium engine and wraps it in the executor bean that runs it.
 *
 * <p>The engine emits Kafka Connect {@code SourceRecord} change events and hands each batch to
 * {@code ChangeEventHandler#handlePayload}.
 *
 * @param configuration Debezium configuration; it is converted to properties for the engine
 * @return executor holding the freshly built engine
 */
@Bean
SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) {
    DebeziumEngine<RecordChangeEvent<SourceRecord>> engine =
            DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                    .using(configuration.asProperties())
                    .notifying(changeEventHandler::handlePayload)
                    .build();

    SqlServerTimelyExecutor timelyExecutor = new SqlServerTimelyExecutor();
    timelyExecutor.setDebeziumEngine(engine);
    return timelyExecutor;
}
/**
* @desc 同步执行服务类
*/
@Data
@Log4j2
public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle {
private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance();
private DebeziumEngine<?> debeziumEngine;
@Override
public void start() {
log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!");
executor.execute(debeziumEngine);
}
@SneakyThrows
@Override
public void stop() {
log.warn("debeziumEngine 监听实例关闭!");
debeziumEngine.close();
Thread.sleep(2000);
log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池关闭!");
executor.shutdown();
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void afterPropertiesSet() {
Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!");
}
public enum ThreadPoolEnum {
/**
* 实例
*/
INSTANCE;
public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool";
public static final String MySQL_LISTENER_POOL = "mysql-listener-pool";
/**
* 线程池单例
*/
private final ExecutorService es;
/**
* 枚举 (构造器默认为私有)
*/
ThreadPoolEnum() {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(MySQL_LISTENER_POOL + "-%d").build();
es = new ThreadPoolExecutor(8, 16, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(256),
threadFactory, new ThreadPoolExecutor.DiscardPolicy());
}
/**
* 公有方法
*
* @return ExecutorService
*/
public ExecutorService getInstance() {
return es;
}
}
}
}
变更数据处理实现类
@Service
@Log4j2
public class ChangeEventHandler {
// Keys of the intermediate result map that getChangeDataInfo copies into ChangeListenerModel.
public static final String DATA = "data";
public static final String BEFORE_DATA = "beforeData";
public static final String EVENT_TYPE = "eventType";
// Name of the field read from the change-event value to obtain table metadata.
public static final String SOURCE = "source";
// Key under which the table name is looked up in the change-table info map.
public static final String TABLE = "table";
/**
 * Names of the source-metadata fields that are kept when reading a change event.
 */
private enum FilterJsonFieldEnum {
    /**
     * Table name.
     */
    table,
    /**
     * Database name.
     */
    db,
    /**
     * Operation timestamp.
     */
    ts_ms,
    ;

    /**
     * Tells whether {@code fieldName} is one of the constant names of this enum.
     *
     * @param fieldName field name to look up; {@code null} never matches
     * @return {@code true} when a constant with exactly that name exists
     */
    public static Boolean filterJsonField(String fieldName) {
        for (FilterJsonFieldEnum candidate : values()) {
            if (candidate.name().equals(fieldName)) {
                return true;
            }
        }
        return false;
    }
}
/**
 * Kinds of row change, each with the integer code stored in the change model.
 */
public enum EventTypeEnum {
    /**
     * Insert.
     */
    CREATE(1),
    /**
     * Update.
     */
    UPDATE(2),
    /**
     * Delete.
     */
    DELETE(3),
    ;

    private final int type;

    EventTypeEnum(int type) {
        this.type = type;
    }

    /**
     * Returns the integer code of this change type.
     *
     * @return 1 for CREATE, 2 for UPDATE, 3 for DELETE
     */
    public int getType() {
        return this.type;
    }
}
/**
 * Consumes one batch of change events delivered by the Debezium engine.
 *
 * <p>Each record is acknowledged with {@code markProcessed} after it has been handled or
 * deliberately skipped, and the batch is then closed with {@code markBatchFinished}.
 * If the thread is interrupted while acknowledging, the interrupt flag is restored and the
 * rest of the batch is left unacknowledged.
 *
 * <p>NOTE(review): an exception thrown while syncing a record (for example by the HTTP call)
 * still propagates to the engine, as before.
 *
 * @param recordChangeEvents events of the current batch
 * @param recordCommitter committer used to acknowledge records and the batch
 */
public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
        DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
    try {
        for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
            syncRecord(r.record());
            recordCommitter.markProcessed(r);
        }
        recordCommitter.markBatchFinished();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.warn("变更事件批次处理被中断", e);
    }
}

/**
 * Syncs a single record to ES when it is an insert/update/delete on wk_crm.wk_crm_customer;
 * every other record is ignored.
 */
private void syncRecord(SourceRecord sourceRecord) {
    Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
    if (sourceRecordChangeValue == null) {
        return;
    }
    // Table metadata of the change.
    Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue);
    if (CollectionUtils.isEmpty(changeMap)) {
        return;
    }
    ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap);
    if (changeListenerModel == null) {
        return;
    }
    // Only wk_crm.wk_crm_customer is synced to ES.
    if (!"wk_crm".equalsIgnoreCase(changeListenerModel.getDb())
            || !"wk_crm_customer".equalsIgnoreCase(changeListenerModel.getTable())) {
        return;
    }
    Integer eventType = changeListenerModel.getEventType();
    JSONObject jsonObject = JSONObject.parseObject(changeListenerModel.getData());
    // No event type or no row payload means there is nothing to sync.
    if (eventType == null || jsonObject == null) {
        return;
    }
    String customerId = jsonObject.getString("customer_id");
    if (eventType == EventTypeEnum.CREATE.getType() || eventType == EventTypeEnum.UPDATE.getType()) {
        log.info("变更ES customerId:{}", customerId);
        changeCustomerES(customerId);
    } else if (eventType == EventTypeEnum.DELETE.getType()) {
        log.info("删除ES customerId:{}", customerId);
        deleteCustomerES(customerId);
    } else {
        return;
    }
    String jsonString = JSON.toJSONString(changeListenerModel);
    log.info("发送变更数据:{}", jsonString);
}
/**
 * Posts the customer id to the ES sync endpoint and logs the response body.
 *
 * @param customerId id appended to the request URL as the {@code customerIds} query value
 */
private void changeCustomerES(String customerId) {
    String url = "https://xxxx/crmCustomer/addalles?customerIds=" + customerId;
    HttpResponse response = HttpUtil.createPost(url).execute();
    log.info("同步ES返回结果:{}", response.body());
}
/**
 * Placeholder for removing a customer from ES.
 *
 * <p>TODO: not implemented — the body is empty, so delete events are logged by the caller
 * but nothing is removed.
 *
 * @param customerId id of the deleted customer (currently unused)
 */
private void deleteCustomerES(String customerId) {
}
/**
 * Builds the change model for an insert, update or delete event.
 *
 * @param sourceRecordChangeValue value struct of the change event
 * @param changeMap table metadata; its entries are copied into the model
 * @return the populated model, or {@code null} when the value has no operation field, the
 *         operation is not create/update/delete, or an update carries no usable data
 */
private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) {
    // Values without an operation field are not row changes; reading the field would throw.
    if (sourceRecordChangeValue.schema().field(OPERATION) == null) {
        return null;
    }
    Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
    if (operation == null) {
        return null;
    }
    Integer eventType;
    Map<String, Object> result = new HashMap<>(8);
    switch (operation) {
        case CREATE:
            eventType = EventTypeEnum.CREATE.getType();
            result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER));
            result.put(BEFORE_DATA, null);
            break;
        case UPDATE:
            // Updates need both the before and the after image.
            if (!changeMap.containsKey(TABLE)) {
                return null;
            }
            eventType = EventTypeEnum.UPDATE.getType();
            String currentTableName = String.valueOf(changeMap.get(TABLE));
            // Drop changes that only touch non-essential columns.
            Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName);
            if (CollectionUtils.isEmpty(resultMap)) {
                return null;
            }
            result.put(DATA, resultMap.get(AFTER));
            result.put(BEFORE_DATA, resultMap.get(BEFORE));
            break;
        case DELETE:
            eventType = EventTypeEnum.DELETE.getType();
            // A delete only has a before image; it is used for both fields.
            String deletedRow = getChangeData(sourceRecordChangeValue, BEFORE);
            result.put(DATA, deletedRow);
            result.put(BEFORE_DATA, deletedRow);
            break;
        default:
            // READ and any other operation are not forwarded.
            return null;
    }
    result.put(EVENT_TYPE, eventType);
    result.putAll(changeMap);
    return BeanUtil.copyProperties(result, ChangeListenerModel.class);
}
/**
 * Serializes the after and before images of an update to JSON.
 *
 * <p>Per-table column filtering is not implemented yet, so {@code currentTableName} is unused.
 *
 * @param sourceRecordChangeValue value struct of the change event
 * @param currentTableName name of the changed table
 * @return map holding the after image under {@code AFTER} and the before image under {@code BEFORE}
 */
private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) {
    //todo filter columns per table
    String afterJson = JSON.toJSONString(getChangeDataMap(sourceRecordChangeValue, AFTER));
    String beforeJson = JSON.toJSONString(getChangeDataMap(sourceRecordChangeValue, BEFORE));

    Map<String, String> images = new HashMap<>(4);
    images.put(AFTER, afterJson);
    images.put(BEFORE, beforeJson);
    return images;
}
/**
 * Checks whether an update touched only non-essential columns.
 *
 * <p>Columns from both images are compared, so a column that exists in only one of them
 * counts as changed. A {@code null} map is treated as empty, and a {@code null} filter list
 * means no column is ignored.
 *
 * @param currentTableName table name, used for logging only
 * @param afterMap column values after the change
 * @param beforeMap column values before the change
 * @param filterColumnList columns whose changes are ignored
 * @return {@code true} when no column outside {@code filterColumnList} changed
 */
private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap,
        Map<String, Object> beforeMap, List<String> filterColumnList) {
    Map<String, Object> after = afterMap == null ? new HashMap<>() : afterMap;
    Map<String, Object> before = beforeMap == null ? new HashMap<>() : beforeMap;
    boolean essentialChange = Stream.concat(after.keySet().stream(), before.keySet().stream())
            .filter(column -> filterColumnList == null || !filterColumnList.contains(column))
            .anyMatch(column -> !Objects.equals(before.get(column), after.get(column)));
    if (!essentialChange) {
        log.info("表:{}无核心资料变更,忽略此次操作!", currentTableName);
        return true;
    }
    return false;
}
/**
 * Serializes one image of a change event to JSON.
 *
 * @param sourceRecordChangeValue value struct of the change event
 * @param record name of the image field to read
 * @return JSON of the image's columns, or {@code null} when the image has no columns
 */
public String getChangeData(Struct sourceRecordChangeValue, String record) {
    Map<String, Object> columns = getChangeDataMap(sourceRecordChangeValue, record);
    return CollectionUtils.isEmpty(columns) ? null : JSON.toJSONString(columns);
}
/**
 * Collects the non-null columns of one image of a change event.
 *
 * @param sourceRecordChangeValue value struct of the change event
 * @param record name of the image field to read
 * @return column name to value for every non-null column, or {@code null} when the image is
 *         absent or has no non-null column
 */
public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) {
    Struct struct = (Struct) sourceRecordChangeValue.get(record);
    // The requested image may be null; treat that like an image without columns.
    if (struct == null) {
        return null;
    }
    // Wrap the changed row in a map, skipping null columns.
    Map<String, Object> changeData = new HashMap<>();
    for (Field field : struct.schema().fields()) {
        Object value = struct.get(field);
        if (value != null) {
            changeData.put(field.name(), value);
        }
    }
    return changeData.isEmpty() ? null : changeData;
}
/**
 * Extracts table metadata from the source block of a change event.
 *
 * <p>Only the fields named by {@code FilterJsonFieldEnum} are kept, and {@code ts_ms} is
 * exposed under the key {@code changeTime}.
 *
 * @param sourceRecordChangeValue value struct of the change event
 * @return the kept fields; empty when the value has no source block
 */
private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) {
    Map<String, Object> map = new HashMap<>(4);
    // Values without a source field would make the lookup below throw.
    if (sourceRecordChangeValue.schema().field(SOURCE) == null) {
        return map;
    }
    Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE);
    if (struct == null) {
        return map;
    }
    for (Field field : struct.schema().fields()) {
        String fieldName = field.name();
        Object value = struct.get(field);
        if (value != null && FilterJsonFieldEnum.filterJsonField(fieldName)) {
            map.put(fieldName, value);
        }
    }
    // Rename ts_ms to changeTime.
    Object changeTime = map.remove(FilterJsonFieldEnum.ts_ms.name());
    if (changeTime != null) {
        map.put("changeTime", changeTime);
    }
    return map;
}
}
部署方式
将构建好的 jar 包或容器镜像上传至目标服务器(sourceTarget)并启动。
版权声明:本文为chen978616649原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。