Transactions
Transactions are a new feature introduced in Kafka 0.11. They are similar to database transactions, except that the data source here is Kafka: a Kafka transaction groups a series of producer sends and consumer offset commits into a single transaction, in other words a single atomic operation, which succeeds or fails as a whole.
A getting-started example
The producer is shown below. It sends two messages; wrapping them in a transaction means that either both are sent successfully or neither is.
/**
 * A transaction that only produces: several sends in a single transaction.
 */
public class OnlyWriteProducer {

    protected static final Logger logger = LoggerFactory.getLogger(OnlyWriteProducer.class);

    public static final String TOPIC_NAME = "producer-0";

    public static void main(String[] args) {
        Producer<String, User> producer = new KafkaTransactionBuilder<String, User, byte[]>().buildProducer();
        // Initialize the transaction
        producer.initTransactions();
        // Begin the transaction
        producer.beginTransaction();
        try {
            User user = new User(101L, "kafka", "serializer@kafka.com", 1);
            producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user.getId()), user));

            User user2 = new User(102L, "netty", "transaction@netty.com", 0);
            producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user2.getId()), user2));

            // Commit the transaction
            producer.commitTransaction();
        } catch (Exception e) {
            logger.error("Failed to send Kafka messages!", e);
            // Abort the transaction
            producer.abortTransaction();
        }
        producer.close();
    }
}
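To see the atomicity claim in action, here is a minimal sketch (the class name AbortDemo, the transactional id abort-demo and the User data are made up for illustration) that sends a record inside a transaction and then deliberately aborts it; a consumer running with isolation.level=read_committed, such as the one built further below, should never return that record:
public class AbortDemo {
    public static void main(String[] args) {
        Producer<String, User> producer =
                new KafkaTransactionBuilder<String, User, byte[]>().buildProducer("abort-demo");
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<String, User>(OnlyWriteProducer.TOPIC_NAME,
                "901", new User(901L, "aborted", "abort@kafka.com", 0)));
        // Abort instead of commit: the record still lands in the log,
        // but it is marked as aborted and read_committed consumers skip it.
        producer.abortTransaction();
        producer.close();
    }
}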
The consumer is shown below. It does not use transactions itself, but it does commit offsets manually:
public class TransactionConsumer {

    protected static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);

    private static boolean isClose = false;

    public static void main(String args[]) {
        KafkaTransactionBuilder<String, User, byte[]> builder = new KafkaTransactionBuilder<String, User, byte[]>();
        Consumer<String, byte[]> consumer = builder.buildConsumer();
        consumer.subscribe(Arrays.asList(OnlyWriteProducer.TOPIC_NAME));
        try {
            while (!isClose) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.printf("key = %s, offset = %s, partition = %s, value = %s%n",
                            record.key(), record.offset(), record.partition(), new User(record.value()));
                }
                // Commit offsets manually
                consumer.commitAsync();
            }
        } catch (Exception e) {
            logger.error("Failed to consume Kafka messages!", e);
        }
        consumer.close();
    }
}
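The important detail is that the builder used here (shown below) sets isolation.level=read_committed, so this consumer only ever sees records from committed transactions. For comparison, a rough sketch of a consumer left at the default isolation level (raw Properties instead of the builder, with a made-up group id) would also return records from open or aborted transactions:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
props.put("group.id", "read-uncommitted-test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// "read_uncommitted" is the default: records from aborted transactions are delivered too
props.put("isolation.level", "read_uncommitted");
Consumer<String, byte[]> rawConsumer = new KafkaConsumer<>(props);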
The producer and consumer configuration is shown below. To use transactions, the producer must set transactional.id, and enable.idempotence must be set to true:
public class KafkaTransactionBuilder<T, P, C> extends KafkaBuilder<T, P, C> {

    /**
     * Build a transactional producer
     */
    @Override
    public KafkaProducer<T, P> buildProducer() {
        return buildProducer("default-transaction");
    }

    public KafkaProducer<T, P> buildProducer(String transactional_id) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("batch.size", default_batch_size);
        props.put("buffer.memory", default_buffer_size);
        props.put("key.serializer", default_serializer);
        props.put("value.serializer", "com.yang.kafka.serialization.ProtobufSerializer");
        // Set the transactional id
        props.put("transactional.id", transactional_id);
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("enable.idempotence", true);
        props.put("linger.ms", 1);
        return new KafkaProducer<>(props);
    }

    /**
     * Build a consumer
     */
    @Override
    public KafkaConsumer<T, C> buildConsumer() {
        return buildConsumer(default_group_id);
    }

    public KafkaConsumer<T, C> buildConsumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("key.deserializer", default_deserializer);
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Set the isolation level: only read committed records
        props.put("isolation.level", "read_committed");
        // Disable auto commit
        props.put("enable.auto.commit", false);
        return new KafkaConsumer<>(props);
    }
}
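The transactional.id is more than an on/off switch: it identifies one logical producer across restarts. When a new producer instance calls initTransactions() with the same transactional.id, the broker bumps the producer epoch and any older instance is fenced off, so a restarted application cannot be raced by its zombie predecessor. A rough sketch of that behavior (the class name FencingDemo, the id fencing-demo and the User data are made up):
public class FencingDemo {
    public static void main(String[] args) {
        KafkaTransactionBuilder<String, User, byte[]> builder = new KafkaTransactionBuilder<String, User, byte[]>();
        Producer<String, User> older = builder.buildProducer("fencing-demo");
        older.initTransactions();

        // A second producer with the same transactional.id: this initTransactions()
        // bumps the epoch and fences the "older" instance.
        Producer<String, User> newer = builder.buildProducer("fencing-demo");
        newer.initTransactions();

        try {
            older.beginTransaction();
            older.send(new ProducerRecord<String, User>(OnlyWriteProducer.TOPIC_NAME,
                    "1", new User(1L, "fenced", "fenced@kafka.com", 0)));
            older.commitTransaction();
        } catch (ProducerFencedException e) {
            // Expected: only the newest instance with this transactional.id may commit.
            older.close();
        }
        newer.close();
    }
}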
KafkaBuilder:
public class KafkaBuilder<T, P, C> {

    protected static final String servers = "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092";
    protected static final String default_serializer = "org.apache.kafka.common.serialization.StringSerializer";
    protected static final String default_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    protected static final int default_buffer_size = 33554432;
    protected static final int default_batch_size = 16384;
    protected static final String default_group_id = "test";

    public Producer<T, P> buildProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", default_batch_size);
        props.put("linger.ms", 1);
        props.put("buffer.memory", default_buffer_size);
        props.put("key.serializer", default_serializer);
        props.put("value.serializer", default_serializer);
        return new KafkaProducer<>(props);
    }

    public Consumer<T, C> buildConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", default_group_id);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", default_deserializer);
        props.put("value.deserializer", default_deserializer);
        return new KafkaConsumer<>(props);
    }
}
Simply put, Kafka transactions come in three flavors; personally I think only the first and the third are meaningful:
- Multiple message sends within a single transaction.
- Multiple message consumptions within a single transaction (this one is not really useful on its own).
- Consuming and producing within the same transaction. There are two sub-cases: consume first and then produce, which is known as the consume-transform-produce pattern, or produce first and then consume, which makes no practical sense once you think it through. So when we talk about consuming and producing in one transaction, we mean consume-transform-produce.
The consume-transform-produce pattern
The example implemented here works like this: we receive user records (User) that do not carry an email, so we add an email to each received user and then send the enriched record on to another topic.
public class ConsumeTransformProduce {

    protected static final Logger logger = LoggerFactory.getLogger(ConsumeTransformProduce.class);

    public static final String TOPIC_NAME = "producer-1";
    public static final String GROUP_ID = "consume_transform_produce";

    private static boolean isClose = false;

    public static void main(String[] args) {
        KafkaTransactionBuilder<String, User, byte[]> builder = new KafkaTransactionBuilder<String, User, byte[]>();
        Consumer<String, byte[]> consumer = builder.buildConsumer(GROUP_ID);
        Producer<String, User> producer = builder.buildProducer("producer-1-transaction");
        // Initialize the transaction
        producer.initTransactions();
        /** Subscribe to producer-0 **/
        consumer.subscribe(Arrays.asList(OnlyWriteProducer.TOPIC_NAME));
        while (!isClose) {
            // Begin the transaction
            producer.beginTransaction();
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, byte[]> record : records) {
                    User user = new User(record.value());
                    System.out.printf("key = %s, offset = %s, partition = %s, value = %s%n",
                            record.key(), record.offset(), record.partition(), user);
                    // The committed offset must be that of the next record to read, hence offset + 1
                    commits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    user.setEmail("consume_transform_produce@kafka.com");
                    /** Send to producer-1 **/
                    producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user.getId()), user));
                }
                // Commit the consumed offsets as part of the transaction
                producer.sendOffsetsToTransaction(commits, GROUP_ID);
                // Commit the transaction
                producer.commitTransaction();
            } catch (Exception e) {
                logger.error("Failed to send Kafka messages!", e);
                // Abort the transaction
                producer.abortTransaction();
            }
        }
        producer.close();
    }
}
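One note on sendOffsetsToTransaction: since Kafka clients 2.5 (KIP-447) there is an overload that takes the consumer's ConsumerGroupMetadata instead of a plain group id string, which additionally lets the broker fence offset commits from zombie consumer instances after a rebalance. On a newer client the commit step above could be written roughly like this (hypothetical helper method, same commits map as in the example):
static void commitWithinTransaction(Producer<String, User> producer,
                                    Consumer<String, byte[]> consumer,
                                    Map<TopicPartition, OffsetAndMetadata> commits) {
    // Available since Kafka clients 2.5; the String group-id variant is deprecated in newer releases
    producer.sendOffsetsToTransaction(commits, consumer.groupMetadata());
    producer.commitTransaction();
}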