Java 客户端访问 Kafka

Java 客户端访问 Kafka

1,依赖引入

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.4.1</version>
</dependency>

2,消息生产端代码

package com.example.kafkademo.producer;

import com.example.kafkademo.constant.Constant;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

/**
 * @author lwc
 */
public class ProducerClass {

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        //kafka地址,多个地址用逗号分割
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "81.69.233.xxx:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        while (true){
            String msg = "Hello," + new Random().nextInt(100);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(Constant.topic, msg);
            kafkaProducer.send(record);
            System.out.println("消息发送成功:" + msg);
            Thread.sleep(100);
        }
    }
}


消息生产端参数说明:

//消息持久化参数
props.put(ProducerConfig.ACKS_CONFIG, "1");

/**
0 :producer 不需要等到任何broker确认,就可以继续发送下一条消息,性能最高,容易丢失消息
1 :等待leader成功将数据存入log,但是不需要等待所有follower成功写入,就可以继续发送下一条消息,这种情况存在follower 没把消息成功写入,leader节点又挂掉了,这个时候会丢失消息
-1 或 all: 需要等待 min.insync.replicas (默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据,这是最强的数据保证,
一般是金融级别跟钱打交道的场景才会使用这种配置。
**/
//失败重试机制配置
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

/**
消息发送失败,默认100ms重试一次,重试可以保证消息的可靠性.
但缺点是可能会造成消息的重复发送,需要在消费端做好幂等性
**/
//设置发送消息的本地缓冲区,消息会先发送到本地缓冲区,提高消息发送性能,默认值是33554432 == 32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384)

/**
设置批量发送消息的大小,默认值 16384 == 16k 
本地缓存满了16k,kafka 会从本地缓存拉起消息,批量一起发到broker
**/
//设置满足10ms立刻发送消息,没必要要等本地缓存满了16k才能发送消息到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
//将发送的key序列化成字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//将发送的消息value从字符串序列成字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

3,消息消费端代码

package com.example.kafkademo.consumer;

import com.example.kafkademo.constant.Constant;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * @author lwc
 */
public class ConSumerClass {

    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        //kafka地址,多个地址用逗号分割
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "81.69.233.xxx:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
       	// 订阅消息
        kafkaConsumer.subscribe(Collections.singletonList(Constant.topic));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
                        record.topic(), record.offset(), record.value()));
            }
        }
    }
}

消息消费端参数说明:

//消息消费组
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
//消息自动提交offset,默认为true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//brober多久没感知到consumer的心跳,就会将其踢出消费组,对应消费的Partition也会重新分配给其他的consumer,默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//一次poll消息的条数,如果消费者的性能很高,这个参数能设大一些
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//两次poll都超过这个时间,brober 会将消费者踢出消费组并将Partition分配给其它的消费者
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

版权声明:本文为qq_44538738原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。