RabbitMQ实现一个生产者多个消费者的传统“发布-订阅”模式

RabbitMQ在默认的几种模式当中,并没有类似ActiveMQ那样的Top模式(发布-订阅),即一个生产者向某一主题中发布消息时,所有订阅了该主题的消费都能够接收到消息。所以如果使用RabbitMQ来实现类似的效果,我们可以通过以下方式来实现:

  1. 消息生产端只创建交换机(Exchange),不创建队列。
  2. 消息消费端创建临时队列(也可以是永久队列,根据业务需要),并将该队列以直连模式(direct)绑定到交换机上,并指定一个路由key(所有消费端的路由都要一致)。
  3. 消息生产端在发送消息到RabbitMQ时,也需要指定路由Key(该key与消费端的一致)。

基于springboot整合RabbitMQ实现以上效果

build.gradle引入依赖(由于我的项目中使用的springboot的版本为1.5.6,所以rabbitMq的依赖我也选择了这个版本):

// RabbitMQ依赖
compile('org.springframework.boot:spring-boot-starter-amqp:1.5.6.RELEASE')

application.properties配置文件:

## RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
# 手动确认消息已被消费
spring.rabbitmq.listener.simple.acknowledge-mode=manual

RabbitMQ配置类:

/**
 * RabbitMQ配置类
 */
@Configuration
public class RabbitMQConfig {
    /**
     * 告警消息临时接收队列名称(以当前节点ip为后缀)
     */
    public static final String ALERT_MESSAGE_RECEIVER_QUEUE = "alert.message.queue_" + HostUtil.getLocalAddress();

    /**
     * 交换机名称
     */
    public static final String ALERT_MESSAGE_DIRECT_EXCHANGE = "alert.message.exchange";

    /**
     * 路由key
     */
    public static final String ALERT_MESSAGE_ROUTING_KEY = "alertMessageKey";


    /**
     * 创建直连模式的交换机
     * @return DirectExchange
     */
    @Bean
    public DirectExchange alertMessageFanoutExchange(){
        // 创建永久并持久化的交换机
        return new DirectExchange(ALERT_MESSAGE_DIRECT_EXCHANGE);
    }

    /**
     * 创建消息队列对象
     * @return Queue
     */
    @Bean
    public Queue alertMessageReceiverQueue(){
        // 创建一个非永久、非持久化的队列(即当客户端连接断开后立即被删除的队列)
        return new Queue(ALERT_MESSAGE_RECEIVER_QUEUE, false, false, true);
    }
}

消息发送端:

/**
 * RabbitMQ告警消息发送器
 * 
 */
@Component("rabbitMQAlertMessageSender")
public class RabbitMQAlertMessageSender implements RabbitMQSender{

    /** 日志 */
    private static Logger logger = LoggerFactory.getLogger(RabbitMQAlertMessageSender.class);

    /** 注入rabbitMQ消息发送模板 */
    @Autowired
    private RabbitTemplate template;

    @PostConstruct
    public void init() {
        if (this.template != null){
            this.template.setConfirmCallback((correlationData, ack, cause) -> {
                if (logger.isDebugEnabled()){
                    logger.debug("==> 消息唯一标识:{}" , correlationData);
                    logger.debug("==> 确认结果:{}", ack);
                    logger.debug("==> 失败原因:{}", cause);
                }
                if (cause != null){
                    logger.error("==> 告警消息发送到RabbitMQ后确认失败,失败原因:{}", cause);
                }
            });
            this.template.setMandatory(true);
            this.template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                if (logger.isDebugEnabled()){
                    logger.debug("==> 消息主体 message: {}", message);
                    logger.debug("==> 响应code: {}", replyCode);
                    logger.debug("==> 描述:{}", replyText);
                    logger.debug("==> 消息使用的交换器 exchange: {}", exchange);
                    logger.debug("==> 消息使用的路由键 routing: {}", routingKey);
                }
            });
        }
    }

    /**
     * 发送消息到RabbitMQ中
     * @param message  消息
     */
    @Override
    public void send(String message) {
        if (message != null){            this.template.convertAndSend(RabbitMQConfig.ALERT_MESSAGE_DIRECT_EXCHANGE, RabbitMQConfig.ALERT_MESSAGE_ROUTING_KEY, message);
        }
    }
}

消息消费端:

/**
 * RabbitMQ告警消息接收器
 * 
 */
@Component("rabbitMQAlertMessageReceiver")
public class RabbitMQAlertMessageReceiver {

    /**
     * 日志
     */
    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 接收RabbitMQ的告警消息
     * @param message 消息
     */
    @RabbitListener(bindings = @QueueBinding(
            // 指定队列名称(该队列在RabbitMQConfig中已声明),此处使用spring的OGNL表达式获取已经在RabbitMQConfig类中声明好的队列名称
            value = @Queue(value = "#{alertMessageReceiverQueue.name}", durable = "false", autoDelete = "true"),
            // 将当前队列绑定到交换机上(直连模式)
            exchange = @Exchange(value = RabbitMQConfig.ALERT_MESSAGE_DIRECT_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT),
            // 指定路由key
            key = RabbitMQConfig.ALERT_MESSAGE_ROUTING_KEY
        )
    )
    public void receiver(Message message, Channel channel) {
        try {
            if (message != null){
                String messageStr= new String(message.getBody());
                logger.info("==> 接收到告警消息:{}", messageStr);
                
                //TODO 业务逻辑...
                
				// 手动确认消息已被消费(不要批量确认)
                hannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e){
            logger.error("==> 消费告警消息过程中异常", e);
        }
    }
}

参考文章:https://blog.csdn.net/sinat_30735061/article/details/97658059?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase


版权声明:本文为hqq518原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。