Structured Streaming+Kafka 配置问题

使用Structured Streaming处理数据的知识积累。

1、Structured Streaming 在做读取数据的时候,format()首先会判断加载的数据源是什么(Kafka、MySQL ···),

2、读取Kafka数据的时候,KafkaSourceProvider 类中的createMicroBatchReader 函数体中有对option() 中设置的Kafka参数进行校验及设置默认参数,

 

 

对必填参数duisubscribe,startingOffsets,failOnDataLoss进行判断。

3、如果要对默认参数进行修改或者添加新的配置,通过 option("Kafka.xx","") 进行设置。由于在KafkaSourceProvider 类对Kafka设置的参数做了过滤处理 因此,设置Kafka参数时,必须以kafka. 前缀开始

切记!不是所有的Kafka参数都可以设置!

Kafka特定配置

Kafka自己的配置可以通过设置DataStreamReader.optionkafka.前缀,例如 stream.option("kafka.bootstrap.servers", "host:port")。有关可能的kafka参数,请参阅 Kafka使用者配置文档以获取与读取数据相关的参数,以及Kafka生产者配置文档 以获取与写入数据相关的参数。

请注意,无法设置以下Kafka参数,并且Kafka源或接收器将引发异常:

  • group.id:Kafka源将自动为每个查询创建一个唯一的组ID。
  • auto.offset.reset:设置源选项startingOffsets以指定从何处开始。结构化流管理在内部管理哪些偏移量,而不是依靠kafka使用者来执行此操作。这将确保在动态订阅新主题/分区时不会丢失任何数据。请注意,startingOffsets仅在启动新的流查询时适用,并且恢复将始终从查询中断的地方开始。
  • key.deserializer:始终使用ByteArrayDeserializer将键反序列化为字节数组。使用DataFrame操作显式反序列化键。
  • value.deserializer:始终使用ByteArrayDeserializer将值反序列化为字节数组。使用DataFrame操作显式反序列化值。
  • key.serializer:密钥始终使用ByteArraySerializer或StringSerializer进行序列化。使用DataFrame操作可以将键显式序列化为字符串或字节数组。
  • value.serializer:值始终使用ByteArraySerializer或StringSerializer进行序列化。使用DataFrame操作可以将值显式序列化为字符串或字节数组。
  • enable.auto.commit:Kafka源不提交任何偏移量。
  • Interceptor.classes:Kafka源始终将键和值读取为字节数组。使用ConsumerInterceptor是不安全的,因为它可能会中断查询。

 

 


版权声明:本文为Angel_Shura原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。