springBoot集成rabbitMq

1.创建虚拟主机

Users可以创建用户,Virtual Hosts创建虚拟主机

image-20210523124636692

2.给用户授权虚拟机

image-20210523125613082

3.创建对列

image-20210523130359269

持久化:如果选durable,则队列消息自动持久化到磁盘上,如果选transient,则不会持久化;
自动删除:默认值no,如果yes,则在消息队列没有使用的情况下,队列自行删除。

4.创建交换机

image-20210523131013372

自动删除:默认值no,如果是yes,则在将所有队列与交换机取消绑定之后,交换机将自动删除。
交换机类型:

  • fanout:广播类型
  • direct:路由类型
  • topic:通配符类型,基于消息的路由键路由
  • headers:通配符类型,基于消息的header路由
    内部交换器:默认值no,如果是yes,消息无法直接发送到该交换机,必须通过交换机的转发才能到达
    次交换机。本交换机只能与交换机绑定。

5.交换机和对列绑定

image-20210523131549847

6.创建父工程

rabbitmqparent

​ ├── rabbitmq-consumer

​ └── rabbitmq-provider

rabbitmq-provider

pom依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.18.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--rabbit测试依赖-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
    </dependency>

    <!--springBoot测试依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

application

server.port=8081

# rabbitMq服务地址
spring.rabbitmq.host=127.0.0.1
# rabbitMq端口号
spring.rabbitmq.port=5672
# rabbitMq虚拟主机地址
spring.rabbitmq.virtual-host=admin
# rabbitMq用户名
spring.rabbitmq.username=guest
# rabbitMq密码
spring.rabbitmq.password=guest

启动类

@SpringBootApplication
public class ProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

无法启动

添加web依赖解决

image-20210523123150040

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
</dependency>

rabbitmq-consumer

pom依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.18.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--rabbit测试依赖-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
    </dependency>
    <!--springBoot测试依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

application

server.port=8080

# rabbitMq服务地址
spring.rabbitmq.host=127.0.0.1
# rabbitMq端口号
spring.rabbitmq.port=5672
# rabbitMq虚拟主机地址
spring.rabbitmq.virtual-host=admin
# rabbitMq用户名
spring.rabbitmq.username=guest
# rabbitMq密码
spring.rabbitmq.password=guest

启动类

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }
}

7.五种工作模式

7.1简单模式 Hello World

记得先创建队列

image-20210523135049959

provider生产者发生消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloWorld {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 编写生产者发送消息到对列

    @Test
    public void sendHollWorld() {
        /**
         * 参数1:对列名称
         * 参数2:要发送的消息
         */
        rabbitTemplate.convertAndSend("simple_queue", "你好,Hello World");
    }
}

consumer消费者接受消息

/**
 * 消息监听器
 * 消费者,接收消息队列消息监听器
 * 必须将当前监听器对象注入Spring的容器中
 * queues = "simple_queue" 队列,要与生产者保持一致
 */
@Component
@RabbitListener(queues = "simple_queue")
public class HelloWorldListener {

    /**
     * 接受消息,业务处理
     *
     * @param msg
     */
    @RabbitHandler
    public void simpleHandler(String msg) {
        System.out.println("接收到的消息是 : " + msg);
    }

}

7.2工作队列模式 Work queues

记得先创建队列

于简单模式相比,增加了消费者,代码基本一样

image-20210523134936132

provider生产者发生消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class WorkQueues {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Test
    public void sendWork() {
        /**
         * 参数1:对列名称
         * 参数2:要发送的消息
         */
        rabbitTemplate.convertAndSend("work_queue", "我是work模式");
    }
}

consumer消费者接受消息01

/**
 * 消息监听器
 * 消费者,接收消息队列消息监听器
 * 必须将当前监听器对象注入Spring的容器中
 * queues = "work_queue" 要与发送消息保持一致
 */
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener01 {

    /**
     * 接受消息,业务处理
     *
     * @param msg
     */
    @RabbitHandler
    public void simpleHandler(String msg) {
        System.out.println("work监听消息01 : " + msg);
    }

}

