思路
服务端设置
- 修改server.properties 增加 log.message.timestamp.type=LogAppendTime
具体可参考
http://kafka.apache.org/0100/documentation.html#introduction
参考代码
public class ConsumerTimeStamp extends Thread {
KafkaConsumer<Integer, String> consumer;
public ConsumerTimeStamp() {
Properties props = new Properties();
props.put("bootstrap.servers", "centos-6:9092"); //连接地址
props.put("group.id", "lsy_test");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(props);
}
@Override
public void run() {
Map<TopicPartition, Long> map = new HashMap<>();
List<PartitionInfo> flink_order = consumer.partitionsFor("flink_order");
//从半小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 30;
for (PartitionInfo par : flink_order) {
map.put(new TopicPartition("flink_order", par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
long offset = value.offset();
System.out.println(key.partition());
System.out.println(offset);
//根据消费里的timestamp确定offset
if (value != null) {
//没有这行代码会导致下面的报错信息
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
while (true) {
ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<Integer, String> record : poll) {
System.out.println(record.key() + record.value());
}
}
}
public static void main(String[] args) {
ConsumerTimeStamp consuer = new ConsumerTimeStamp();
consuer.start();
}
}
报错信息
Exception in thread "Thread-0" java.lang.IllegalStateException: No current assignment for partition flink_order-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
at com.wending.demo.ConsumerTimeStamp.run(ConsumerTimeStamp.java:54)
版权声明:本文为qq_22222499原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。
