Kafka 从指定Timestamp开始消费

思路

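(原文此处为示意图,图片缺失。)基本思路:先用 consumer.offsetsForTimes() 按时间戳查出每个分区对应的 offset,再通过 consumer.assign() 手动分配分区,最后用 consumer.seek() 定位到该 offset 开始消费。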

服务端设置

  • 修改 server.properties,增加配置:log.message.timestamp.type=LogAppendTime

具体可参考
http://kafka.apache.org/0100/documentation.html#introduction

参考代码


/**
 * Demo consumer thread that starts reading the {@code flink_order} topic from the
 * offsets that correspond to a timestamp 30 minutes in the past, then prints every
 * record it receives.
 *
 * <p>Partitions are assigned manually (no group rebalancing): all partitions of the
 * topic are assigned in a single {@code assign} call, and each one is then positioned
 * with {@code seek} using the result of {@code offsetsForTimes}.
 */
public class ConsumerTimeStamp extends Thread {

    /** Topic this demo reads from. */
    private static final String TOPIC = "flink_order";

    /** How far back from "now" consumption should start: 30 minutes. */
    private static final long LOOKBACK_MS = 30L * 60 * 1000;

    KafkaConsumer<Integer, String> consumer;

    /**
     * Builds the underlying {@link KafkaConsumer} with Integer keys and String values.
     */
    public ConsumerTimeStamp() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "centos-6:9092");  // broker address
        props.put("group.id", "lsy_test");
        // NOTE(review): the zookeeper.* keys look like leftovers from the old
        // ZooKeeper-based consumer configuration - confirm whether they can be dropped.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<Integer, String>(props);
    }

    /**
     * Positions the consumer at the offsets for "30 minutes ago" and then polls forever,
     * printing key and value of every record. The consumer is closed if the loop is
     * left because of an exception.
     */
    @Override
    public void run() {
        try {
            seekToTimestamp(System.currentTimeMillis() - LOOKBACK_MS);
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(100));
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println(record.key() + record.value());
                }
            }
        } finally {
            // Release sockets/buffers when polling stops for any reason.
            consumer.close();
        }
    }

    /**
     * Assigns every partition of {@link #TOPIC} to this consumer and seeks each one to the
     * earliest offset whose timestamp is at or after {@code timestampMs}.
     *
     * @param timestampMs target timestamp in epoch milliseconds
     * @throws IllegalStateException if the topic reports no partitions
     */
    private void seekToTimestamp(long timestampMs) {
        List<PartitionInfo> partitions = consumer.partitionsFor(TOPIC);
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalStateException("No partitions found for topic " + TOPIC);
        }

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (PartitionInfo info : partitions) {
            timestampsToSearch.put(new TopicPartition(TOPIC, info.partition()), timestampMs);
        }

        // assign() must happen before seek(); otherwise seek() fails with
        // "IllegalStateException: No current assignment for partition ...".
        // It is called ONCE with all partitions because each assign() call replaces
        // the previous assignment rather than adding to it.
        consumer.assign(timestampsToSearch.keySet());

        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
        for (TopicPartition partition : timestampsToSearch.keySet()) {
            OffsetAndTimestamp found = offsets.get(partition);
            if (found == null) {
                // No message at or after the timestamp in this partition:
                // nothing to replay, so start from the end of the log.
                System.out.println(partition.partition()
                        + ": no message at or after timestamp, seeking to end");
                consumer.seekToEnd(Arrays.asList(partition));
                continue;
            }
            System.out.println(partition.partition());
            System.out.println(found.offset());
            consumer.seek(partition, found.offset());
        }
    }

    public static void main(String[] args) {
        ConsumerTimeStamp consumerThread = new ConsumerTimeStamp();
        consumerThread.start();
    }
}


报错信息(未先调用 consumer.assign() 分配分区就直接 consumer.seek() 时出现)

Exception in thread "Thread-0" java.lang.IllegalStateException: No current assignment for partition flink_order-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
	at com.wending.demo.ConsumerTimeStamp.run(ConsumerTimeStamp.java:54)

版权声明:本文为qq_22222499原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。