生产者代码:
<?php
$rk = new Rdkafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
// 指定生产者链接kafka集群所有的broker地址列表, 参数如: 为hostl:portl,host2:port2,
//这里不需要指定所有broker地址,生产者可以从给定的broker里找到其他broker的信息,但是建议配置两个以上的地址信息
//当其中一个宕机,生产者仍然可以链接到kafka集群
$rk->addBrokers("0.0.0.0:9092");
// 创建topic
$topic = $rk->newTopic("test2");
while (true) {
$message = "hello kafka " . date("Y-m-d H:i:s");
echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL;
try {
//发送消息
//参数1: RD_KAFKA_PARTITION_UA 自动分区, 也可以是0/1/2自己指定
//参数2: 消息标记 0或者RD_KAFKA_MSG_F_BLOCK 队列满了 阻止生产消息
//参数3: 待发送的消息
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
sleep(2);
} catch (\Exception $e) {
echo $e->getMessage() . PHP_EOL;
}
}
//销毁生产者实例前调用,确保消息都请求完成
//$rk->flush($timeout_ms);消费者代码:
<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG); // 设置日志级别
$rk->addBrokers("0.0.0.0:9092"); // 添加经纪人,就是ip地址
$topic = $rk->newTopic("test2"); // 这里的$rk和生产者是不同的类哦
// 第一个参数分区ID
// 第二个参数是开始消费的偏移量,有效值
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
// 第一个参数是要消耗的分区
// 第二个参数是等待收到消息的最长时间,1000是一秒
$msg = $topic->consume(0, 1000);
if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue;
} elseif ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
//要消费的消息
echo $msg->payload, "\n";
}
}参考链接: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/index.html
版权声明:本文为qq_22323251原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。