kafka2.0-事物发送(the transactional producer)_10

事务
事务是Kafka 0.11开始引入的新特性。类似于数据库事务,只是这里的数据源是Kafka,kafka事务属性是指一系列的生产者生产消息和消费者提交offset的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。

事务入门例子

生产者如下,发送两条消息,添加事物的意思就是要么同时发送成功,要么都不成功。

/**
 * 多次生成消息类型的事物
 */
public class OnlyWriteProducer {
    protected static final Logger logger = LoggerFactory.getLogger(OnlyWriteProducer.class);
    public static final String TOPIC_NAME = "producer-0"; 

    public static void main(String[] args) {
         Producer<String, User> producer = new KafkaTransactionBuilder<String, User, byte[]>().buildProducer();
         //初始化事物
         producer.initTransactions();
         //开始事物
         producer.beginTransaction();

         try{
             User user = new User(101L,"kafka","serializer@kafka.com",1);
             producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user.getId()), user));

             User user2 = new User(102L,"netty","transaction@netty.com",0);
             producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user2.getId()), user2));
             //提交事物
             producer.commitTransaction();
         }catch(Exception e){
             logger.error("kafka消息发送异常!",e);
             //停止事物
             producer.abortTransaction();
         }

         producer.close();
    }
}

消费者如下,消费者没有使用事物,但是消费者采用的是手动提交:

public class TransactionConsumer {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);
    private static boolean isClose = false;


    public  static void main(String args[]){
        KafkaTransactionBuilder<String, User, byte[]> builder = new KafkaTransactionBuilder<String, User, byte[]>();
        Consumer<String, byte[]> consumer = builder.buildConsumer();

        consumer.subscribe(Arrays.asList(OnlyWriteProducer.TOPIC_NAME));
        try{
            while (!isClose) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records)
                    System.out.printf("key = %s,offset=%s partition=%s value = %s%n", record.key(),record.offset(),record.partition(), new User(record.value()));

                //手动提交
                consumer.commitAsync();
            }   

        }catch(Exception e){
            logger.error("kafka消息消费异常!",e);
        }
        consumer.close();
    }
}

生产者和消费者配置如下,要使用事物生产者必须配置transactional.id,同时enable.idempotence需要设置为true:

public class KafkaTransactionBuilder<T,P,C> extends KafkaBuilder<T,P,C>{

    /**
     * 构建生产者
     */
    @Override
    public KafkaProducer<T,P> buildProducer(){
        return buildProducer("default-transaction");
    }

    public KafkaProducer<T,P> buildProducer(String transactional_id){
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("batch.size", default_batch_size);
        props.put("buffer.memory", default_buffer_size);

        props.put("key.serializer", default_serializer);
        props.put("value.serializer", "com.yang.kafka.serialization.ProtobufSerializer");
        // 设置事物ID
        props.put("transactional.id", transactional_id);

        props.put("acks", "all");
        props.put("retries", 3);
        props.put("enable.idempotence", true);
        props.put("linger.ms", 1);
        return new KafkaProducer<>(props);
    }


    /**
     * 构建消费者
     */
    @Override
    public KafkaConsumer<T,C> buildConsumer(){
        return buildConsumer(default_group_id);
    }

    public KafkaConsumer<T,C> buildConsumer(String gourpId){
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", gourpId);
        props.put("key.deserializer", default_deserializer);
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // 设置隔离级别
        props.put("isolation.level", "read_committed");
        // 关闭自动提交
        props.put("enable.auto.commit", false);

        return new KafkaConsumer<>(props);
    }
}

KafkaBuilder:

public class KafkaBuilder<T,P,C> {
    protected static final String servers = "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092";

    protected static final String default_serializer = "org.apache.kafka.common.serialization.StringSerializer";
    protected static final String default_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    protected static final int default_buffer_size = 33554432;
    protected static final int default_batch_size = 16384;
    protected static final String default_group_id = "test";

    public Producer<T,P> buildProducer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", default_batch_size);
        props.put("linger.ms", 1);
        props.put("buffer.memory", default_buffer_size);
        props.put("key.serializer", default_serializer);
        props.put("value.serializer", default_serializer);
        return new KafkaProducer<>(props);
    }

    public Consumer<T,C> buildConsumer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", default_group_id);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", default_deserializer);
        props.put("value.deserializer", default_deserializer);
        return new KafkaConsumer<>(props);
    }

}

简单的来说,kafka事物分为三种,我个人认为有意义的只有1,3两种:

  1. 在一个事物中,多次的发送消息
  2. 在一个事物中,多次的消费消息(其实这个没什么意义)
  3. 在一个事物中,即消费消息,又生产消息。这时候分两种情况,一种是消费再生产,这种模式我们称为consume-transform-produce,另一种是先生产再消费,其实这种没有任何意义,结合实际情况想一想就能明白,所以我们常说的,即消费消息,又生产消息,指的是consume-transform-produce

consume-transform-produce模式
这里实现了一个这样的例子:接收到用户信息(user),但是用户信息是不带email的,所以,我们这里将接收到的用户信息添加email之后,在发送到另一个topic中。

public class ConsumeTransformProduce {
    protected static final Logger logger = LoggerFactory.getLogger(ConsumeTransformProduce.class);

    public static final String TOPIC_NAME = "producer-1"; 
    public static final String GROUP_ID = "consume_transform_produce"; 

    private static boolean isClose = false;

    public static void main(String[] args) {
        KafkaTransactionBuilder<String, User, byte[]> builder = new KafkaTransactionBuilder<String, User, byte[]>();
        Consumer<String, byte[]> consumer = builder.buildConsumer(GROUP_ID);
        Producer<String, User> producer = builder.buildProducer("producer-1-transaction");
        // 初始化事物
        producer.initTransactions();

        /** 订阅producer-0 **/
        consumer.subscribe(Arrays.asList(OnlyWriteProducer.TOPIC_NAME));

        while (!isClose) {
            // 开始事物
            producer.beginTransaction();

            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));

                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, byte[]> record : records){
                    User user = new User(record.value());
                    System.out.printf("key = %s,offset=%s partition=%s value = %s%n",record.key(), record.offset(), record.partition(),user);

                    commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()));

                    user.setEmail("consume_transform_produce@kafka.com");
                    /** 发送producer-1 **/
                    producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user.getId()), user));
                }
                // 提交Offset
                producer.sendOffsetsToTransaction(commits, GROUP_ID);
                // 提交事务
                producer.commitTransaction();
            } catch (Exception e) {
                logger.error("kafka消息发送异常!", e);
                // 停止事物
                producer.abortTransaction();
            }
        }

        producer.close();
    }
}

示例源码:https://github.com/Mryangtaofang/sample


版权声明:本文为u014801432原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。