python消费datahub_协同消费_开发指南_DataHub - 阿里云

协同消费

datahub-client-library是Java-SDK读写功能的上层封装,支持协同消费的功能的Consumer,以及对shard均匀写入的Producer。

概念

点位服务

点位服务是提供将消费的点位保存在服务端的功能,点位由sequence和timestamp组成,sequence是递增的对应唯一记录的序列,timestamp是记录写入datahub的单位为ms的时间戳。

为Topic创建订阅,并在完成消费一部分数据后,将点位提交至服务端。下次启动任务时,可以从服务端获取上次提交的点位,从指定点位的下一条记录开始消费。将点位保存在服务端才能够实现shard重新分配后,能够从上次提交的点位之后消费,是协同消费功能的前提。

在Consumer中不需要手动处理点位,在config中设置点位提交的间隔,在读取记录时,认为之前的记录已经完成处理,若距离上次提交点位已经超过提交间隔,则尝试提交。在提交失败并且同时任务强制停止时,有一定可能造成点位提交不及时,重复消费一部分数据。

协同消费

协同消费是为了解决多个消费者同时消费一个topic时,自动分配shard的问题。能够简化消费的客户端处理,多个消费者可能是在不同机器上,通过自己协调分配shard是困难的。使用同一个Sub Id的Consummer在同一个Consumer Group中,同一个shard在一个Consumer Group中只会被分配给1个Consumer。

场景

现有3个消费者实例A,B,C,Topic共有10个shard

实例A启动,分配10个shard

实例B,C启动,shard分配为4,3,3

将1个shard进行split操作,在父节点消费完后,客户端主动释放,2个子节点加入后,shard分配为4,4,3

实例C停止后,shard分配为6,5

心跳

要实现协同消费的功能,需要通过心跳机制来通知让服务端消费者实例的状态,当前分配的shard和需要释放的shard,超过时间间隔没有收到心跳,则认为消费者实例已经停止。当消费者实例的状态发生改变,服务端会重新分配shard,新的分配计划也是通过心跳请求来返回,所以客户端感知shard变化是有时间间隔的。

版本

Maven依赖以及JDK:

maven pom

com.aliyun.datahub

aliyun-sdk-datahub

2.17.1-public

com.aliyun.datahub

datahub-client-library

1.1.12-public

jdk

jdk:>=1.8

示例

Producer 代码示例importcom.aliyun.datahub.client.exception.AuthorizationFailureException;

importcom.aliyun.datahub.client.exception.DatahubClientException;

importcom.aliyun.datahub.client.exception.InvalidParameterException;

importcom.aliyun.datahub.client.exception.MalformedRecordException;

importcom.aliyun.datahub.client.exception.NoPermissionException;

importcom.aliyun.datahub.client.exception.ShardNotFoundException;

importcom.aliyun.datahub.client.model.Field;

importcom.aliyun.datahub.client.model.FieldType;

importcom.aliyun.datahub.client.model.RecordEntry;

importcom.aliyun.datahub.client.model.RecordSchema;

importcom.aliyun.datahub.client.model.TupleRecordData;

importcom.aliyun.datahub.clientlibrary.config.ProducerConfig;

importcom.aliyun.datahub.clientlibrary.producer.Producer;

importcom.aliyun.datahub.exception.ResourceNotFoundException;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importjava.util.ArrayList;

importjava.util.List;

importjava.util.concurrent.TimeUnit;

publicclassDatahubWriter{

privatestaticfinalLoggerLOG=LoggerFactory.getLogger(DatahubWriter.class);

privatestaticvoidsleep(longmilliSeconds){

try{

TimeUnit.MILLISECONDS.sleep(milliSeconds);

}catch(InterruptedExceptione){

// TODO:自行处理异常

}

}

privatestaticListgenRecords(RecordSchemaschema){

ListrecordEntries=newArrayList<>();

for(intcnt=0;cnt<10;++cnt){

RecordEntryentry=newRecordEntry();

entry.addAttribute("key1","value1");

entry.addAttribute("key2","value2");

TupleRecordDatadata=newTupleRecordData(schema);

data.setField("field1","testValue");

data.setField("field2",1);

entry.setRecordData(data);

recordEntries.add(entry);

}

returnrecordEntries;

}

privatestaticvoidsendRecords(Producerproducer,ListrecordEntries){

intmaxRetry=3;

while(true){

try{

// 自动选择shard写入

producer.send(recordEntries,maxRetry);

// 指定写入shard "0"

// producer.send(recordEntries, "0", maxRetry);

LOG.error("send records: {}",recordEntries.size());

break;

}catch(MalformedRecordExceptione){

// record 格式非法,根据业务场景选择忽略或直接抛异常

LOG.error("write fail",e);

throwe;

}catch(InvalidParameterException|

AuthorizationFailureException|

NoPermissionExceptione){

// 请求参数非法

// 签名不正确

// 没有权限

LOG.error("write fail",e);

throwe;

}catch(ShardNotFoundExceptione){

// shard 不存在, 如果不是写入自己指定的shard,可以不用处理

LOG.error("write fail",e);

sleep(1000);

}catch(ResourceNotFoundExceptione){

// project, topic 或 shard 不存在

LOG.error("write fail",e);

throwe;

}catch(DatahubClientExceptione){

// 基类异常,包含网络问题等,可以选择重试

LOG.error("write fail",e);

sleep(1000);

}

}

}

publicstaticvoidmain(String[]args){

// Endpoint以Region: 华东1为例,其他Region请按实际情况填写

Stringendpoint="http://dh-cn-hangzhou.aliyuncs.com";

StringaccessId="";

StringaccessKey="";

StringprojectName="";

StringtopicName="";

RecordSchemaschema=datahubClient.getTopic(projectName,topicName).getRecordSchema();

ProducerConfigconfig=newProducerConfig(endpoint,accessId,accessKey);

Producerproducer=newProducer(projectName,topicName,config);

// 根据场景控制循环

booleanstop=false;

try{

while(!stop){

ListrecordEntries=genRecords(schema);

sendRecords(producer,recordEntries);

}

}finally{

// 确保资源正确释放

producer.close();

}

}

}

