kafka 消息偏移量

[oswatch@yyjk tmp]$ cat consumerkafka_mario.py
#!/usr/bin/env python
# coding=utf-8
from  kafka import KafkaConsumer
from  kafka import TopicPartition
import time
def get_kafka_reviews(bootstrap_servers,topics):
    # print type(self.bootstrap_servers)
    consumer = KafkaConsumer(bootstrap_servers=[bootstrap_servers],auto_offset_reset='latest', enable_auto_commit=False)
    consumer.subscribe(topics=(topics))  #订阅要消费的主题
    print consumer.topics()
    print "111111",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=0)) #获取当前主题的最新偏移量
    print "222222",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=1)) #获取当前主题的最新偏移量
    print "333333",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=2)) #获取当前主题的最新偏移量
    time.sleep(30)

    review_list =[]
    for message in consumer:
        print message
        #print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
        review_list.append(message.value)
    return  review_list

print get_kafka_reviews('1.1.1.27:9092','NewProxyBaseData')
[oswatch@yyjk tmp]$ python consumerkafka_mario.py
set([u'NewMongoErrorCount', u'PROXY7GeneratorIndex', u'IndexTimeOut', u'MARIOREGISTER', u'BASERECORD', u'ProxyTimeOut', u'NewProxyTimeOut', u'ProxyGatherCompleted', u'NewProxyIndexPrepare', u'mutableAlertInfo', u'NewProxyGather', u'PROXY6', u'PROXY7', u'PROXY4', u'PROXY6GeneratorIndex', u'PROXY2', u'NewProxyNoBaseData', u'PROXY0', u'PROXY1', u'PROXY1GeneratorIndex', u'ProxyBaseData', u'NewMongoReInsert', u'PROXY8', u'PROXY9', u'PROXY10GeneratorIndex', u'ProxyAlert', u'ProxyGather', u'IndexOnTime', u'EXALERTSETINFO', u'RANGEDATA', u'PROXY9GeneratorIndex', u'PROXY100', u'PROXY100GeneratorIndex', u'NewAlertCalStandBy', u'index', u'MessageStandbyBank', u'PROXY5GeneratorIndex', u'BaseRecord', u'alert', u'PROXY10', u'PROXY5', u'databus', u'NewProxyIndexDetail', u'datasend', u'alerttest', u'PROXY2GeneratorIndex', u'PUSHALERTSETSEND', u'indextest', u'PROXY3GeneratorIndex', u'SubIndexTimeOut', u'NewProxyGatherCompleted', u'LUFAX', u'DATAFLOW', u'PROXY3', u'register', u'NewMongoReUpsert', u'MarioAgent', u'MessageStandby', u'PROXY8GeneratorIndex', u'IndexGatherTimeOut', u'GatherIndexTimeOut', u'ALERTINFO', u'NewProxyBaseData', u'luohantest', u'PROXY4GeneratorIndex', u'ProxyIndexDetail', u'registerbank', u'NewProxyGatherNoCore'])
111111 584672460
222222 574728466
333333 581566016


版权声明:本文为zhaoyangjian724原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。