协同消费
datahub-client-library是Java-SDK读写功能的上层封装,支持协同消费的功能的Consumer,以及对shard均匀写入的Producer。
概念
点位服务
点位服务是提供将消费的点位保存在服务端的功能,点位由sequence和timestamp组成,sequence是递增的对应唯一记录的序列,timestamp是记录写入datahub的单位为ms的时间戳。
为Topic创建订阅,并在完成消费一部分数据后,将点位提交至服务端。下次启动任务时,可以从服务端获取上次提交的点位,从指定点位的下一条记录开始消费。将点位保存在服务端才能够实现shard重新分配后,能够从上次提交的点位之后消费,是协同消费功能的前提。
在Consumer中不需要手动处理点位,在config中设置点位提交的间隔,在读取记录时,认为之前的记录已经完成处理,若距离上次提交点位已经超过提交间隔,则尝试提交。在提交失败并且同时任务强制停止时,有一定可能造成点位提交不及时,重复消费一部分数据。
协同消费
协同消费是为了解决多个消费者同时消费一个topic时,自动分配shard的问题。能够简化消费的客户端处理,多个消费者可能是在不同机器上,通过自己协调分配shard是困难的。使用同一个Sub Id的Consummer在同一个Consumer Group中,同一个shard在一个Consumer Group中只会被分配给1个Consumer。
场景
现有3个消费者实例A,B,C,Topic共有10个shard
实例A启动,分配10个shard
实例B,C启动,shard分配为4,3,3
将1个shard进行split操作,在父节点消费完后,客户端主动释放,2个子节点加入后,shard分配为4,4,3
实例C停止后,shard分配为6,5
心跳
要实现协同消费的功能,需要通过心跳机制来通知让服务端消费者实例的状态,当前分配的shard和需要释放的shard,超过时间间隔没有收到心跳,则认为消费者实例已经停止。当消费者实例的状态发生改变,服务端会重新分配shard,新的分配计划也是通过心跳请求来返回,所以客户端感知shard变化是有时间间隔的。
版本
Maven依赖以及JDK:
maven pom
com.aliyun.datahub
aliyun-sdk-datahub
2.17.1-public
com.aliyun.datahub
datahub-client-library
1.1.12-public
jdk
jdk:>=1.8
示例
Producer 代码示例importcom.aliyun.datahub.client.exception.AuthorizationFailureException;
importcom.aliyun.datahub.client.exception.DatahubClientException;
importcom.aliyun.datahub.client.exception.InvalidParameterException;
importcom.aliyun.datahub.client.exception.MalformedRecordException;
importcom.aliyun.datahub.client.exception.NoPermissionException;
importcom.aliyun.datahub.client.exception.ShardNotFoundException;
importcom.aliyun.datahub.client.model.Field;
importcom.aliyun.datahub.client.model.FieldType;
importcom.aliyun.datahub.client.model.RecordEntry;
importcom.aliyun.datahub.client.model.RecordSchema;
importcom.aliyun.datahub.client.model.TupleRecordData;
importcom.aliyun.datahub.clientlibrary.config.ProducerConfig;
importcom.aliyun.datahub.clientlibrary.producer.Producer;
importcom.aliyun.datahub.exception.ResourceNotFoundException;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.TimeUnit;
publicclassDatahubWriter{
privatestaticfinalLoggerLOG=LoggerFactory.getLogger(DatahubWriter.class);
privatestaticvoidsleep(longmilliSeconds){
try{
TimeUnit.MILLISECONDS.sleep(milliSeconds);
}catch(InterruptedExceptione){
// TODO:自行处理异常
}
}
privatestaticListgenRecords(RecordSchemaschema){
ListrecordEntries=newArrayList<>();
for(intcnt=0;cnt<10;++cnt){
RecordEntryentry=newRecordEntry();
entry.addAttribute("key1","value1");
entry.addAttribute("key2","value2");
TupleRecordDatadata=newTupleRecordData(schema);
data.setField("field1","testValue");
data.setField("field2",1);
entry.setRecordData(data);
recordEntries.add(entry);
}
returnrecordEntries;
}
privatestaticvoidsendRecords(Producerproducer,ListrecordEntries){
intmaxRetry=3;
while(true){
try{
// 自动选择shard写入
producer.send(recordEntries,maxRetry);
// 指定写入shard "0"
// producer.send(recordEntries, "0", maxRetry);
LOG.error("send records: {}",recordEntries.size());
break;
}catch(MalformedRecordExceptione){
// record 格式非法,根据业务场景选择忽略或直接抛异常
LOG.error("write fail",e);
throwe;
}catch(InvalidParameterException|
AuthorizationFailureException|
NoPermissionExceptione){
// 请求参数非法
// 签名不正确
// 没有权限
LOG.error("write fail",e);
throwe;
}catch(ShardNotFoundExceptione){
// shard 不存在, 如果不是写入自己指定的shard,可以不用处理
LOG.error("write fail",e);
sleep(1000);
}catch(ResourceNotFoundExceptione){
// project, topic 或 shard 不存在
LOG.error("write fail",e);
throwe;
}catch(DatahubClientExceptione){
// 基类异常,包含网络问题等,可以选择重试
LOG.error("write fail",e);
sleep(1000);
}
}
}
publicstaticvoidmain(String[]args){
// Endpoint以Region: 华东1为例,其他Region请按实际情况填写
Stringendpoint="http://dh-cn-hangzhou.aliyuncs.com";
StringaccessId="";
StringaccessKey="";
StringprojectName="";
StringtopicName="";
RecordSchemaschema=datahubClient.getTopic(projectName,topicName).getRecordSchema();
ProducerConfigconfig=newProducerConfig(endpoint,accessId,accessKey);
Producerproducer=newProducer(projectName,topicName,config);
// 根据场景控制循环
booleanstop=false;
try{
while(!stop){
ListrecordEntries=genRecords(schema);
sendRecords(producer,recordEntries);
}
}finally{
// 确保资源正确释放
producer.close();
}
}
}
初始化Consumer
配置名称
描述
autoCommit
是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。
offsetCommitTimeoutMs
点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]
sessionTimeoutMs
会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]
fetchSize
单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0
// Endpoint以Region: 华东1为例,其他Region请按实际情况填写
Stringendpoint="http://dh-cn-hangzhou.aliyuncs.com";
StringaccessId="";
StringaccessKey="";
StringprojectName="";
StringtopicName="";
StringsubId="";
// 1. 使用协同消费,subId
ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);
Consumerconsumer=newConsumer(projectName,topicName,subId,config);
// 2. 不使用协同消费,使用点位服务,提供subId和Consumer读取的shard列表
Listassignment=Arrays.asList("0","1","2");
ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);
Consumerconsumer=newConsumer(projectName,topicName,subId,assignment,config);
// 3. 不使用协同消费,不使用点位服务记录的点位,提供subId,Consumer读取的shard和初始点位
MapoffsetMap=newHashMap<>();
// 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor
offsetMap.put("0",newOffset(100,1548573440756L));
// 只提供sequence,按照sequence获取Cursor
offsetMap.put("1",newOffset().setSequence(1));
// 只提供timestamp,按照timestamp获取Cursor
offsetMap.put("2",newOffset().setTimestamp(1548573440756L));
ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);
Consumerconsumer=newConsumer(projectName,topicName,subId,offsetMap,config);
协同代码示例importcom.aliyun.datahub.client.exception.AuthorizationFailureException;
importcom.aliyun.datahub.client.exception.DatahubClientException;
importcom.aliyun.datahub.client.exception.InvalidParameterException;
importcom.aliyun.datahub.client.exception.NoPermissionException;
importcom.aliyun.datahub.client.exception.SubscriptionOfflineException;
importcom.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
importcom.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
importcom.aliyun.datahub.client.model.RecordEntry;
importcom.aliyun.datahub.client.model.TupleRecordData;
importcom.aliyun.datahub.clientlibrary.config.ConsumerConfig;
importcom.aliyun.datahub.clientlibrary.consumer.Consumer;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importjava.util.concurrent.TimeUnit;
publicclassDatahubReader{
privatestaticfinalLoggerLOG=LoggerFactory.getLogger(DatahubReader.class);
privatestaticvoidsleep(longmilliSeconds){
try{
TimeUnit.MILLISECONDS.sleep(milliSeconds);
}catch(InterruptedExceptione){
// TODO:自行处理异常
}
}
publicstaticConsumercreateConsumer(ConsumerConfigconfig,Stringproject,Stringtopic,StringsubId)
{
returnnewConsumer(project,topic,subId,config);
}
publicstaticvoidmain(String[]args){
Stringendpoint="http://dh-cn-hangzhou.aliyuncs.com";
StringaccessId="";
StringaccessKey="";
StringprojectName="";
StringtopicName="";
StringsubId="";
ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);
Consumerconsumer=createConsumer(config,projectName,topicName,subId);
intmaxRetry=3;
booleanstop=false;
try{
while(!stop){
try{
while(true){
// 协同消费刚初始化,需要等待服务端分配shard,约40秒,期间只能返回null
// 自动提交模式,每次调用read,认为之前读的数据都已处理完成,自动ack
RecordEntryrecord=consumer.read(maxRetry);
// 处理数据
if(record!=null){
TupleRecordDatadata=(TupleRecordData)record.getRecordData();
// 根据自己的schema来处理数据,此处打印第一列的内容
LOG.info("field1: {}",data.getField(0));
// 根据列名取数据
// LOG.info("field2: {}", data.getField("field2"));
// 非自动提交模式,每条record处理完后都需要ack
// 自动提交模式,ack不会做任何操作
// 1.1.7版本及以上
record.getKey().ack();
}else{
LOG.info("read null");
}
}
}catch(SubscriptionOffsetResetExceptione){
// 点位被重置,重新初始化consumer
try{
consumer.close();
consumer=createConsumer(config,projectName,topicName,subId);
}catch(DatahubClientExceptione1){
// 初始化失败,重试或直接抛异常
LOG.error("create consumer failed",e);
throwe;
}
}catch(InvalidParameterException|
SubscriptionOfflineException|
SubscriptionSessionInvalidException|
AuthorizationFailureException|
NoPermissionExceptione){
// 请求参数非法
// 订阅被下线
// 订阅下相同shard被其他客户端占用
// 签名不正确
// 没有权限
LOG.error("read failed",e);
throwe;
}catch(DatahubClientExceptione){
// 基类异常,包含网络问题等,可以选择重试
LOG.error("read failed, retry",e);
sleep(1000);
}
}
}catch(Throwablee){
LOG.error("read failed",e);
}finally{
// 确保资源正确释放
// 会提交已ack的点位
consumer.close();
}
}
}
注意事项
Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。