kafka笔记-生产者和消费者代码(php)

生产者代码:

<?php

$rk = new Rdkafka\Producer();
$rk->setLogLevel(LOG_DEBUG);

// 指定生产者链接kafka集群所有的broker地址列表, 参数如: 为hostl:portl,host2:port2,
//这里不需要指定所有broker地址,生产者可以从给定的broker里找到其他broker的信息,但是建议配置两个以上的地址信息
//当其中一个宕机,生产者仍然可以链接到kafka集群
$rk->addBrokers("0.0.0.0:9092");

// 创建topic
$topic = $rk->newTopic("test2");

while (true) {
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL;

    try {
        //发送消息
        //参数1: RD_KAFKA_PARTITION_UA 自动分区, 也可以是0/1/2自己指定
        //参数2: 消息标记 0或者RD_KAFKA_MSG_F_BLOCK 队列满了 阻止生产消息
        //参数3: 待发送的消息
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        sleep(2);
    } catch (\Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}
//销毁生产者实例前调用,确保消息都请求完成
//$rk->flush($timeout_ms);

消费者代码

<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG); // 设置日志级别
$rk->addBrokers("0.0.0.0:9092"); // 添加经纪人,就是ip地址

$topic = $rk->newTopic("test2"); // 这里的$rk和生产者是不同的类哦

// 第一个参数分区ID
// 第二个参数是开始消费的偏移量,有效值
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    // 第一个参数是要消耗的分区
    // 第二个参数是等待收到消息的最长时间,1000是一秒
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        //要消费的消息
        echo $msg->payload, "\n";
    }
}

参考链接: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/index.html

 


版权声明:本文为qq_22323251原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。