RabbitMQ 推送队列

首先,业务代码需要把消息推送到队列中。在我们这里,推送消息由一个单独的项目负责(具体原因我也不清楚),业务项目通过 cURL 请求该项目提供的“发送队列”接口,例如:

下面这段代码把我的数据发送到队列:
/**
 * Push the refund status of a child order to the "cloud_refund_status" queue.
 *
 * Selects the matching rows from `order_goods` (is_delete = '2') and sends one
 * queue message per row through sendQueue().
 *
 * @param string $childOrderNum Child order number to look up.
 * @return array Results indexed by row key; each entry is a list holding the
 *               decoded sendQueue() response for that row.
 */
public function syncRefundStatus($childOrderNum)
{
    $res = array();
    $q_type = "cloud_refund_status";
    RedisLog::info('cloud_refund_status参数:'.$childOrderNum);

    // Pass the order number as a bound condition value instead of splicing it
    // into the SQL string, so a crafted order number cannot alter the query.
    $list = Db::table('order_goods')
        ->field('child_order_num')
        ->where('child_order_num', $childOrderNum)
        ->where('is_delete', '2')
        ->select();

    foreach ($list as $k => $v) {
        // The payload is a JSON list containing a single object.
        $payload = json_encode(array(array("child_order_num" => $v['child_order_num'])));
        $res[$k][] = $this->sendQueue($q_type, $q_type, $payload);   // send to the queue
    }

    RedisLog::info('cloud_refund_status:'.json_encode($res));
    return $res;
}
/**
 * Send a message to the queue service over HTTP.
 *
 * Posts the three fields to the URL configured under `app.api_order_queue`
 * and returns the JSON-decoded response.
 *
 * @param string $type    Value sent as the `type` POST field.
 * @param string $qtype   Value sent as the `qtype` POST field.
 * @param mixed  $content Value sent as the `data` POST field (a JSON string in the visible caller).
 * @return mixed Result of json_decode(..., true) applied to the response body.
 */
public function sendQueue($type, $qtype, $content)
{
    $endpoint = Config('app.api_order_queue');
    $payload = array(
        'type' => $type,
        'qtype' => $qtype,
        'data' => $content,
    );

    RedisLog::info('order_sync:'.$endpoint."_".json_encode($payload));

    return json_decode($this->_sendPostRequest($endpoint, $payload), true);
}

 

/**
 * Send an HTTP POST request to the API endpoint.
 *
 * NOTE(review): the original comment said the API key is "already written in";
 * nothing in this function sets one — confirm where it is supplied.
 *
 * @param string            $url     Target URL.
 * @param array|string|null $content Value handed to CURLOPT_POSTFIELDS; null is treated as an empty array.
 * @return mixed|string Response body on success, or the cURL error message when the transfer fails.
 */
function _sendPostRequest($url, $content=null)
{
    $content = is_null($content) ? array() : $content;
    $ch = curl_init();

    // NOTE(review): peer and host verification are switched off for every
    // request, which leaves HTTPS calls open to interception. Kept as-is to
    // preserve behavior; enable verification once the endpoint certificate is trusted.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);

    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
    curl_setopt($ch, CURLOPT_USERAGENT, 'Mozilla/5.0 (compatible; MSIE 5.01; Windows NT 5.0)');
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
    curl_setopt($ch, CURLOPT_AUTOREFERER, 1);
    curl_setopt($ch, CURLOPT_POSTFIELDS, $content);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

    $tmpInfo = curl_exec($ch);

    // Capture the error before closing so the handle is released on both the
    // success and the failure path.
    $error = curl_errno($ch) ? curl_error($ch) : null;
    curl_close($ch);

    return $error !== null ? $error : $tmpInfo;
}
api_order_queue = http://rabbitmq.d.gbicom.com/send

该接口会根据不同的 type 调用不同的队列发送方法:

/**
 * Queue entry point: read `type`, `qtype` and `data` from the POST body and
 * hand the decoded payload to the enqueue method that handles that type.
 *
 * Responds with code 302 when `type` or `data` is empty, when `data` is not
 * valid JSON, or when `type` is not one of the handled values.
 *
 * @return mixed The value returned by the selected enqueue method, or a JSON error response.
 */
