rabbitmq在yii2中使用

消费者,写在console文件夹下的controller中,我是先执行消费者的:

 public function actionStart(){

        // 创建连接
        $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'coder', 'coder');
        // 创建channel,多个channel可以共用连接
        $channel = $connection->channel();

        // 可能会在数据发布之前启动消费者,所以我们要确保队列存在,然后再尝试从中消费消息。

        // 创建直连的交换机
        $channel->exchange_declare('direct_logs', 'direct', false, false, false);
        // 创建队列
        $channel->queue_declare('hello', false, false, false, false);
        // 交换机跟队列的绑定,
        $channel->queue_bind('hello', 'direct_logs', 'routigKey');

        // 回调函数

        $callback = function ($msg) {
            $this->_log->LogNotice($msg->body);
        };

        // 启动队列消费者
        $channel->basic_consume('hello', '', false, true, false, false, $callback);
        // 判断是否存在回调函数
        while($leftCount = count($channel->callbacks)) {
            // 此处为执行回调函数
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

 

生产者:

<?php
/**
 * Created by PhpStorm.
 * User: seven
 * Date: 2019/3/12
 * Time: 下午6:15
 */

namespace common\utils;


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PhpClient
{
    public static function Call($str){
        // 创建连接
        $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'coder', 'coder');
        // 创建channel,多个channel可以共用连接
        $channel = $connection->channel();

        // 创建交换机以及队列(如果已经存在,不需要重新再次创建并且绑定)

        // 创建直连的交换机
        $channel->exchange_declare('direct_logs', 'direct', false, false, false);
        // 创建队列
        $channel->queue_declare('hello', false, false, false, false);
        // 交换机跟队列的绑定,
        $channel->queue_bind('hello', 'direct_logs', 'routigKey');


        // 设置消息bady传送字符串logs(消息只能为字符串,建议消息均json格式)
        $str = empty($str) ? 'hello mq': $str;
        $msg = new AMQPMessage($str);
        // 发送数据到对应的交换机direct_logs并设置对应的routigKey
        $channel->basic_publish($msg, 'direct_logs', 'routigKey');
        $channel->close();
        $connection->close();

//        $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'coder', 'coder');
//        $channel = $connection->channel();
//        $channel->queue_declare('task_queue', false, true, false, false);
//        $data=empty($n)?"Hello World666!":$n;
//        $msg = new AMQPMessage($data,
//            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
//        );
//        $channel->basic_publish($msg, '', 'task_queue');
//        $channel->close();
//        $connection->close();

        return true;
    }
}

调用生产者:

PhpClient::Call('test!');

 

调用生产者可以看到,控制台数据,这里php线上可以用定时器每隔几秒去查询,或者使用后台运行。

 


版权声明:本文为jj546630576原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。