60-spring 消费kafka, 推送消息到kafka

spring 消费kafka

消费者配置:

/**
 * Spring configuration for the "zdry" Kafka consumer.
 *
 * <p>Declared conditional on the classpath resource {@code /special-run.txt}. All settings are
 * injected from {@code kafka.zdry.consumer.*} properties and handed to the Kafka client as-is.
 */
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaConsumerConfig {

    // --- listener container settings ---
    @Value("${kafka.zdry.consumer.autoStartup}")
    private Boolean autoStartup;
    @Value("${kafka.zdry.consumer.servers}")
    private String servers;
    // Injected but not referenced inside this class.
    @Value("${kafka.zdry.consumer.topic}")
    private String topic;
    @Value("${kafka.zdry.consumer.group.id}")
    private String groupId;
    @Value("${kafka.zdry.consumer.enable.auto.commit}")
    private String enableAutoCommit;

    // --- raw Kafka client settings, kept as strings ---
    @Value("${kafka.zdry.consumer.auto.commit.interval.ms}")
    private String autoCommitIntervalMs;
    @Value("${kafka.zdry.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;
    @Value("${kafka.zdry.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.zdry.consumer.max.poll.records}")
    private String maxPollRecords;
    @Value("${kafka.zdry.consumer.concurrency}")
    private Integer concurrency;

    /**
     * Batch listener container factory for person-track messages.
     *
     * @return a factory registered under the bean name {@code zdry_person_track}, with batch
     *         listening enabled, a poll timeout of 1500 and the configured concurrency and
     *         auto-startup flag
     */
    @Bean("zdry_person_track")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setBatchListener(true);
        containerFactory.setAutoStartup(autoStartup);
        containerFactory.setConcurrency(concurrency);
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.getContainerProperties().setPollTimeout(1500);
        return containerFactory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a new String/String consumer factory
     */
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Assembles the Kafka consumer properties from the injected values.
     *
     * @return a new mutable map of consumer properties; keys and values are deserialized as strings
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();

        // Connection and group membership.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);

        // Offset handling.
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        // Polling.
        settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

        // Deserialization.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return settings;
    }

}

消费监听:

/**
 * Batch listener for the configured consumer topic, using the {@code zdry_person_track}
 * container factory.
 *
 * <p>Logs the batch size, then parses every record value as a JSON object and collects the
 * results. The collected array is not used any further within this snippet.
 *
 * @param records the batch of records delivered by the listener container
 */
@KafkaListener(topics = "${kafka.zdry.consumer.topic}", containerFactory = "zdry_person_track")
public void consumeToJson(List<ConsumerRecord<String, String>> records) {
    log.info("count={}", records.size());

    JSONArray parsed = new JSONArray();
    for (ConsumerRecord<String, String> message : records) {
        parsed.add(JSON.parseObject(message.value()));
    }
}

推送消息到kafka

生产者kafka配置:

/**
 * Spring configuration for the "zdry" Kafka producer.
 *
 * <p>Declared conditional on the classpath resource {@code /special-run.txt}. The broker list is
 * read from the same {@code kafka.zdry.consumer.servers} property the consumer uses.
 */
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaProducerConfig {
    @Value("${kafka.zdry.consumer.servers}")
    private String servers;

    /**
     * Builds the producer template registered as {@code zdryProducerTemplate}.
     *
     * <p>NOTE(review): the declared value type is {@code List<Object>}, but
     * {@link #producerProps()} configures {@code StringSerializer} for values. The declared
     * generic type looks inconsistent with the serializer; confirm before sending anything other
     * than {@code String} payloads through this template.
     *
     * @return a new template backed by a {@code DefaultKafkaProducerFactory}
     */
    @Bean("zdryProducerTemplate")
    public KafkaTemplate<String, List<Object>> createTemplate() {
        ProducerFactory<String, List<Object>> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProps());
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * Assembles the Kafka producer properties.
     *
     * @return a new mutable map of producer properties
     */
    public Map<String, Object> producerProps() {
        Map<String, Object> settings = new HashMap<>();

        // Connection.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);

        // Serialization: both key and value go out as strings.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Delivery and batching: retries are set to 0.
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return settings;
    }

}

产生消息并推送到kafka

    // Producer template used to push messages. Injected as a raw type; the field name
    // matches the bean name "zdryProducerTemplate" declared in ZdryKafkaProducerConfig.
    @Autowired
    private KafkaTemplate zdryProducerTemplate;

    // Topic the messages are pushed to, read from kafka.zdry.consumer.push.topic.
    @Value("${kafka.zdry.consumer.push.topic}")
    private String pushTopic;

方法:

// Push the warning as a JSON string and wait for the outcome of the send.
try {
    // send() is asynchronous: keep the returned future, otherwise a failure that occurs
    // after send() has returned would never reach the catch blocks below.
    java.util.concurrent.Future<?> pending =
            zdryProducerTemplate.send(pushTopic, JSON.toJSONString(warn));
    // Push the buffered record out immediately instead of waiting for the linger interval.
    zdryProducerTemplate.flush();
    // Rethrows an asynchronous send failure wrapped in an ExecutionException.
    pending.get();
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can still observe the interruption.
    Thread.currentThread().interrupt();
    e.printStackTrace();
} catch (Exception e) {
    // Best-effort push: report the failure and carry on, as before.
    e.printStackTrace();
}

yml 配置

kafka:
  ## Key-personnel capture tracks
  zdry:
    consumer:
      # Passed to the listener container factory's setAutoStartup.
      autoStartup: true
      # Bootstrap servers; read by both the consumer and the producer configuration.
      servers: x.x.x.x:6667
      # Topic the @KafkaListener subscribes to.
      topic: person_track
      # The entries below are copied into the Kafka consumer properties of the same name.
      group.id: aa_1
      enable.auto.commit: true
      auto.commit.interval.ms: 100
      session.timeout.ms: 10000
      auto.offset.reset: earliest
      max.poll.records: 100
      # Passed to the listener container factory's setConcurrency.
      concurrency: 1
      # Topic the producer side pushes messages to (injected as pushTopic).
      push.topic: ai_warn

版权声明:本文为kuangfengbuyi原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。