consumer消费者接受消息02

/**
 * 消息监听器
 * 消费者,接收消息队列消息监听器
 * 必须将当前监听器对象注入Spring的容器中
 * queues = "work_queue" 要与发送消息保持一致
 */
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener02 {

    /**
     * 接受消息,业务处理
     *
     * @param msg
     */
    @RabbitHandler
    public void simpleHandler(String msg) {
        System.out.println("work监听消息02 : " + msg);
    }

}

7.3Exchange常见类型

Exchange有常见以下3种类型:

  • Fanout:广播 将消息交给所有绑定到交换机的队列, 不处理路由键。只需要简单的将队列绑定到
    交换机上。fanout 类型交换机转发消息是最快的。
  • Direct:定向 把消息交给符合指定routing key 的队列. 处理路由键。需要将一个队列绑定到交换
    机上,要求该消息与一个特定的路由键完全匹配。如果一个队列绑定到该交换机上要求路由键
    “dog”,则只有被标记为 “dog” 的消息才被转发,不会转发 dog.puppy,也不会转发 dog.guard,
    只会转发dog。
    其中,路由模式使用的是 direct 类型的交换机。
  • Topic:主题(通配符) 把消息交给符合routing pattern(路由模式)的队列. 将路由键和某模式进
    行匹配。此时队列需要绑定要一个模式上。符号 “#” 匹配一个或多个词,符号""匹配不多不少一个
    词。因此“audit.#” 能够匹配到“audit.irs.corporate”,但是“audit.
    ” 只会匹配到 “audit.irs”。
    其中,主题模式(通配符模式)使用的是 topic 类型的交换机。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑
定,或者没有符合路由规则的队列,那么消息会丢失

7.4Publish/Subscribe发布与订阅模式

简单理解,我发送100条消息,交换机收到100条,他会给队列一100条消息,会给队列二100条消息

image-20210523141751153

发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的
队列都将接收到消息
【广播消息:一次性将消息发送给所有消费者,每个消费者收到消息均一致】

创建2个队列

image-20210523141950532

创建交换机并绑定队列

image-20210523142441777

provider生产者发生消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class publishExchange {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendPublish() {
        /**
         * 参数一:交换机
         * 参数二:路由键(空)
         * 参数三:发送的消息
         */
        rabbitTemplate.convertAndSend("publish_exchange", "", "我是发布订阅模式");
    }
}

consumer消费者接受消息01

@Component
@RabbitListener(queues = "publish_queue1")
public class PublishListener01 {

    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("publish 01: " + msg);
    }
}

consumer消费者接受消息02

@Component
@RabbitListener(queues = "publish_queue2")
public class PublishListener02 {

    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("publish 02: " + msg);
    }
}

7.5路由模式 Routing

简单理解:发布与订阅模式是将消息分别全部发送到队列,而路由模式是在交换机里配置路由键,生产者可以指定发送到那个路由键里,路由键有和对列一一对应,这样就会进入相应的对列

image-20210523144154004

创建2个队列

image-20210523144831663

创建交换机并绑定队列

image-20210523145347956

provider生产者发生消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class RoutingExchange {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendRouting() {
        /**
         * 参数一:交换机
         * 参数二:路由键
         * 参数三:发送的消息
         */
        for (int i = 0; i < 100; i++) {
            if (i/2==0){
                rabbitTemplate.convertAndSend("routing_exchange", "info", "我是发布订阅模式"+i+"info");
            }else {
                rabbitTemplate.convertAndSend("routing_exchange", "error", "我是发布订阅模式"+i+"error");

            }
        }

    }
}

consumer消费者接受消息01

@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener01 {

    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("routing_queue1: " + msg);
    }
}

consumer消费者接受消息02

@Component
@RabbitListener(queues = "routing_queue2")
public class RoutingListener02 {

    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("routing_queue2: " + msg);
    }
}

7.6主题模式(Topics通配符模式)