初始化Consumer

配置名称

描述

autoCommit

是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。

offsetCommitTimeoutMs

点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]

sessionTimeoutMs

会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]

fetchSize

单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0

// Endpoint以Region: 华东1为例,其他Region请按实际情况填写

Stringendpoint="http://dh-cn-hangzhou.aliyuncs.com";

StringaccessId="";

StringaccessKey="";

StringprojectName="";

StringtopicName="";

StringsubId="";

// 1. 使用协同消费,subId

ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);

Consumerconsumer=newConsumer(projectName,topicName,subId,config);

// 2. 不使用协同消费,使用点位服务,提供subId和Consumer读取的shard列表

Listassignment=Arrays.asList("0","1","2");

ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);

Consumerconsumer=newConsumer(projectName,topicName,subId,assignment,config);

// 3. 不使用协同消费,不使用点位服务记录的点位,提供subId,Consumer读取的shard和初始点位

MapoffsetMap=newHashMap<>();

// 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor

offsetMap.put("0",newOffset(100,1548573440756L));

// 只提供sequence,按照sequence获取Cursor

offsetMap.put("1",newOffset().setSequence(1));

// 只提供timestamp,按照timestamp获取Cursor

offsetMap.put("2",newOffset().setTimestamp(1548573440756L));

ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);

Consumerconsumer=newConsumer(projectName,topicName,subId,offsetMap,config);

协同代码示例importcom.aliyun.datahub.client.exception.AuthorizationFailureException;

importcom.aliyun.datahub.client.exception.DatahubClientException;

importcom.aliyun.datahub.client.exception.InvalidParameterException;

importcom.aliyun.datahub.client.exception.NoPermissionException;

importcom.aliyun.datahub.client.exception.SubscriptionOfflineException;

importcom.aliyun.datahub.client.exception.SubscriptionOffsetResetException;

importcom.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;

importcom.aliyun.datahub.client.model.RecordEntry;

importcom.aliyun.datahub.client.model.TupleRecordData;

importcom.aliyun.datahub.clientlibrary.config.ConsumerConfig;

importcom.aliyun.datahub.clientlibrary.consumer.Consumer;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importjava.util.concurrent.TimeUnit;

publicclassDatahubReader{

privatestaticfinalLoggerLOG=LoggerFactory.getLogger(DatahubReader.class);

privatestaticvoidsleep(longmilliSeconds){

try{

TimeUnit.MILLISECONDS.sleep(milliSeconds);

}catch(InterruptedExceptione){

// TODO:自行处理异常

}

}

publicstaticConsumercreateConsumer(ConsumerConfigconfig,Stringproject,Stringtopic,StringsubId)

{

returnnewConsumer(project,topic,subId,config);

}

publicstaticvoidmain(String[]args){

Stringendpoint="http://dh-cn-hangzhou.aliyuncs.com";

StringaccessId="";

StringaccessKey="";

StringprojectName="";

StringtopicName="";

StringsubId="";

ConsumerConfigconfig=newConsumerConfig(endpoint,accessId,accessKey);

Consumerconsumer=createConsumer(config,projectName,topicName,subId);

intmaxRetry=3;

booleanstop=false;

try{

while(!stop){

try{

while(true){

// 协同消费刚初始化,需要等待服务端分配shard,约40秒,期间只能返回null

// 自动提交模式,每次调用read,认为之前读的数据都已处理完成,自动ack

RecordEntryrecord=consumer.read(maxRetry);

// 处理数据

if(record!=null){

TupleRecordDatadata=(TupleRecordData)record.getRecordData();

// 根据自己的schema来处理数据,此处打印第一列的内容

LOG.info("field1: {}",data.getField(0));

// 根据列名取数据

// LOG.info("field2: {}", data.getField("field2"));

// 非自动提交模式,每条record处理完后都需要ack

// 自动提交模式,ack不会做任何操作

// 1.1.7版本及以上

record.getKey().ack();

}else{

LOG.info("read null");

}

}

}catch(SubscriptionOffsetResetExceptione){

// 点位被重置,重新初始化consumer

try{

consumer.close();

consumer=createConsumer(config,projectName,topicName,subId);

}catch(DatahubClientExceptione1){

// 初始化失败,重试或直接抛异常

LOG.error("create consumer failed",e);

throwe;

}

}catch(InvalidParameterException|

SubscriptionOfflineException|

SubscriptionSessionInvalidException|

AuthorizationFailureException|

NoPermissionExceptione){

// 请求参数非法

// 订阅被下线

// 订阅下相同shard被其他客户端占用

// 签名不正确

// 没有权限

LOG.error("read failed",e);

throwe;

}catch(DatahubClientExceptione){

// 基类异常,包含网络问题等,可以选择重试

LOG.error("read failed, retry",e);

sleep(1000);

}

}

}catch(Throwablee){

LOG.error("read failed",e);

}finally{

// 确保资源正确释放

// 会提交已ack的点位

consumer.close();

}

}

}

注意事项

Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。