提高并发能力
修改server.properties中num.partitions的配置。Partition的个数需要根据kafka所用的盘数决定,例如:每个节点上有两块盘被kafka使用,那么partition的个数设定为2*节点数。
修改server.properties中num.network.threads的配置。建议配置:8
修改server.properties中num.io.threads的配置。建议配置:8
优化磁盘利用率
- Kafka默认数据保留策略为7天,由于磁盘存储量高于80%后,磁盘I/O将会急剧下降,所以建议根据实际业务需求,缩短数据保留时间。
消息生产优化(producer.properties)
- 调整消息缓冲区大小:buffer.memory
- 调整batch大小:batch.size,建议调大
- 回执确认:acks,如对效率要求较高的情况下建议使用0
消息消费优化(consumer.properties)
- 增加消费并发:num.consumer.fetchers,建议8
KafkaProducer单例
KafkaProducer(org.apache.kafka.clients.producer.KafkaProducer)是一个用于向kafka集群发送数据的Java客户端。该Java客户端是线程安全的,多个线程可以共享同一个producer实例,而且这通常比在多个线程中每个线程创建一个实例速度要快些。
public class ProducerUtil {
private static KafkaProducer<String, String> kafkaProducer;
private final static Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
private ProducerUtil() {
}
private static class LazyHandler {
private static final ProducerUtil instance = new ProducerUtil();
}
public static final ProducerUtil getInstance() {
return LazyHandler.instance;
}
public void init() throws ExecutionException, InterruptedException {
if (kafkaProducer == null) {
Properties props = new Properties();
props.put("bootstrap.servers",ip);
//这个配置可以设定发送消息后是否需要Broker端返回确认
//0: 不需要进行确认,速度最快。存在丢失数据的风险。
//1: 仅需要Leader进行确认,不需要ISR进行确认。是一种效率和安全折中的方式。
//all: 需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR可能会缩小到仅包含一个Replica,所以设置参数为all并不能一定避免数据丢失。
props.put("acks", "1");
//重新发送消息次数,到达次数返回错误
props.put("retries", 0);
//Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
props.put("batch.size", 163840);
//Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
props.put("linger.ms", 1);
//在Producer端用来存放尚未发送出去的Message的缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.timeout.ms", "60000");
kafkaProducer = new KafkaProducer<>(props);
}
}
public void sendKakfaMessage(JSONArray jsonarray) throws ExecutionException, InterruptedException {
for (Object object : jsonarray) {
String json = object.toString();
String topic = "topic";
//通过时间做轮循,均匀分布设置的partition,提高效率。
int partition = (int) (System.currentTimeMillis() % 3);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, UUID.randomUUID().toString(), json);
kafkaProducer.send(record, (recordMetadata, e) -> {
if (e == null) {
logger.info("Send topic: " + topic + ", partition: " + recordMetadata.partition() + ", offset: " + recordMetadata.offset());
} else {
logger.error("Send topic: " + topic + " failed");
}
}).get();
}
}
}
注意:所有配置参数调整后,需重新启动kafka集群
版权声明:本文为sun123_123原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。