1、RabbitMQ的安装
RabbitMQ是用Erlang语言开发的,所以需要依赖Erlang的环境,因此在安装RabbitMQ之前要先安装Erlang。而他们之间是有版本对应关系的,可以从如下去查看:https://www.rabbitmq.com/download.html
进入这个网址之后(网址是英文的,现在的浏览器可以翻译为中文的),
然后就可以看到如下表格:
1.1 Erlang的安装
下载Erlang:http://erlang.org/download/otp_win64_20.3.exe
以管理员方式运行此文件,安装。 erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添加%ERLANG_HOME%\bin;
1.2 RabbitMQ的安装
下载RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3 以管理员方式运行此文件,安装。
1.3 RabbitMQ的启动
安装成功后会自动创建RabbitMQ服务并且启动
1)从开始菜单启动RabbitMQ
完成在开始菜单找到RabbitMQ的菜单:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 停止
a、安装并运行服务 rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
b、安装管理插件 安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到sbin目录下,管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
2)如果没有开始菜单则进入安装目录下sbin目录手动启动:
a、安装并运行服务
rabbitmq-service.bat install 安装服务
rabbitmq-service.bat stop 停止服务
rabbitmq-service.bat start 启动服务
b、安装管理插件 安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ 进入到sbin目录下,管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
1.4 启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672 初始账号和密码:guest/guest
2、入门程序
2.1 环境搭建
需要搭建生产者和消费者两个工程,生产者和消费者都属于客户端。
2.1.1 生产者工程搭建
创建生产者工程demo-rabbitmq-produce,引入RabbitMQ java client的依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbitmq</artifactId>
<groupId>com.zdw.rabbitmq</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdw.rabbitmq</groupId>
<artifactId>demo-ribbitmq-prodcure</artifactId>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
</dependencies>
</project>
2.1.2 消费者工程搭建
创建生产者工程demo-rabbitmq-consumer,引入RabbitMQ java client的依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbitmq</artifactId>
<groupId>com.zdw.rabbitmq</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdw.rabbitmq</groupId>
<artifactId>demo-ribbitmq-consumer</artifactId>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
</dependencies>
</project>
2.2 代码编写
2.2.1 生产者代码编写
package com.zdw.prodcure;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Create By zdw on 2019/9/16
* 消息生产者代码
*/
public class DemoProdcure {
private static final String QUEUE_NAME="helloworld";//定义队列名称
public static void main(String[] args) {
Connection connection=null;//定义连接
Channel channel=null;//定义通道
try{
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");//设置主机IP
connectionFactory.setPort(5672);//设置端口号
connectionFactory.setUsername("guest");//用户名
connectionFactory.setPassword("guest");//密码
connection = connectionFactory.newConnection();//得到连接对象
//通过连接对象,创建与Exchange(交换机)的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
/**
* 声明队列
* 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* 1、queue:队列名称
* 2、durable:是否持久化,如果持久化,mq重启之后消息还会在
* 3、exclusive:是否独占连接,队列只允许在该连接中访问,
* 如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete:自动删除,队列不再使用时是否自动删除此队列,
* 如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments:参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String message = "这是一个RabbitMQ的入门案例";//定义消息内容,实际开发中是业务相关的数据
/**
* 发送消息到mq:
* 参数:String exchange, String routingKey, BasicProperties props, byte[] body
* 1、exchange:交换机名称,如果不设置将使用mq默认的交换机(这设置为"")
* 2、routingKey:路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props:消息属性
* 4、body:消息主体
* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
* 默认的交换机,routingKey等于队列名称
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("生产者发送了一条消息到MQ:"+message);
}catch (Exception e){
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
执行上面的代码之后,我们可以在后台管理插件的页面查看相关信息:http://localhost:15672 初始账号和密码:guest/guest
点击helloworld的队列名称,可以进去查看详情:
2.2.2 消费者代码编写
package com.zdw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Create By zdw on 2019/9/16
* 消息消费者代码
*/
public class DemoConsumer {
private static final String QUEUE_NAME="helloworld";//定义队列名称,和生产者的队列名称一致
public static void main(String[] args) {
Connection connection = null;//定义连接
Channel channel = null;//定义通道
try {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//设设置虚拟机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
connection = connectionFactory.newConnection();//得到与RabbitMQ服务的TCP连接
channel = connection.createChannel();//创建通道
/**
* 声明队列:
* 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
* 参数和之前生成者声明队列的参数是一样的
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//定义消费消息的方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 消费者接收消息后执行此方法
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时(channel.basicConsume)设置
* @param envelope 信封,通过envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();//交换机
String routingKey = envelope.getRoutingKey();//路由key
long id = envelope.getDeliveryTag();//消息id
String content = new String(body,"utf-8");//消息内容
System.out.println("交换机名称:"+exchange+",路由key"+routingKey+",接收到的消息是:"+content);
}
};
/**
* 监听队列:
* 参数:String queue, boolean autoAck, Consumer callback
* 1、queue:队列名称
* 2、autoAck:自动回复,当消费者接收到消息后要告诉mq消息已接收,
* 如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback:消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
执行上面的代码,再次查看后台管理页面,会发现消息已经被消费掉了,并且也能看到队列helloworld绑定了一个消费者:
代码下载:https://download.csdn.net/download/zengdongwen/11784368