RocketMQTemplate的基本使用方法

1、pom.xml依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
	 <version>2.0.3</version>
</dependency>

2、配置文件

# RocketMQ 相关配置
rocketmq:
  # 指定 nameServer
  name-server: 127.0.0.1:9876
  # Producer 生产者
  producer:
    group: my-group  # 指定发送者组名
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false


3、RocketMQTemplate的基本方法

首先使用@Autowired注入RocketMQTemplate(后面直接使用,就不特殊说明)

@Autowired
private RocketMQTemplate rocketMQTemplate;

简单使用:

package com.example.rocketmq.controller;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息发送方
 */
@Component
public class Producer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 普通字符串消息
     */
    public void sendMessage() {
        String json = "普通消息";
        rocketMQTemplate.convertAndSend("sendMessage", json);
    }

    /**
     * 同步消息
     */
    public void syncSend() {
        String json = "同步消息";
        SendResult sendMessage = rocketMQTemplate.syncSend("sendMessage", json);
        System.out.println(sendMessage);
    }

    /**
     * 异步消息
     */
    public void asyncSend() {
        String json = "异步消息";
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("123");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("456");
            }
        };
        rocketMQTemplate.asyncSend("sendMessage", json, callback);
    }

    /**
     * 单向消息
     */
    public void onewaySend() {
        String json = "单向消息";
        rocketMQTemplate.sendOneWay("sendMessage", json);
    }
}

package com.example.rocketmq.controller;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 消息消费方
 * 1.如果两个消费者group和topic都一样,则二者轮循接收消息
 * 2.如果两个消费者topic一样,而group不一样,则消息变成广播机制
 * RocketMQListener<>泛型必须和接收的消息类型相同
 */
@Component
@RocketMQMessageListener(
        topic = "sendMessage",                      // 1.topic:消息的发送者使用同一个topic
        consumerGroup = "test-group",               // 2.group:不用和生产者group相同 ( 在RocketMQ中消费者和发送者组没有关系 )
        selectorExpression = "*",                   // 3.tag:设置为 * 时,表示全部。
        messageModel = MessageModel.CLUSTERING    // 4.消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )( BROADCASTING:广播机制 )
)
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String str) {
        try {
            // 睡眠五十毫秒,确保消息成功接收(演示专用,勿喷)
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(str);
    }
}

版权声明:本文为a1120467800原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。