SpringBoot集成RabbitMQ

新建springboot项目

写pom文件

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE<version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-order-rabbitmq-product</artifactId>
    <version>0.0.1-SNAPSHOT<version>
    <name>springboot-order-rabbitmq-product</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.9</java.version>
    </properties>
    <dependencies>
<!--        rabbitmq  依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

  

application.yml

spring:
  application:
    name: springboot-rabbitmq

    #配置rabbitMq 服务器
  rabbitmq:
    host: 192.168.132.129 #这里改为你自己的IP地址
    port: 5672 # AMQP协议端口
    username: guest
    password: guest
    virtual-host: / # rabbitmq 的虚拟机

新建 两个 模块

Fanout 模式

在提供者模块中

新建 config和service 包

OrderService.java

package com.example.springbootorderrabbitmqproduct.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);

        //3.通过MQ来完成消息的分发

        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);

    }

}

RabbitMqConfiguration.java

package com.example.springbootorderrabbitmqproduct.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfiguration{
    //1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange",true,false);
    }
    //2.声明队列
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.fanout.queue",true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.fanout.queue",true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.fanout.queue",true);
    }


    //3.完成绑定关系   队列和交换机  的绑定
    @Bean
    public Binding smsBingding(){
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding duanxinBingding(){
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding emailBingding(){
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
}

在测试类中  测试:


@SpringBootTest
class SpringbootOrderRabbitmqProductApplicationTests {

    @Autowired
    OrderService orderService;

    @Test
    void contextLoads() {
        orderService.makeOrder("1","1",12);
    }

}

在consumer 模块中

新建 FanoutSmsConsumer.java

package com.example.springbootorderrabbitmqconsumer.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = {"sms.fanout.queue"})

public class FanoutSmsConsumer {

    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms接收到了的订单信息是:"+message);
    }
}


@Component
@RabbitListener(queues = {"duanxin.fanout.queue"})

public class FanoutDuanxinConsumer {

    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("短信s接收到了的订单信息是:"+message);
    }
}

@Component
@RabbitListener(queues = {"email.fanout.queue"})
/**
 * 短信服务
 */
public class FanoutEmailConsumer {

    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email 接收到了的订单信息是:"+message);
    }
}

Config类  最好定义在 consumer 模块

因为 如果 队列还没创建 ,启动consumer 会报错,consumer 是直接和队列打交道的

 

Direct 模式

与上面写fanout 差不多

提供者中

DirectOrderService

package com.example.springbootorderrabbitmqproduct.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class DirectOrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "direct_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,"email",orderId);
        rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderId);
    }

}

Consumer  模块中

把配置类  放在了consumer 模块中

DirectRabbitMqConfiguration  

package com.example.springbootorderrabbitmqproduct.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitMqConfiguration {
    //1.声明注册Direct模式的交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_order_exchange",true,false);
    }
    //2.声明队列
    @Bean
    public Queue directSmsQueue(){
        return new Queue("sms.direct.queue",true);
    }
    @Bean
    public Queue directDuanxinQueue(){
        return new Queue("duanxin.direct.queue",true);
    }
    @Bean
    public Queue directEmailQueue(){
        return new Queue("email.direct.queue",true);
    }


    //3.完成绑定关系   队列和交换机  的绑定
//    direct  多了一个 路由key  reoutingKey   分类的概念

    @Bean
    public Binding directSmsBingding(){
        return BindingBuilder.bind(directSmsQueue()).to(directExchange()).with("sms");
    }
    @Bean
    public Binding directDuanxinBingding(){
        return BindingBuilder.bind(directDuanxinQueue()).to(directExchange()).with("duanxin");
    }
    @Bean
    public Binding directEmailBingding(){
        return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
    }

}

DirectDuanxinConsumer.java


@Component
@RabbitListener(queues = {"duanxin.direct.queue"})

public class DirectDuanxinConsumer {

    @RabbitHandler
    public void directreviceMessage(String message){
        System.out.println("短信 接收到了的订单信息是:"+message);
    }
}

等其他两个

Topic 模式

提供者中

TopicOrderService


@Service
public class TopicOrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "topic_order_exchange";


        String routingKey = "com.duanxin";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);

    }

}

Consumer中  没要写配置类

把交换器和 队列的 关系 用注解写在了 服务上面

TopicDuanxinConsumer


@Component
//注解的方式  配置 交换器  和队列
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "duanxin.topic.queue",durable = "true", autoDelete = "false"),
        exchange =@Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.duanxin.#"

))

public class TopicDuanxinConsumer {

    @RabbitHandler
    public void topicreviceMessage(String message){
        System.out.println("短信duanxin topic 接收到了的订单信息是:"+message);
    }
}

TopicEmailConsumer


@Component
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "email.topic.queue",durable = "true", autoDelete = "false"),
        exchange =@Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "*.email.#"
))

/**
 * 短信服务
 */
public class TopicEmailConsumer {

    @RabbitHandler
    public void topicreviceMessage(String message){
        System.out.println("email topic 接收到了的订单信息是:"+message);
    }
}

TopicSmsConsumer


@Component
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "sms.topic.queue",durable = "true", autoDelete = "false"),
        exchange =@Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "com.#"
))

public class TopicSmsConsumer {

    @RabbitHandler
    public void topicreviceMessage(String message){
        System.out.println("sms topic 接收到了的订单信息是:"+message);
    }
}

测试类


版权声明:本文为weixin_51364443原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。