Springcloud之Rocketmq发送消息

简介

前面已经介绍了rocketmq的简单使用,本篇我们介绍使用订单服务下单成功之后发送短信消息。上篇docker下使用rocketmq,估计是版本问题,本篇使用一个低版本的rocketmq。

rocketmq下载

官网下载一个4.4的安装包,

解压安装

下载
解压

unzip rocketmq-all-4.4.0-bin-release.zip

修改名称

MV rocketmq-all-4.4.0-bin-release rocketmq

目录

启动namesvr

启动

nohup ./bin/mqnamesrv &

查看日志

tail -f /root/logs/rocketmqlogs/namesrv.log

启动broker

broker.conf需要修改增加

namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.5.130 

修改固定的jvm参数 JAVA_OPT=“${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” 根据实际情况改小点
-Xms256m -Xmx256m -Xmn128m

[root@elite rocketmq]#  vim bin/runbroker.sh 
[root@elite rocketmq]# vim bin/runserver.sh 

启动

nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true -c ../conf/broker.conf  &

查看日志

tail -f /root/logs/rocketmqlogs/broker.log

关闭服务

bin/mqshutdown namesrv
bin/mqshutdown broker

测试

发送消息

 bin/tools.sh  org.apache.rocketmq.example.quickstart.Producer

发送消息
消费消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

消费消息

手动创建topic

./mqadmin updateTopic -n localhost:9876  -b localhost:10911  -t order-topic

项目搭建

依赖

用户与订单模块中都需要添加

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

yml配置地址

#rocketmq配置
rocketmq:
  name-server: 192.168.5.130:9876
  producer:
    group: springcloud-order

订单模块

package com.elite.springcloud.controller;

import com.elite.springcloud.entity.Order;
import com.elite.springcloud.entity.Product;
import com.elite.springcloud.entity.User;
import com.elite.springcloud.interfaces.ProductService;
import com.elite.springcloud.service.IOrderService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;

/**
 * <p>
 * 订单表 前端控制器
 * 下单控制
 * </p>
 *
 * @author elite
 * @since 2022-09-10
 */
@RestController
@RequestMapping("/springcloud/order/rocketmq")
public class OrderRocketMqController {

    //订单服务
    @Autowired
    IOrderService orderService;

    @Autowired
    ProductService productService;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    /**
     * 模拟下单 传入商品id,用户随机
     * @param product_id
     * @return
     */
    @GetMapping("/saveOrder/{product_id}")
    public String saveOrder(@PathVariable("product_id")Integer product_id ){
        //获取商品
        Product product = productService.getProductById(product_id);
        if (product == null){
            return "商品信息不存在";
        }
        //用户信息
        User user = new User();
        user.setUserId(1);
        //订单信息
        Order order = new Order();
        order.setOrderNo(10);
        order.setProductId(product_id);
        order.setUserId(user.getUserId());
        order.setOrderNum(1);
        order.setOrderAmt(product.getProductPrice());
        order.setOrderStatus("下单");
        order.setPayStatus("支付成功");
        order.setCreateBy("牛奶糖");
        order.setUpdateBy("牛奶糖");
        orderService.save(order);
        //下单成功之后,将消息放到mq中
        rocketMQTemplate.syncSend("order-topic", order,6000);
        //发布下单消息
        return "下单成功"+order.toString();
    }
}

用户模块添加监听

//监听消息
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "springcloud-user", topic = "order-topic")
public class MessageLsner implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("订单信息:", JSON.toJSONString(order));
    }
}

测试模块

发送下订单请求:
发送订单请求
存数据库订单
保存订单信息
监听消息
消费者消息
由于Sms开通有问题,这里就不讲解了。

重点问题:

  1. 启动broker前修改内存配置

修改固定的jvm参数 JAVA_OPT=“${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” 根据实际情况改小点
##-Xms256m -Xmx256m -Xmn128m

[root@elite rocketmq]#  vim bin/runbroker.sh 
[root@elite rocketmq]# vim bin/runserver.sh 
  1. 无法自动创建主题
[root@elite conf]# cat broker.conf 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=localhost:9876 ## 添加namesrv
brokerIP1=192.168.5.130 ##添加ip
autoCreateTopicEnable=true ###开启自动创建topic
 启动broker加参数指向配置文件,否则配置文件不生效。
bin/mqbroker -n localhost:9876 -c conf/broker.conf
  1. 发送消息提示内存不足
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.97 CQ:  0.97 INDEX:  0.97, maybe your broker machine memory too small.
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
###切换到到 rocketmq 配置文件所在路径
vim /usr/rocketmq/conf/2m-2s-async/broker-a.properties
###最后一行增加 diskMaxUsedSpaceRatio=99,表示剩余磁盘比例不足99%才报错
diskMaxUsedSpaceRatio=99
###wq 保存退出

改了,重启namesrv,broker.


版权声明:本文为qq_37400096原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。