消息中间件(六)——RabbitMQ的入门案例

1、RabbitMQ的安装

       RabbitMQ是用Erlang语言开发的,所以需要依赖Erlang的环境,因此在安装RabbitMQ之前要先安装Erlang。而他们之间是有版本对应关系的,可以从如下去查看:https://www.rabbitmq.com/download.html

进入这个网址之后(网址是英文的,现在的浏览器可以翻译为中文的),

然后就可以看到如下表格:

1.1 Erlang的安装

下载Erlang:http://erlang.org/download/otp_win64_20.3.exe

以管理员方式运行此文件,安装。 erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添加%ERLANG_HOME%\bin;

1.2 RabbitMQ的安装

下载RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3    以管理员方式运行此文件,安装。

1.3 RabbitMQ的启动

安装成功后会自动创建RabbitMQ服务并且启动

1)从开始菜单启动RabbitMQ

完成在开始菜单找到RabbitMQ的菜单:

RabbitMQ Service-install :安装服务 ​

RabbitMQ Service-remove 删除服务 ​

RabbitMQ Service-start 启动 ​

RabbitMQ Service-stop 停止

a、安装并运行服务 rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务

b、安装管理插件 安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到sbin目录下,管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management

 

2)如果没有开始菜单则进入安装目录下sbin目录手动启动:

a、安装并运行服务

rabbitmq-service.bat install 安装服务

rabbitmq-service.bat stop 停止服务

rabbitmq-service.bat start 启动服务

b、安装管理插件 安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ 进入到sbin目录下,管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management

1.4 启动成功 登录RabbitMQ

进入浏览器,输入:http://localhost:15672 初始账号和密码:guest/guest

2、入门程序

2.1 环境搭建

需要搭建生产者和消费者两个工程,生产者和消费者都属于客户端。

2.1.1 生产者工程搭建

创建生产者工程demo-rabbitmq-produce,引入RabbitMQ java client的依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbitmq</artifactId>
        <groupId>com.zdw.rabbitmq</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zdw.rabbitmq</groupId>
    <artifactId>demo-ribbitmq-prodcure</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.3</version>
        </dependency>
    </dependencies>

</project>

2.1.2 消费者工程搭建

创建生产者工程demo-rabbitmq-consumer,引入RabbitMQ java client的依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbitmq</artifactId>
        <groupId>com.zdw.rabbitmq</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zdw.rabbitmq</groupId>
    <artifactId>demo-ribbitmq-consumer</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.3</version>
        </dependency>
    </dependencies>

</project>

2.2 代码编写

2.2.1 生产者代码编写

package com.zdw.prodcure;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Create By zdw on 2019/9/16
 * 消息生产者代码
 */
public class DemoProdcure {

    private static final String QUEUE_NAME="helloworld";//定义队列名称
    public static void main(String[] args) {
        Connection connection=null;//定义连接
        Channel channel=null;//定义通道
        try{
            //创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//设置主机IP
            connectionFactory.setPort(5672);//设置端口号
            connectionFactory.setUsername("guest");//用户名
            connectionFactory.setPassword("guest");//密码

            connection = connectionFactory.newConnection();//得到连接对象

            //通过连接对象,创建与Exchange(交换机)的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();

            /**
             * 声明队列
             * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             * 1、queue:队列名称
             * 2、durable:是否持久化,如果持久化,mq重启之后消息还会在
             * 3、exclusive:是否独占连接,队列只允许在该连接中访问,
             *              如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete:自动删除,队列不再使用时是否自动删除此队列,
             *              如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments:参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);

            String message = "这是一个RabbitMQ的入门案例";//定义消息内容,实际开发中是业务相关的数据

            /**
             * 发送消息到mq:
             * 参数:String exchange, String routingKey, BasicProperties props, byte[] body
             * 1、exchange:交换机名称,如果不设置将使用mq默认的交换机(这设置为"")
             * 2、routingKey:路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
             * 3、props:消息属性
             * 4、body:消息主体
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
             * 默认的交换机,routingKey等于队列名称
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            System.out.println("生产者发送了一条消息到MQ:"+message);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

执行上面的代码之后,我们可以在后台管理插件的页面查看相关信息:http://localhost:15672 初始账号和密码:guest/guest

点击helloworld的队列名称,可以进去查看详情:

2.2.2 消费者代码编写

package com.zdw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Create By zdw on 2019/9/16
 * 消息消费者代码
 */
public class DemoConsumer {
    private static final String QUEUE_NAME="helloworld";//定义队列名称,和生产者的队列名称一致

    public static void main(String[] args) {
        Connection connection = null;//定义连接
        Channel channel = null;//定义通道
        try {
            //创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");//设设置虚拟机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务

            connection = connectionFactory.newConnection();//得到与RabbitMQ服务的TCP连接

            channel = connection.createChannel();//创建通道

            /**
             * 声明队列:
             * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
             * 参数和之前生成者声明队列的参数是一样的
             */
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);

            //定义消费消息的方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

                /**
                 * 消费者接收消息后执行此方法
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时(channel.basicConsume)设置
                 * @param envelope     信封,通过envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
                 * @param properties   消息属性
                 * @param body         消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();//交换机
                    String routingKey = envelope.getRoutingKey();//路由key
                    long id = envelope.getDeliveryTag();//消息id
                    String content = new String(body,"utf-8");//消息内容

                    System.out.println("交换机名称:"+exchange+",路由key"+routingKey+",接收到的消息是:"+content);
                }
            };

            /**
             * 监听队列:
             * 参数:String queue, boolean autoAck, Consumer callback
             * 1、queue:队列名称
             * 2、autoAck:自动回复,当消费者接收到消息后要告诉mq消息已接收,
             *              如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback:消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

执行上面的代码,再次查看后台管理页面,会发现消息已经被消费掉了,并且也能看到队列helloworld绑定了一个消费者:

代码下载:https://download.csdn.net/download/zengdongwen/11784368


版权声明:本文为zengdongwen原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。