public function send()
{
    $type = input('post.type');     // queue name
    $qtype = input('post.qtype');   // q_type
    $data = input('post.data');

    RedisLog::info('rabbitqueueLog:'.$type."_".$data);

    if (empty($type) || empty($data)) {
        return json(['code' => 302, 'msg' => '数据为空']);
    }

    $data = json_decode($data, true);
    if (json_last_error() !== JSON_ERROR_NONE) {
        // Reject malformed payloads instead of enqueueing a null message body.
        return json(['code' => 302, 'msg' => '数据格式错误']);
    }

    $enqueue = new enqueue();

    switch ($type) {
        case 'biz':                     // submit a batch of product codes to query service progress
        case 'submit_deliver_Cloud':    // submit an order to delivery
        case 'cloud_refund_status':     // submit an order to delivery: refund confirmation
            return $enqueue->write_biz($type, $data);

        case 'order_biz':               // confirmation letter and power of attorney for an order submitted to delivery
            return $enqueue->write_biz_at($qtype, $data);

        case 'DistributionLead':
        case 'ReleaseLead':
        case 'CreateLeadCommunication':
            return $enqueue->write_ai($type, $data);

        case 'CreateAICommunication':
            return $enqueue->crm_ai($type, $data);

        case 'coupon_crm':              // 5.18 coupon sync to CRM -- by zuiw 2019.04.10
            return $enqueue->write_campaign($type, $data);

        case 'order':
        case 'ProcessContractInvoice':
        case 'ProcessContractRefund':
            return $enqueue->write_crm($type, $data);

        default:
            // Previously an unrecognised type fell through and returned nothing.
            return json(['code' => 302, 'msg' => '不支持的队列类型']);
    }
}

 

以 type = 'submit_deliver_Cloud' 为例:

/**
 * Publish a message using the server settings merged with the write_biz queue settings.
 *
 * @param mixed $type Value stored as `q_type` in the queued message.
 * @param array $data Payload stored as `q_data` in the queued message.
 * @return mixed Whatever send_queue() returns.
 */
public function write_biz($type, $data = array())
{
    $server = $this->base_config();
    $queue = $this->write_biz_config();

    return $this->send_queue(array_merge($server, $queue), $type, $data);
}
/**
 * Server configuration.
 *
 * @return array host, port, user, password and vhost, each read from the
 *               matching `rabbitmq.server.*` config key
 */
private function base_config()
{
    $server = [];
    foreach (['host', 'port', 'user', 'password', 'vhost'] as $key) {
        $server[$key] = config('rabbitmq.server.'.$key);
    }

    return $server;
}
/**
 * Delivery queue configuration.
 *
 * The fields themselves are described in the previous article of this series.
 *
 * @return array exchange_name, queue_name, route_key and consumer_tag, each
 *               read from the matching `rabbitmq.write_biz.*` config key
 */
private function write_biz_config()
{
    $queue = [];
    foreach (['exchange_name', 'queue_name', 'route_key', 'consumer_tag'] as $key) {
        $queue[$key] = config('rabbitmq.write_biz.'.$key);
    }

    return $queue;
}

 

/**
 * Write a message to the message queue.
 *
 * Wraps the payload in an envelope (q_id, q_time, q_data, q_type), logs it,
 * then connects with the given configuration and publishes the JSON string.
 *
 * @param array $config Connection and queue settings passed to rabbitmq::connect().
 * @param mixed $type   Value stored as `q_type`.
 * @param array $data   Payload stored as `q_data`.
 *
 * @return \think\response\Json Response with code 200 and the generated q_id.
 *
 * @throws \Exception With code 300 when the message cannot be encoded, or when
 *                    connecting or publishing fails.
 */
private function send_queue($config, $type, $data = array())
{
    $q_id = session_create_id();
    $msg = [
        'q_id' => $q_id,
        'q_time' => date('Y-m-d H:i:s'),
        'q_data' => $data,
        'q_type' => $type,
    ];

    $jsonMsg = json_encode($msg);
    if ($jsonMsg === false) {
        // Without this check an unencodable payload would be published as `false`.
        throw new \Exception(json_last_error_msg(), 300);
    }

    RedisLog::info('rabbitmq入队:'.$jsonMsg);
    try {
        $this->handle = new rabbitmq();
        $this->handle->connect($config);
        $this->handle->publish($jsonMsg);
    } catch (\Exception $e) {
        // Chain the original exception so its type and stack trace survive the rethrow.
        throw new \Exception($e->getMessage(), 300, $e);
    }

    return json(['code' => 200, 'msg' => 'success', 'q_id' => $q_id]);
}

 


版权声明:本文为maosaiwei原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。