KafkaOffsetSearch kot= newKafkaOffsetSearch();
TreeMap metadatas =kot.findLeader(seeds, port, topic);
Map map = new HashMap();
List offSetList = new ArrayList<>();for (Entryentry : metadatas.entrySet()) {int partition =entry.getKey();
String leadBroker=entry.getValue().leader().host();
String clientName= "Client_" + topic + "_" +partition;
SimpleConsumer consumer= new SimpleConsumer(leadBroker, port, 100000,64 * 1024, clientName);long readOffset =getLastOffset(consumer, topic, partition,
timestamp, clientName);
offSetList.add(readOffset);
map.put(partition+"", readOffset+"");
System.out.println(partition+":"+readOffset);if(consumer!=null)consumer.close();
}returnmap;
}public static longgetLastOffset(SimpleConsumer consumer, String topic,int partition, longwhichTime, String clientName) {
TopicAndPartition topicAndPartition= newTopicAndPartition(topic,
partition);
Map requestInfo = new HashMap();
requestInfo.put(topicAndPartition,newPartitionOffsetRequestInfo(
whichTime,1));
kafka.javaapi.OffsetRequest request= newkafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response=consumer.getOffsetsBefore(request);if(response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason:"
+response.errorCode(topic, partition));return 0;
}long[] offsets =response.offsets(topic, partition);return offsets[0];
}private TreeMap findLeader(Lista_seedBrokers,inta_port, String a_topic) {
TreeMap map = new TreeMap();
loop:for(String seed : a_seedBrokers) {
SimpleConsumer consumer= null;try{
consumer= new SimpleConsumer(seed, a_port, 100000, 64 * 1024,"leaderLookup"+newDate().getTime());
List topics =Collections.singletonList(a_topic);
TopicMetadataRequest req= newTopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp=consumer.send(req);
List metaData =resp.topicsMetadata();for(TopicMetadata item : metaData) {for(PartitionMetadata part : item.partitionsMetadata()) {
map.put(part.partitionId(), part);
}
}
}catch(Exception e) {
System.out.println("Error communicating with Broker [" +seed+ "] to find Leader for [" + a_topic + ", ] Reason:" +e);
}finally{if (consumer != null)
consumer.close();
}
}returnmap;
}
}