sparkStreaming 消费 kafka 一直显示offset提交失败 Offsetcommit failed

sparkStreaming 消费Kafka,提交任务后  一直无法提交offset

报错信息提示:增大session.timeout  或减小 max.poll.records 

session.timeout.ms : 在使用Kafka的团队管理设施时,用于检测消费者失败的超时时间。消费者定期发送心跳来向经纪人表明其活跃度。如果代理在该会话超时到期之前没有收到心跳,那么代理将从该组中删除该消费者并启动重新平衡。

 

max.poll.records : 在一次调用poll()中返回的最大记录数。

 

根据提示,设置kafka消费者配置参数:max.poll.records 为 100 ,重新提交spark任务仍然报此错误,

然后又尝试增大session.timeout,也无法解决。

 

将代码还原,重新提交,查看程序日志,发现Kafka消费者中配置参数max.poll.records=2147483647,没有max.poll.interval.ms这项参数

经过查询资料得知:0.10.0.1版本max.poll.records默认值为2147483647,且没有max.poll.interval.ms这项参数,如果消费者拿到的这些数据在制定时间内消费不完,就会手动提交失败,数据就会回滚到kafka中,会发生重复消费的情况。

查看pom文件,发现没有指定Kafka版本,只指定了spark-streaming-kafka-0-10_2.11版本,添加Kafka版本至pom中

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.1.0</version>
</dependency>

重新提交任务后不报此错误。

如果仍然会出现手动提交offset失败,考虑使用多线程处理数据。

 

1、 使用Kafka时,消费者每次poll的数据业务处理时间不能超过kafka的max.poll.interval.ms,该参数在kafka0.10.2.1中的默认值是300s,所以要综合业务处理时间和每次poll的数据数量。

2、Java线程池大小的选择,

对于CPU密集型应用,也就是计算密集型,线程池大小应该设置为CPU核数+1;

对于IO密集型应用 ,线程池大小设置为    2*CPU核数+1. 

 


版权声明:本文为qq_37575217原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。