消费者,写在console文件夹下的controller中,我是先执行消费者的:
public function actionStart(){
// 创建连接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'coder', 'coder');
// 创建channel,多个channel可以共用连接
$channel = $connection->channel();
// 可能会在数据发布之前启动消费者,所以我们要确保队列存在,然后再尝试从中消费消息。
// 创建直连的交换机
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
// 创建队列
$channel->queue_declare('hello', false, false, false, false);
// 交换机跟队列的绑定,
$channel->queue_bind('hello', 'direct_logs', 'routigKey');
// 回调函数
$callback = function ($msg) {
$this->_log->LogNotice($msg->body);
};
// 启动队列消费者
$channel->basic_consume('hello', '', false, true, false, false, $callback);
// 判断是否存在回调函数
while($leftCount = count($channel->callbacks)) {
// 此处为执行回调函数
$channel->wait();
}
$channel->close();
$connection->close();
}
生产者:
<?php
/**
* Created by PhpStorm.
* User: seven
* Date: 2019/3/12
* Time: 下午6:15
*/
namespace common\utils;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PhpClient
{
public static function Call($str){
// 创建连接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'coder', 'coder');
// 创建channel,多个channel可以共用连接
$channel = $connection->channel();
// 创建交换机以及队列(如果已经存在,不需要重新再次创建并且绑定)
// 创建直连的交换机
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
// 创建队列
$channel->queue_declare('hello', false, false, false, false);
// 交换机跟队列的绑定,
$channel->queue_bind('hello', 'direct_logs', 'routigKey');
// 设置消息bady传送字符串logs(消息只能为字符串,建议消息均json格式)
$str = empty($str) ? 'hello mq': $str;
$msg = new AMQPMessage($str);
// 发送数据到对应的交换机direct_logs并设置对应的routigKey
$channel->basic_publish($msg, 'direct_logs', 'routigKey');
$channel->close();
$connection->close();
// $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'coder', 'coder');
// $channel = $connection->channel();
// $channel->queue_declare('task_queue', false, true, false, false);
// $data=empty($n)?"Hello World666!":$n;
// $msg = new AMQPMessage($data,
// array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
// );
// $channel->basic_publish($msg, '', 'task_queue');
// $channel->close();
// $connection->close();
return true;
}
}
调用生产者:
PhpClient::Call('test!');
调用生产者可以看到,控制台数据,这里php线上可以用定时器每隔几秒去查询,或者使用后台运行。
版权声明:本文为jj546630576原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。