image-20210523150423592

面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。

举例:
item.#: 能够匹配item.insert.abc.bbc 或者item.insert
item.*:只能匹配item.insert

创建2个队列

image-20210523151018254

创建交换机并绑定队列

image-20210523151357846

provider生产者发生消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class TopicsExchange {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendTopics() {
        /**
         * 参数一:交换机
         * 参数二:路由键
         * 参数三:发送的消息
         */
            rabbitTemplate.convertAndSend("topics_exchange","item.insert","这条item.#和item.*都会有");
            rabbitTemplate.convertAndSend("topics_exchange","item.insert.abc","这条item.#有");
    }
}

consumer消费者接受消息01

@Component
@RabbitListener(queues = "topics_queu1")
public class TopicsListener01 {

    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("topics_queue1: " + msg);
    }
}

consumer消费者接受消息02

@Component
@RabbitListener(queues = "topics_queu2")
public class TopicsListener01 {

    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("topics_queue2: " + msg);
    }
}

7.7总结

工作模式:

  • 1、简单模式 HelloWorld : 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
  • 2、工作队列模式 Work Queue: 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默
    认的交换机)
  • 3、发布订阅模式 Publish/subscribe: 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,
    当发送消息到交换机后,交换机会将消息广播发送到绑定的队列
  • 4、路由模式 Routing: 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing
    key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
  • 5、通配符模式 Topic: 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式
    的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

8.高级篇

生产者确认模式

在生产者(发送消息放)

application.properties

# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms=true

# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns=true

confirm(config配置)

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *     * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
     *     * 设置消息确认回调方法
     *     * 设置消息回退回调方法
     *    
     */
    @PostConstruct
    public void initRabbitTemplate() {
        //设置消息确认回调方法
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnCallback(this::returnedMessage);
    }

    /**
     *    投递到交换机,不论投递成功还是失败都回调次方法
     *    @param correlationData 投递相关数据
     *    @param ack 是否投递到交换机
     *    @param cause 投递失败原因
     *    
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息进入交换机成功");
        } else {
            System.out.println("消息进入交换机失败, 失败原因:" + cause);
        }
    }


    /**
     *   当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法
     *   @param message 投递消息内容
     *   @param replyCode 返回错误状态码
     *   @param replyText 返回错误内容
     *   @param exchange 交换机名称
     *   @param routingKey 路由键
     *    
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("交换机路由至消息队列出错:>>>>>>>");
        System.out.println("交换机:" + exchange);
        System.out.println("路由键:" + routingKey);
        System.out.println("错误状态码:" + replyCode);
        System.out.println("错误原因:" + replyText);
        System.out.println("发送消息内容:" + message.toString());
        System.out.println("<<<<<<<<");
    }
}

消费者确认签收

消息确认的三种类型:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”
  • 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦)

application.properties

# 配置开启手动签收
# 简单模式的开启手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 路由模式开启手动签收
spring.rabbitmq.listener.direct.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.direct.retry.enabled=true

消费者接受消息

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener01 {

    @RabbitHandler
    public void simpleHandler(String msg, Message message, Channel channel) throws IOException {
        System.out.println("routing_queue1: " + msg);
        //获取投递标签

        MessageProperties messageProperties =
                message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        try {

            //  模拟异常
            // if (msg.contains("苹果")) {
             //    throw new RuntimeException("不允许卖苹果手机!!!");
             //}
            
            /**
             * 手动签收消息
             * 参数1:消息投递标签
             * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
             */
            channel.basicAck(deliveryTag, false);
            System.out.println("手动签收完成:{}");

        } catch (Exception ex) {
            
            /**
             * 手动拒绝签收
             * 参数1:当前消息的投递标签
             * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
             * 参数3:是否重回队列,true为重回队列,false为不重回
             */
            channel.basicNack(deliveryTag, false, true);
            System.out.println("拒绝签收,重回队列:{}" + ex);
        }
    }
}

版权声明:本文为weixin_46237429原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。