首先,业务代码需要把消息推送到队列中。不知道出于什么原因,我们的消息推送是一个单独的项目,业务代码通过 cURL 请求该项目提供的入队接口。例如,
我的数据需要发送到队列:
/**
 * Look up the order_goods rows for a child order number and push each one
 * to the "cloud_refund_status" queue.
 *
 * Only rows whose is_delete column equals '2' are selected (the meaning of
 * that flag is not visible in this file).
 *
 * @param string $childOrderNum Child order number to look up.
 *
 * @return array One entry per matched row, keyed by the row index; each entry
 *               is a one-element list holding the decoded sendQueue() response.
 */
public function syncRefundStatus($childOrderNum)
{
    $res = array();
    $q_type = "cloud_refund_status";
    RedisLog::info('cloud_refund_status参数:'.$childOrderNum);

    // Field/operator/value conditions let the query builder bind the value
    // instead of splicing caller-supplied text into the SQL string.
    $list = Db::table('order_goods')
        ->field('child_order_num')
        ->where('child_order_num', '=', (string) $childOrderNum)
        ->where('is_delete', '=', '2')
        ->select();

    foreach ($list as $k => $v) {
        // The queue payload is a JSON list containing a single object.
        $payload = json_encode(array(array("child_order_num" => $v['child_order_num'])));
        $res[$k][] = $this->sendQueue($q_type, $q_type, $payload); // push to the queue
    }

    RedisLog::info('cloud_refund_status:'.json_encode($res));
    return $res;
}

/**
* Forward one message to the queue gateway with an HTTP POST.
*
* The gateway URL is read from the "app.api_order_queue" config entry.
*
* @param string $type    Sent as the "type" form field.
* @param string $qtype   Sent as the "qtype" form field.
* @param mixed  $content Sent as the "data" form field.
*
* @return mixed The gateway response passed through json_decode(..., true);
*               null when the response is not valid JSON.
*/
public function sendQueue($type, $qtype, $content)
{
    $endpoint = Config('app.api_order_queue');
    $payload = array(
        'type'  => $type,
        'qtype' => $qtype,
        'data'  => $content,
    );
    RedisLog::info('order_sync:'.$endpoint."_".json_encode($payload));

    return json_decode($this->_sendPostRequest($endpoint, $payload), true);
}
/**
 * Send an HTTP POST request with cURL and return the raw response.
 *
 * @param string            $url     Target URL.
 * @param array|string|null $content Value handed to CURLOPT_POSTFIELDS;
 *                                   null is sent as an empty array.
 *
 * @return mixed|string The response body on success. When cURL reports an
 *                      error, the cURL error message is returned instead.
 */
function _sendPostRequest($url, $content = null)
{
    $content = is_null($content) ? array() : $content;

    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
    // NOTE(review): certificate and host-name verification are switched off for
    // every request, so the peer is never authenticated. Kept as-is because
    // callers may rely on it; confirm whether this is still required.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
    curl_setopt($ch, CURLOPT_USERAGENT, 'Mozilla/5.0 (compatible; MSIE 5.01; Windows NT 5.0)');
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
    curl_setopt($ch, CURLOPT_AUTOREFERER, 1);
    curl_setopt($ch, CURLOPT_POSTFIELDS, $content);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

    $tmpInfo = curl_exec($ch);

    // Read the error before closing so the handle is released on both paths.
    $error = curl_errno($ch) ? curl_error($ch) : null;
    curl_close($ch);

    return $error === null ? $tmpInfo : $error;
}
其中 api_order_queue = http://rabbitmq.d.gbicom.com/send ,该接口会根据不同的 type 调用不同的入队方法:
/**
 * Gateway entry point: read type / qtype / data from the POST body and hand
 * the decoded data to the enqueue method that matches the type.
 *
 * @return mixed A JSON response with code 302 when type or data is empty;
 *               otherwise whatever the selected enqueue method returns.
 */
