
Kafka
拉取的数据库 Binlog
》这篇文章中,笔者介绍如何实现并行消费 Binlog
,具体实现就是让同一张表的 Binlog
放到同一个线程去消费,用局部顺序消费换取消费速度,避免消息堆积。但在某些场景下,使用这种方式还是出现了问题,出现在关联表更新数据同步的先后顺序上。在此分享下我们的解决方案:新增分组概念,将关联表放到同一分组,将同一分组的 Binlog
分配到同一线程消费。限制一个表只能分配到一个分组下,如果没有为表配置分组,则该表就是一个独立的分组,分组名称就是表名称。在一次调试过程中笔者发现,日记打印的
Binlog
显示某些字段更新之前和更新之后都有值,可是到消费的时候获取字段的值却是 null
,如下图所示。
Binlog
修改的字段值,而此次代码只是加了一条日记打印。添加打印消费到的每条 Binlog
记录的详细信息之后,消费就不正常了,并且现象很奇怪,数值类型与日期类型的字段依然能正常获取到值,只是字符串类型的字段获取不到值了。排查此问题的思路:从 Kafka
拉取到消息到实际消费,这期间都做了什么,导致获取字段值为 null
?使用 kafka
拉取 Binlog
需要使用 Avro
反序列化消息,消息反序列化后生成 com.alibaba.dts.formats.avro.Record
对象,该对象记录了 Binlog
的操作类型、操作的库和表、字段、字段修改之前的镜像值与修改之后的镜像值。从 Record
对象获取字段的镜像值类型为 "com.alibaba.dts.formats.avro"
包下对应的类型,这些类型都提供有 getValue
方法获取值,但不同类型 getValue
方法返回值类型不同。com.alibaba.dts.formats.avro.Integer
的getValue
方法返回值类型为java.lang.String
;com.alibaba.dts.formats.avro.Float
的getValue
方法返回值类型为java.lang.Double
;...
getAsString
、 getAsInteger
、 getAsFloat
之类的 API
,不必再关心数据库该字段的值是什么类型,只需要关心我想要获取的值应该是什么类型。例如,将字段的镜像值都解析为 FieldValue
实例, FieldValue
类定义如下:public class FieldValue {private String encoding;private byte[] bytes;public String getAsString() {return new String(bytes, encoding);
}public Integer getAsInteger() {return Integer.parseInt(getAsString());
}public Long getAsLong() {return Long.parseLong(getAsString());
}public BigDecimal getAsBigDecimal() {return new BigDecimal(getAsString());
}// ......
}
以将 com.alibaba.dts.formats.avro.Integer
类型的字段值解析为 FieldValue
对象为例,实现解析代码如下。static class NumberStringAdapter implements DataAdapter {@Overridepublic FieldValue getFieldValue(Object data) {
FieldValue fieldValue = new FieldValue();if (null != data) {
com.alibaba.dts.formats.avro.Integer integer = (com.alibaba.dts.formats.avro.Integer) data;// 调用getValue获取字符串数值,并将字符串转为字节数组
fieldValue.setValue(integer.getValue().getBytes(US_ASCII));
}
fieldValue.setEncoding("ASCII");return fieldValue;
}
}
为便于使用,我们还可以将字段、字段修改之前的镜像值、字段修改之后的镜像值解析为一个个 FieldHolder
实例, FieldHolder
的定义如下。public abstract class FieldHolder {protected Field field;// 当操作为插入时,此字段没有值protected FieldValue beforeImage;// 当操作为删除时,此操作没有值protected FieldValue afterImage;// 省略get方法public FieldHolder(Field field, Object beforeImage, Object afterImage) {//....
}// 比较该字段的值是否修改了public boolean isModify() {if (beforeImage == null && afterImage == null) {return false;
}if (beforeImage == null) {return true;
}if (afterImage == null) {return true;
}return !getBeforeFieldValue().equals(getAfterFieldValue());
}
}
以 Integer
、 Float
( com.alibaba.dts.formats.avro
包下的类)不同的是, Character
、 BinaryObject
的 getValue
方法返回值类型为 java.nio.ByteBuffer
。将 Character
类型的字段值解析为 FieldValue
对象的实现代码如下。static class CharacterAdapter implements DataAdapter {@Overridepublic FieldValue getFieldValue(Object data) {
FieldValue fieldValue = new FieldValue();if (null != data) {
com.alibaba.dts.formats.avro.Character character = (com.alibaba.dts.formats.avro.Character) data;
ByteBuffer buffer = character.getValue();byte[] ret = new byte[buffer.remaining()];
buffer.get(ret);
fieldValue.setValue(ret);
fieldValue.setEncoding(character.getCharset());
} else {
fieldValue.setEncoding("ASCII");
}return fieldValue;
}
}
可以看出,产生此次 bug
的原因在于,调用 ByteBuffer
的 get
方法读取数据后, ByteBuffer
的读偏移量( position
)等于 limit
,由于没有调用 flip
重置读指针为 0
,导致后续再调用 getFieldValue
解析同一个镜像值时,解析后的 FieldValue
对象的 bytes
字段值是空的。应将代码改为如下:static class CharacterAdapter implements DataAdapter {@Overridepublic FieldValue getFieldValue(Object data) {
FieldValue fieldValue = new FieldValue();if (null != data) {
com.alibaba.dts.formats.avro.Character character = (com.alibaba.dts.formats.avro.Character) data;
ByteBuffer buffer = character.getValue();byte[] ret = new byte[buffer.remaining()];
buffer.get(ret);
fieldValue.setValue(ret);
fieldValue.setEncoding(character.getCharset());
character.getValue().flip();
} else {
fieldValue.setEncoding("ASCII");
}return fieldValue;
}
}
当然,我们可以让整个 Record
记录只解析一次,后续就不会出现反复读取字段值的情况。定义 Record
解析器接口:public interface RecordResolver<T extends FieldHolder> {String getDdl();String getDatabase();String getTable();Operation getOperation();FieldHolderMapgetFields();
}
根据 Record
对象创建 Record
解析器,在解析器构造方法中立即解析 Record
,外部每次调用 Record
解析器获取字段信息都是获取到已经解析好的。代码如下。public abstract class BaseRecordResolver<T extends FieldHolder> implements RecordResolver<T> {private String db;private String table;private String ddl;private Operation operation;private FieldHolderMap fieldHolderMap;public BaseRecordResolver(Record record) {this.operation = record.getOperation();
String[] dbPair = uncompressionObjectName(record.getObjectName());if (null != dbPair) {this.db = dbPair[0];if (dbPair.length == 2) {
table = dbPair[1];
} else if (dbPair.length == 3) {
table = dbPair[2];
} else if (dbPair.length == 1) {
table = "";
}
}if (record.getOperation() == Operation.DDL) {
ddl = (String) record.getAfterImages();
} else if (record.getFields() != null) {this.fieldHolderMap = readFieldInfo(record);
}
}private FieldHolderMapreadFieldInfo(Record record) {
Iterator fields = ((List) record.getFields()).iterator();
Iterator beforeImages = null;
Iterator afterImages = null;// update操作没有BeforeImagesif (record.getOperation() == Operation.UPDATE || record.getOperation() == Operation.DELETE) {
beforeImages = ((List) record.getBeforeImages()).iterator();
}// delete操作没有AfterImagesif (record.getOperation() == Operation.INSERT || record.getOperation() == Operation.UPDATE) {
afterImages = ((List) record.getAfterImages()).iterator();
}
List fieldHolders = new ArrayList<>(((List) record.getFields()).size());while (fields.hasNext()
&& (beforeImages == null || beforeImages.hasNext())
&& (afterImages == null || afterImages.hasNext())) {
Field field = fields.next();
Object before = beforeImages == null ? null : beforeImages.next();
Object after = afterImages == null ? null : afterImages.next();
fieldHolders.add(resolverField(field, before, after));
}return new FieldHolderMap<>(fieldHolders);
}@Overridepublic String getDdl() {if (operation == Operation.DDL) {return this.ddl;
}throw new IllegalArgumentException("not found ddl error");
}@Overridepublic String getDatabase() {return this.db;
}@Overridepublic String getTable() {return this.table;
}@Overridepublic Operation getOperation() {return this.operation;
}@Overridepublic FieldHolderMapgetFields() {if (getOperation() == Operation.DDL) {throw new IllegalArgumentException("ddl not fields");
}return this.fieldHolderMap;
}// protected abstract T resolverField(Field field, Object before, Object after);
}

版权声明:本文为weixin_39887748原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。