java查看kafka数据量_kafka查询指定时间数据的偏移量

KafkaOffsetSearch kot= newKafkaOffsetSearch();

TreeMap metadatas =kot.findLeader(seeds, port, topic);

Map map = new HashMap();

List offSetList = new ArrayList<>();for (Entryentry : metadatas.entrySet()) {int partition =entry.getKey();

String leadBroker=entry.getValue().leader().host();

String clientName= "Client_" + topic + "_" +partition;

SimpleConsumer consumer= new SimpleConsumer(leadBroker, port, 100000,64 * 1024, clientName);long readOffset =getLastOffset(consumer, topic, partition,

timestamp, clientName);

offSetList.add(readOffset);

map.put(partition+"", readOffset+"");

System.out.println(partition+":"+readOffset);if(consumer!=null)consumer.close();

}returnmap;

}public static longgetLastOffset(SimpleConsumer consumer, String topic,int partition, longwhichTime, String clientName) {

TopicAndPartition topicAndPartition= newTopicAndPartition(topic,

partition);

Map requestInfo = new HashMap();

requestInfo.put(topicAndPartition,newPartitionOffsetRequestInfo(

whichTime,1));

kafka.javaapi.OffsetRequest request= newkafka.javaapi.OffsetRequest(

requestInfo, kafka.api.OffsetRequest.CurrentVersion(),

clientName);

OffsetResponse response=consumer.getOffsetsBefore(request);if(response.hasError()) {

System.out.println("Error fetching data Offset Data the Broker. Reason:"

+response.errorCode(topic, partition));return 0;

}long[] offsets =response.offsets(topic, partition);return offsets[0];

}private TreeMap findLeader(Lista_seedBrokers,inta_port, String a_topic) {

TreeMap map = new TreeMap();

loop:for(String seed : a_seedBrokers) {

SimpleConsumer consumer= null;try{

consumer= new SimpleConsumer(seed, a_port, 100000, 64 * 1024,"leaderLookup"+newDate().getTime());

List topics =Collections.singletonList(a_topic);

TopicMetadataRequest req= newTopicMetadataRequest(topics);

kafka.javaapi.TopicMetadataResponse resp=consumer.send(req);

List metaData =resp.topicsMetadata();for(TopicMetadata item : metaData) {for(PartitionMetadata part : item.partitionsMetadata()) {

map.put(part.partitionId(), part);

}

}

}catch(Exception e) {

System.out.println("Error communicating with Broker [" +seed+ "] to find Leader for [" + a_topic + ", ] Reason:" +e);

}finally{if (consumer != null)

consumer.close();

}

}returnmap;

}

}


版权声明:本文为weixin_39668890原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。