public function send()
{
    $type  = input('post.type');  // queue name
    $qtype = input('post.qtype'); // q_type
    $data  = input('post.data');
    RedisLog::info('rabbitqueueLog:'.$type."_".$data);

    if (empty($type) || empty($data)) {
        return json(['code' => 302, 'msg' => '数据为空']);
    }

    $payload = json_decode($data, true);
    $enqueue = new enqueue();

    // switch compares loosely (==), in the same order as the branches it replaces.
    switch ($type) {
        case 'biz':                  // batch of product codes, to query service progress
        case 'submit_deliver_Cloud': // order submitted to delivery
        case 'cloud_refund_status':  // order submitted to delivery: refund confirmation
            return $enqueue->write_biz($type, $payload);

        case 'order_biz':            // confirmation letter / power of attorney for a delivered order
            return $enqueue->write_biz_at($qtype, $payload);

        case 'DistributionLead':
        case 'ReleaseLead':
        case 'CreateLeadCommunication':
            return $enqueue->write_ai($type, $payload);

        case 'CreateAICommunication':
            return $enqueue->crm_ai($type, $payload);

        case 'coupon_crm':           // 5.18 coupon sync to CRM -- by zuiw 2019.04.10
            return $enqueue->write_campaign($type, $payload);

        case 'order':
        case 'ProcessContractInvoice':
        case 'ProcessContractRefund':
            return $enqueue->write_crm($type, $payload);
    }
    // NOTE(review): an unrecognised type falls through and returns nothing.
}
以 type = 'submit_deliver_Cloud' 为例:
/**
 * Publish a message using the server settings combined with the write_biz
 * queue settings.
 *
 * @param string $type Passed through to send_queue().
 * @param array  $data Passed through to send_queue().
 *
 * @return mixed Whatever send_queue() returns.
 */
public function write_biz($type, $data = array())
{
    $server = $this->base_config();
    $queue = $this->write_biz_config();

    return $this->send_queue(array_merge($server, $queue), $type, $data);
}

/**
* Server connection settings.
*
* Every value is read from the matching "rabbitmq.server.*" config entry.
*
* @return array Keys: host, port, user, password, vhost.
*/
private function base_config()
{
    $server = array();
    $server['host'] = config('rabbitmq.server.host');
    $server['port'] = config('rabbitmq.server.port');
    $server['user'] = config('rabbitmq.server.user');
    $server['password'] = config('rabbitmq.server.password');
    $server['vhost'] = config('rabbitmq.server.vhost');

    return $server;
}

/**
* Delivery queue settings.
*
* Every value is read from the matching "rabbitmq.write_biz.*" config entry.
*
* @return array Keys: exchange_name, queue_name, route_key, consumer_tag.
*/
private function write_biz_config()
{
    // These fields are explained in the previous article.
    $queue = array();
    $queue['exchange_name'] = config('rabbitmq.write_biz.exchange_name');
    $queue['queue_name'] = config('rabbitmq.write_biz.queue_name');
    $queue['route_key'] = config('rabbitmq.write_biz.route_key');
    $queue['consumer_tag'] = config('rabbitmq.write_biz.consumer_tag');

    return $queue;
}
/**
 * Wrap a payload in the queue envelope and publish it to the message queue.
 *
 * The envelope carries q_id (from session_create_id()), q_time (current time
 * formatted as Y-m-d H:i:s), q_data and q_type, and is published as JSON.
 *
 * @param array  $config Settings handed to rabbitmq::connect().
 * @param string $type   Stored in the envelope as q_type.
 * @param array  $data   Stored in the envelope as q_data.
 *
 * @return \think\response\Json Response with code 200, msg "success" and the q_id.
 *
 * @throws \Exception With code 300 when the envelope cannot be JSON-encoded or
 *                    when connecting/publishing fails; in the latter case the
 *                    original exception is attached as the previous exception.
 */
private function send_queue($config, $type, $data = array())
{
    $q_id = session_create_id();
    $msg = [
        'q_id' => $q_id,
        'q_time' => date('Y-m-d H:i:s'),
        'q_data' => $data,
        'q_type' => $type,
    ];

    $jsonMsg = json_encode($msg);
    if ($jsonMsg === false) {
        // Publishing `false` would enqueue an empty message yet report success.
        throw new \Exception('json_encode failed: '.json_last_error_msg(), 300);
    }
    RedisLog::info('rabbitmq入队:'.$jsonMsg);

    try {
        $this->handle = new rabbitmq();
        $this->handle->connect($config);
        $this->handle->publish($jsonMsg);
    } catch (\Exception $e) {
        // Callers still see the same message and code 300; chaining keeps the
        // original exception type and stack trace available for debugging.
        throw new \Exception($e->getMessage(), 300, $e);
    }

    return json(['code' => 200, 'msg' => 'success', 'q_id' => $q_id]);
}
版权声明:本文为maosaiwei原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。