kafka指定时间范围消费一批topic数据

public class JavaConsumerTool {

    /**
     * Builds a String/String consumer against a local broker (127.0.0.1:9092).
     *
     * <p>Auto-commit is disabled: this tool uses {@code assign} + {@code seek}
     * without configuring a {@code group.id}, and committing offsets without a
     * valid group id fails at runtime on modern clients
     * ({@code InvalidGroupIdException}). The offsets are throwaway anyway —
     * every run seeks explicitly by timestamp.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // Was "true" in the original — a bug: no group.id is configured, so
        // auto-commit would throw instead of committing.
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    /**
     * Resolves the earliest offset whose record timestamp is at or after the
     * given wall-clock time.
     *
     * @param consumer    consumer used for the lookup (not closed here)
     * @param topic       topic name
     * @param partition   partition number
     * @param datetimeStr time in "yyyy-MM-dd HH:mm:ss", interpreted in the
     *                    JVM's default time zone — TODO confirm the broker's
     *                    record timestamps use the same zone expectation
     * @return the offset, or {@code null} when the broker lookup fails or no
     *         record at/after the timestamp exists in the partition
     * @throws ParseException if {@code datetimeStr} does not match the pattern
     */
    public static Long getOffsetByDateTime(KafkaConsumer<?, ?> consumer, String topic,
                                           int partition, String datetimeStr) throws ParseException {
        // SimpleDateFormat is acceptable here: it is method-local, never shared
        // across threads.
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long timestamp = df.parse(datetimeStr).getTime();

        TopicPartition topicPartition = new TopicPartition(topic, partition);
        Map<TopicPartition, Long> query = new HashMap<>();
        query.put(topicPartition, timestamp);

        Map<TopicPartition, OffsetAndTimestamp> result;
        try {
            result = consumer.offsetsForTimes(query, Duration.ofSeconds(10));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        // offsetsForTimes maps the partition to null when every record in it is
        // older than the requested timestamp — the original code NPE'd here.
        OffsetAndTimestamp offsetAndTimestamp = result.get(topicPartition);
        return offsetAndTimestamp == null ? null : offsetAndTimestamp.offset();
    }

    /**
     * Consumes and prints every record of one partition whose offset lies in
     * the window derived from [startTime, endTime], then closes the consumer.
     *
     * <p>Fixes over the original implementation:
     * <ul>
     *   <li>null offsets (failed lookup / empty partition) no longer NPE on
     *       auto-unboxing;</li>
     *   <li>an unparseable time string aborts instead of consuming from 0;</li>
     *   <li>records past the end offset inside a poll batch are skipped;</li>
     *   <li>the loop terminates when the partition runs out of data instead of
     *       polling forever;</li>
     *   <li>poll timeout raised from 1 ms (almost always returns empty on the
     *       first calls) to 1 s;</li>
     *   <li>the consumer is closed in a finally block.</li>
     * </ul>
     *
     * @param consumer  consumer to drive; always closed before returning
     * @param topic     topic name
     * @param partition partition number
     * @param startTime inclusive start, "yyyy-MM-dd HH:mm:ss"
     * @param endTime   inclusive end, "yyyy-MM-dd HH:mm:ss"
     */
    public static void consumerOnTimeBatch(KafkaConsumer<String, String> consumer, String topic,
                                           int partition, String startTime, String endTime) {
        try {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            // Pin this consumer to exactly one partition (no group rebalancing).
            consumer.assign(Arrays.asList(topicPartition));

            Long startOffset;
            Long endOffset;
            try {
                startOffset = getOffsetByDateTime(consumer, topic, partition, startTime);
                endOffset = getOffsetByDateTime(consumer, topic, partition, endTime);
            } catch (ParseException e) {
                e.printStackTrace();
                return; // unparseable time range: nothing sensible to consume
            }
            if (startOffset == null) {
                // No record at/after startTime exists — the window is empty.
                System.out.println("No records at or after the start time; nothing to consume.");
                return;
            }
            if (endOffset == null) {
                // endTime is past the newest record: consume up to the log end.
                // endOffsets() returns the offset of the NEXT record to be
                // written, hence the -1 for an inclusive bound.
                endOffset = consumer.endOffsets(Arrays.asList(topicPartition)).get(topicPartition) - 1;
            }

            consumer.seek(topicPartition, startOffset);

            int emptyPolls = 0;
            boolean done = false;
            // Three consecutive empty polls are treated as "no more data" so an
            // exhausted partition cannot spin the loop forever.
            while (!done && emptyPolls < 3) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    emptyPolls++;
                    continue;
                }
                emptyPolls = 0;
                for (ConsumerRecord<String, String> record : records) {
                    if (record.offset() > endOffset) {
                        done = true; // batch overshot the window; stop printing
                        break;
                    }
                    System.out.println("时间:" + new Date(record.timestamp())
                            + ",偏移量:" + record.offset() + ",消息体:" + record.value());
                    if (record.offset() == endOffset) {
                        done = true; // inclusive upper bound reached
                        break;
                    }
                }
            }
        } finally {
            consumer.close(); // always release sockets/buffers, even on error
        }
    }

    /**
     * Demo entry point: consumes partition 0 of topic "test" over a fixed
     * one-day window.
     */
    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = getConsumer();
        String topic = "test";
        int partition = 0;
        String startTime = "1997-01-17 00:00:00";
        String endTime = "1997-01-18 00:00:00";
        // Consume one batch of topic data within the time window; the call
        // closes the consumer itself.
        consumerOnTimeBatch(consumer, topic, partition, startTime, endTime);
    }

}

各位有需要我帮忙定制开发的可微信联系我哦,费用15元,微信:z1224576376,添加时请备注“kafka”


版权声明:本文为qq_32068809原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。