06. Spring整合RabbitMQ

06. Spring整合RabbitMQ

  • 五种消息模型中,在企业中应用最广泛的是最后一种:通配符匹配的topic模式
  • Spring AMQP 是基于 Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,简化了我们对于RabbitMQ相关程序的开发

1.生产者工程

1.1 引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the producer demo (the consumer project uses the same dependencies). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wh</groupId>
    <artifactId>spring-rabbitmq-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- The sources contain non-ASCII string literals; fix the encoding instead of relying on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- spring-rabbit 2.0.x builds on Spring Framework 5, which needs Java 8 or newer. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spring AMQP support for RabbitMQ (RabbitTemplate, listener containers, rabbit: XML namespace). -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <!--
            Jackson is used by Jackson2JsonMessageConverter (producer) and by ObjectMapper/JsonNode
            (consumer). Spring AMQP declares it as an optional dependency, so it has to be listed
            explicitly here.
            NOTE(review): 2.9.x is assumed to be the line spring-amqp 2.0.x was built against; confirm.
        -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.10</version>
        </dependency>
        <!-- SLF4J binding that routes logging to log4j 1.2. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>

</project>

1.2 配置spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Producer-side wiring: connection, queue, topic exchange with its binding, JSON converter and RabbitTemplate. -->

    <!-- 1. Connection factory: broker address, credentials and virtual host. -->
    <rabbit:connection-factory id="connectionFactory" host="192.168.77.182" port="5672" username="root" password="root" virtual-host="/weihong"/>
    <!-- 2. Queue definition. -->
    <rabbit:queue name="test_spring_queue_1"/>
    <!-- 3. RabbitAdmin: manages queues and exchanges from Java code (create, bind, delete). -->
    <rabbit:admin connection-factory="connectionFactory" />
    <!-- 4. Topic exchange. -->
    <rabbit:topic-exchange name="spring_topic_exchange">
        <rabbit:bindings>
            <!-- Bind the queue: with topic matching, "#" stands for zero or more words, so "msg.user" matches "msg.#". -->
            <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 5. Message converter that serializes payloads as JSON. -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
    <!-- 6. RabbitTemplate: uses spring_topic_exchange as its default exchange and the JSON converter above. -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter" />
</beans>

1.3 测试发消息

package test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.HashMap;
import java.util.Map;

/**
 * Demo producer: starts the Spring context described by
 * {@code spring/spring-rabbitmq-producer.xml}, sends one map payload with the
 * routing key {@code msg.user} through the configured {@link RabbitTemplate},
 * and shuts the context down again.
 *
 * @author WeiHong
 * @date 2021 -  09 - 14 20:12
 */
public class Sender {
    /**
     * Sends a single demo message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Create the Spring container.
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        try {
            // 2. Fetch the RabbitTemplate bean from the container.
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            // 3. Build the payload and send it with routing key "msg.user".
            Map<String, String> map = new HashMap<String, String>();
            map.put("name", "魏红");
            map.put("email", "2402682949@qq.com");
            rabbitTemplate.convertAndSend("msg.user", map);
        } finally {
            // Close the context even when the lookup or the send fails, so the
            // resources it holds are always released.
            context.close();
        }
    }
}

2.消费端工程

2.1依赖与生产者一致

2.2 spring-rabbitmq-consumer.xml代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- Consumer-side wiring: connection, queue, component scan and a listener container for the queue. -->

    <!-- 1. Connection factory: broker address, credentials and virtual host. -->
    <rabbit:connection-factory id="connectionFactory" host="192.168.77.182" port="5672" username="root" password="root" virtual-host="/weihong"/>
    <!-- 2. Queue definition (same queue name as on the producer side). -->
    <rabbit:queue name="test_spring_queue_1"/>
    <!-- 3. RabbitAdmin: manages queues and exchanges from Java code (create, bind, delete). -->
    <rabbit:admin connection-factory="connectionFactory" />
    <!-- 4. Component scan of the "listener" package (Spring IoC), which contains the @Component listener class. -->
    <context:component-scan base-package="listener"/>
    <!-- 5. Listener container: delivers messages from test_spring_queue_1 to the bean named "consumerListener". -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
    </rabbit:listener-container>
</beans>

2.2消费者

  • MessageListener接口用于spring容器接收到消息后处理消息

  • 如果要用自定义的类来处理消息,该类必须实现此接口,并重写onMessage()方法

  • 当spring容器接收消息后,会自动交由onMessage进行处理

package listener;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Message listener that parses each message body as JSON and prints its
 * {@code name} and {@code email} fields.
 *
 * <p>Messages that cannot be parsed, or that lack one of the two fields, are
 * reported on {@code System.err} and skipped instead of failing with a
 * {@link NullPointerException}.
 *
 * @author WeiHong
 * @date 2021 -  09 - 14 20:46
 */
@Component
public class ConsumerListener implements MessageListener {
    /** Jackson's main serialization/deserialization entry point, used here to parse JSON bodies. */
    public static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Handles one incoming message.
     *
     * @param message the message whose body is expected to be a JSON object
     *                with {@code name} and {@code email} fields
     */
    @Override
    public void onMessage(Message message) {
        byte[] body = message.getBody();
        if (body == null || body.length == 0) {
            System.err.println("Skipping message with empty body: " + message);
            return;
        }
        try {
            // Parse the raw body into a JSON tree.
            JsonNode jsonNode = MAPPER.readTree(body);
            // get(...) returns null for an absent field, so check presence first.
            if (jsonNode == null || !jsonNode.has("name") || !jsonNode.has("email")) {
                System.err.println("Skipping message without name/email fields: " + message);
                return;
            }
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println("从队列中获取:【"+name+"的邮箱是:"+email+"】");
        } catch (IOException e) {
            // Body is not valid JSON: report it together with the offending message.
            System.err.println("Failed to parse message body as JSON: " + message);
            e.printStackTrace();
        }
    }
}

2.3启动项目

package listener;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;

/**
 * Entry point of the consumer project: starts the Spring context described by
 * {@code spring/spring-rabbitmq-consumer.xml} and then blocks on standard input.
 *
 * @author WeiHong
 * @date 2021 -  09 - 14 21:05
 */
public class test {
    /**
     * Starts the consumer.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        // Create the container from the consumer configuration.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
        // Close the context when the JVM shuts down (e.g. on Ctrl+C) so that its
        // beans are destroyed in an orderly way instead of being abandoned.
        context.registerShutdownHook();
        // Block the main thread on stdin so the program keeps running.
        System.in.read();
    }
}

版权声明:本文为qq_41239465原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。