rocketmq简单的消息发送接收案例

一、rocketmq(个人理解)
是阿里开源的一款分布式消息中间件,主要分为以下几个部分
1、生产者Producer,消费者(Consumer),nameServer,Broker
2、生产者主要用来发送消息,,消费者用来接收消息
3、nameServer就像一个邮局,四个Broker相当于快递小哥,nameServer用来管理四个Broker。
4、发送消息有三种方式,分别是同步,异步,单项,异步发送会返回一个回调函数,同步则没有;单项发送是发送消息后不会管结果的
5、每个生产者都会有一个Topic和tags,我把它理解为用来区分消息的标识
在这里插入图片描述
二、安装rocketmq后,模拟一个消息发送接收的简单过程
三、生产者
1、首先配置rocketmq(端口号,服务器地址看自己的环境)

server:
  port: 8002
  tomcat:
    uri-encoding: utf-8

rocketMq:
  #MQ服务器地址
  addr: 127.0.0.1:9876
  #生产者分组
  producerGroup: topicProducer

2、编写生产者发送消息,发送的消息设置在请求路径的参数上

@Service
public class ProducerServiceImpl {

    @Value("${rocketMq.producerGroup}")
    private String producerGroup;

    @Value("${rocketMq.addr}")
    private String addr;

    private DefaultMQProducer producer;

    @PostConstruct
    public void initProducer() {
        producer = new DefaultMQProducer("topicProducer");
        producer.setNamesrvAddr(addr);
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
            System.out.println("[rocketMq生产者已启动]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean sendTopic(String topic, String tags, String msg) {
        SendResult result = new SendResult();
        try {
            Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            result = producer.send(message);
            System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        if(result.getSendStatus().equals(SendStatus.SEND_OK)){
            return true;
        }else{
            return false;
        }
    }

    @PreDestroy
    public void shutDownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}
@RestController
@RequestMapping("/cmcc")
public class CMCCController {

    @Autowired
    private ProducerServiceImpl producerService;

    @GetMapping("/receive/{message}")
    public ResponseEntity receive(@PathVariable String message){
        boolean isSuccess = producerService.sendTopic("TestTopic", "NBIOT", message);
        if(isSuccess){
            return new ResponseEntity<>(HttpStatus.OK);
        }else{
            return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
        }
    }
}

四、消费者
1、消费者的配置
数据源可无视~~

server:
  port: 8080

#配置数据源
spring:
  datasource:
    driver-class-name: com.taosdata.jdbc.TSDBDriver
    #url: jdbc:TAOS://127.0.0.1:6020/db?timezone=Asia/Shanghai&characterEncoding=utf8
    username: root
    password: taosdata

rocketMq:
  #MQ服务器地址
  addr: 127.0.0.1:9876
  #消费者分组
  consumerGroup: topicConsumer

public class ConsumerService {

@Value("${rocketMq.consumerGroup}")
private String consumerGroup;

@Value("${rocketMq.addr}")
private String addr;

private DefaultMQPushConsumer consumer;

@Autowired
private JdbcTemplate jdbcTemplate;

@PostConstruct
public void initConsumer() {
    consumer = new DefaultMQPushConsumer(consumerGroup);
    consumer.setNamesrvAddr(addr);
    try {
        consumer.subscribe("TestTopic", "*");
    } catch (MQClientException e) {
        e.printStackTrace();
    }
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            Message msg = msgs.get(0);
            String message = new String(msg.getBody());
           System.out.println("消息接收:" + message)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    try {
        consumer.start();
        System.out.println("[rocketMq消费者已启动]");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@PreDestroy
public void shutDownConsumer() {
    if (consumer != null) {
        consumer.shutdown();
    }
}
}

5、同时启动生产者和消费者,成功接收
在这里插入图片描述

在这里插入图片描述


版权声明:本文为weixin_42516484原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。