RabbitMQ
文章目录
- RabbitMQ是由LShift提供的一个Advanced Message Queuing Protocol(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成,因此也是继承了这些优点。
安装
选择RPM包下载,选择对应平台,本次安装在CentOS7,其他平台类似。https://www.rabbitmq.com/install-rpm.html
由于使用了erlang语言开发,所以需要erlang的包。erlang和RabbitMQ的兼容性,参考https://www.rabbitmq.com/which-erlang.html#compatibility-matrix
第二种错误,可以修改host文件,修改主机名即可
下载 rabbitmq-server-3.7.16-1.el7.noarch.rpm、erlang-21.3.8.6-1.el7.x86_64.rpm。socat在CentOS中源中有。
yum -y install erlang-21.3.8.6-1.el7.x86_64.rpm rabbitmq-server-3.7.16-1.el7.noarch.rpm
- 查看安装的文件
[root@xdd ~]# rpm -ql rabbitmq-server
/etc/logrotate.d/rabbitmq-server
/etc/profile.d/rabbitmqctl-autocomplete.sh
/etc/rabbitmq
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
/usr/lib/rabbitmq/autocomplete/bash_autocomplete.sh
/usr/lib/rabbitmq/autocomplete/zsh_autocomplete.sh
/usr/lib/rabbitmq/bin/cuttlefish
/usr/lib/rabbitmq/bin/rabbitmq-defaults
配置
环境配置
- 使用系统环境变量,如果没有使用rabbitmq-env.conf中定义环境变量,否则使用缺省值
RABBITMQ_NODE_IP_ADDRESS the empty string, meaning that it should bind to all network interfaces.
RABBITMQ_NODE_PORT 5672
RABBITMQ_DIST_PORT RABBITMQ_NODE_PORT + 20000 #内部节点和客户端工具通信用
RABBITMQ_CONFIG_FILE 配置文件路径默认为/etc/rabbitmq/rabbitmq
环境变量文件,可以不配置
工作特性配置文件
- rabbitmq.config配置文件
- 3.7支持新旧两种配置文件格式
- erlang配置文件格式,为了兼容继续采用
2. sysctl格式,如果不需要兼容,RabbitMQ鼓励使用。 (这个文件也可以不配置)
插件管理
列出所有可用插件
rabbitmq-plugins list
- 启动WEB管理插件,会依赖启用其他几个插件。
[root@xdd rabbitmq]$ rabbitmq-plugins enable rabbitmq_management
启动服务
systemctl start rabbitmq-server
- 启动中,可能出现下面的错误
Error when reading /var/lib/rabbitmq/.erlang.cookie:eacces
这就是这个文件的权限问题,修改属组、属组为rabbitmq即可
chown rabbitmq.rabbitmq /var/lib/rabbitmq/.erlang.cookie
- 服务启动成功
[root@xdd ~]# ss -tanl | grep 5672
LISTEN 0 128 *:25672 *:*
LISTEN 0 128 *:15672 *:*
LISTEN 0 128 :::5672 :::*
[root@xdd ~]#
用户管理
开始登陆WEB界面,
http://192.168.61.108(rabbitmq所在主机的ip):15672
使用guest/guest只能本地登陆,远程登录会报错
- rabbitmqctl命令
rabbitmqctl [-n <node>] [-1][-q] <command> [<command options>]
- General options:
-n
node-q
,–quiet-t
,–timeout timeout-l
longnames
- Commands:
add_user <username> <password>
添加用户list_user
列出用户delete_user username
删除用户change_password <username> <password>
修改用户名,密码set_user_tags <username> <tag> [...]
设置用户tag- list_user_permissions 列出用户权限
添加用户:
rabbitmqctl add_user username password
删除用户:
rabbitmqctl delete_user username
更改密码:
rabbitmqctl change_password username newpassword
设置权限Tags,其实就是分配组:
rabbitmqctl set_user_tags username tag
设置xdd用户为管理员tag后登陆
# rabbitmqctl add_user gdy gdy #添加xdd用户
# rabbitmqctl list_users #查看所有用户
# rabbitmqctl set_user_tags gdy administrator #设置xdd用户为管理员用户
tag的意义如下:
- administrator可以管理用户、权限、虚拟主机。
- administrator可以管理用户、权限、虚拟主机。
基本信息(web管理端口15672,协议端口5672)
虚拟主机
- 缺省虚拟主机,默认只能是guest用户在本机链接,下图新建的用户gdy默认无法访问任何虚拟主机
- 缺省虚拟主机,默认只能是guest用户在本机链接,下图新建的用户gdy默认无法访问任何虚拟主机
Pika库
- Pika是纯Python实现的支持AMQP协议的库
pip install pika
RabbitMQ工作原理及应用
工作模式
- 名词解释
名词 | 说明 |
---|---|
Server | 服务器 接受客户端连接,实现消息队列及路由功能的进程(服务),也称为消息代理 注意:客户端可用生产者,也可以是消费者,它们都需要连接到Server |
Connection | 网络物理连接 |
Channel | 一个连接允许多个客户端连接 |
Exchange | 交换器。接收生产者发来的消息,决定如何路由给服务器中的队列。 常用的类型有: direct(point-to-point) topic(publish-subscribe) fanout(multicast) |
Message | 消息 |
Message Queue | 消息队列,数据的存储载体 |
Bind | 绑定 建立消息队列和交换器之间的关系,也就是说交换器拿到数据,把什么样的数据送给哪个队列 |
Virtual Host | 虚拟主机 一批交换器、消息队列和相关对象的集合。为了多用户互不干扰,使用虚拟主机分组交换机,消息队列 |
Topic | 主题、话题 |
Broker | 可等价为Server |
1.队列
- 这种模式就是最简单的生产者消费者模型,消息队列就是一个FIFO的队列
- 生产者send.py,消费者receie.py
- 官方例子:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
- 注意:出现如下运行结果
pika.exceptions.ProbableAuthenticationError: (403, 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.')
- 访问被拒绝,还是权限问题,原因还是guest用户只能访问localhost上的缺省虚拟主机
- 解决办法
- 缺省虚拟主机,默认只能在本机访问,不要修改为远程访问,是安全的考虑。
- 因此,在Admin中Virtual hosts中,新建一个虚拟主机test。
- 注意:新建的test虚拟主机的Users是谁,本次是gdy用户
在ConnectionParameters中没有用户名、密码填写的参数,它使用参数credentials传入,这个需要构建一个pika.credentials.Credentials对象。
参照官方例子,写一个小程序
# send.py
import pika
from pika.adapters.blocking_connection import BlockingChannel
#构建用户名密码对象
credential = pika.PlainCredentials("gdy","gdy")
# 配置链接参数
params = pika.ConnectionParameters(
"192.168.61.108",#ip地址
5672, #端口
"test",#虚拟机
credential #用户名密码
)
# # 第二种建立连接方式
# params = pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test")
# 建立连接
connection = pika.BlockingConnection(params)
with connection:
# 建立通道
channel:BlockingChannel = connection.channel()
#创建一个队列,queue命名为hello,如果queue不存在,消息将被dropped
channel.queue_declare(queue="hello")
channel.basic_publish(
exchange="",#使用缺省exchange
routing_key="hello", #routing_key必须指定,这里要求和目标queue一致
body="Hello world" #消息
)
print("消息发送成功Sent Message OK")
测试通过。去服务管理界面查看Exchanges和Queues。
URLParameters,也可以使用URL创建参数
# amqp://username:password@host:port/<virtual_host>[?query-string]
parameters = pika.URLParameters('amqp://guest:guest@rabbit-server1:5672/%2F')
# %2F指代/,就是缺省虚拟主机
- queue_declare声明一个queue,有必要可以创建。
- basic_publish exchange为空就使用缺省exchange,如果找不到指定的exchange,抛异
- 使用缺省exchange,就必须指定routing_key,使用它找到queue
- 修改上面生产者代码,让生产者连续发送send Message。在web端查看Queues中Ready的变化
# send.py
import pika
from pika.adapters.blocking_connection import BlockingChannel
import time
# 第二种建立连接方式
params = pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test")
# 建立连接
connection = pika.BlockingConnection(params)
with connection:
# 建立通道
channel:BlockingChannel = connection.channel()
#创建一个队列,queue命名为hello,如果queue不存在,消息将被dropped
channel.queue_declare(queue="hello")
for i in range(40):
channel.basic_publish(
exchange="",#使用缺省exchange
routing_key="hello", #routing_key必须指定,这里要求和目标queue一致
body="data{:02}".format(i) #消息
)
time.sleep(0.5)
print("消息发送成功Sent Message OK")
- 构建receive.py消费者代码
- 单个消费消息
- BlockingChannel.basic_get(“queue名称”,是否阻塞)->(method,props,body)
- body为返回的消息
- BlockingChannel.basic_get(“queue名称”,是否阻塞)->(method,props,body)
# receie.py
import pika
from pika.adapters.blocking_connection import BlockingChannel
# 建立连接
params = pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test")
connection = pika.BlockingConnection(params)
with connection:
channel:BlockingChannel = connection.channel()
msg = channel.basic_get("hello",True) #从名称为hello的queue队列中获取消息,获取不到阻塞
method,props,body = msg #拿不到的消息tuple为(None,None,None)
if body:
print("获取到了一个消息Get A message = {}".format(body))
else:
print("没有获取到消息empty")
- 获取到消息后msg的结构如下:
(<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=38', 'redelivered=False', 'routing_key=hello'])>, <BasicProperties>, b'data01')
返回元组:(方法method,属性properties,消息body)
无数据返回:(None,None,None)
- 批量消费消息recieve.py
# receie.py 消费代码
import pika
from pika.adapters.blocking_connection import BlockingChannel
# 建立连接
params = pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test")
connection = pika.BlockingConnection(params)
def callback(channel,method,properties,body):
print("Get a message = {}".format(body))
with connection:
channel:BlockingChannel = connection.channel()
channel.basic_consume(
"hello",#队列名
callback,#消费回调函数
True,#不回应
)
print("等待消息,退出按CTRL+C;Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
2.工作队列
- 继续使用队列模式的生产者消费者代码,启动2个消费者。观察结果,可以看到,2个消费者是交替拿到不同的消息。
- 这种工作模式时一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它。
- 从结果知道,使用的是轮询方式拿走数据的。
- 注意:虽然上面的图中没有画出exchange。用到缺省exchange。
3.发布、订阅模式(Publish/Subscribe)
- Publish/Subscribe发布订阅,想象一下订阅者(消费者)订阅这个报纸(消息),都应该拿到一份同样内容的报纸。
- 订阅者和消费者之间还有一个exchange,可以想象成邮局,消费者去邮局订阅报纸,报社发报纸到邮局,邮局决定如何投递到消费者手中。
- 上例子中工作队列模式的使用,相当于,每个人只能拿到不同的报纸。所以不适合发布订阅模式。
- 当模式的exchange的type是fanout,就是一对多,即广播模式。
- 注意,同一个queue的消息只能被消费一次,所以,这里使用了多个queue,相当于为了保证不同的消费者拿到同样的数据,每一个消费者都应该有自己的queue。
# 生成一个交换机
channel.exchange_declare(
exchange="logs", #新交换机
exchange_type="fanout" #广播
)
- 生产者使用广播模式。在test虚拟主机中构建了一个logs交换机
- 至于queue,可以由生产者创建,也可以由消费者创建。
- 本次采用使用消费者端创建,生产者把数据都发往交换机logs,采用了fanout,然后将数据通过交换机发往已经绑定到此交换机的所有queue。
- 绑定Bingding,建立exchange和queue之间的联系
# 消费者端
result =channel.queue_declare(queue="") #生成一个随机名称的queue
resutl = channel.queue_declare(queue="",exclusive=True) #生成一个随机名称的queue,并在断开链接时删除queue
# 生成queue
q1:Method = channel.queue_declare(queue="",exclusive=True)
q2:Method = channel.queue_declare(queue="",exclusive=True)
q1name = q1.method.queue #可以通过result.method.queue 查看随机名称
q2name = q2.method.queue
print(q1name,q2name)
#绑定
channel.queue_bind(exchange="logs",queue=q1name)
channel.queue_bind(exchange="logs",queue=q2name)
- 生成者代码
- 注意观察 交换机和队列
# send.py 生产者代码
import pika
from pika.adapters.blocking_connection import BlockingChannel
import time
# 建立连接
params = pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test")
connection = pika.BlockingConnection(params)
channel:BlockingChannel = connection.channel()
with connection:
#指定交换机和模式
channel.exchange_declare(
exchange="logs",#新交换机
exchange_type="fanout" #扇出,广播
)
for i in range(40):
channel.basic_publish(
exchange="logs",#使用指定的exhcange
routing_key="", #广播模式,不指定routing_key
body = "data-{:02}".format(i) #消息
)
time.sleep(0.01)
print("消息发送完成")
- 特别注意:如果先开启生产者,由于没有队列queue,请观察数据
- 消费者代码
- 构建queue并绑定到test虚拟机的logs交换机上
# receie.py 消费者代码
import time
import pika
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
connection:BlockingConnection = pika.BlockingConnection(pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test"))
channel:BlockingChannel = connection.channel()
# 指定交换机
channel.exchange_declare(exchange="logs",exchange_type="fanout")
q1 = channel.queue_declare(queue="",exclusive=True)
q2 = channel.queue_declare(queue="",exclusive=True)
name1 = q1.method.queue #队列名
name2 = q2.method.queue
#为交换机绑定queue
channel.queue_bind(exchange="logs",queue=name1)
channel.queue_bind(exchange="logs",queue=name2)
def callback(channel,method,properties,body):
print("{}\n{}".format(channel,method))
print("获取了一个消息 Get a message = {}".format(body))
with connection:
#为第一个队列绑定消费者函数
channel.basic_consume(
name1,#队列名
callback, #消费者回调函数
True #不回应
)
#为第二个队列绑定消费者函数
channel.basic_consume(name2,callback,True)
print("等待消息,退出按CTRL+C;Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
- 先启动消费者receie.py可以看到已经创建了exchange
- 如果exchange是fanout,也就是广播了,routing_key就不用关心了。
q1 = channel.queue_declare(queue="",exclusive=True)
q2 = channel.queue_declare(queue="",exclusive=True)
- 注意:演示时要先启动消费者,再启动生产。如果先尝试启动生产者,在启动消费者会导致部分数据丢失。因为:exchange收了数据,没有queue接受,所以,exchange丢弃了这些数据。
4.路由模式Routing
- 路由其实就是生产者的数据经过exhange的时候,通过匹配规则,决定数据的去向。
- 生产者代码,交换机类型为direct,指定路由的key
# send生产者
import time
import pika
import random
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
exchangename = "color"
colors = ("orange","black","green")
#建立连接
connection:BlockingConnection = pika.BlockingConnection(pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test"))
channel:BlockingChannel = connection.channel()
with connection:
channel.exchange_declare(
exchange=exchangename,#使用指定的exchange
exchange_type="direct" #路由模式
)
for i in range(40):
rk = random.choice(colors)
msg = "{}-data-{:02}".format(rk,i)
channel.basic_publish(
exchange=exchangename,#
routing_key=rk,#指定routing_key
body=msg #消息
)
print(msg,"----")
time.sleep(0.01)
print("消息发送完成 Sent ok")
- 消费者代码
# receie.py消费者
import time
import pika
import random
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
exchangename = "color"
colors = ("orange","black","green")
#建立连接
connection:BlockingConnection = pika.BlockingConnection(pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test"))
channel:BlockingChannel = connection.channel()
channel.exchange_declare(exchange=exchangename,exchange_type="direct")
# 生成队列,名称随机,exclusive=True断开删除该队列
q1 = channel.queue_declare(queue="",exclusive=True)
q2 = channel.queue_declare(queue="",exclusive=True)
name1 = q1.method.queue #查看队列名
name2 = q2.method.queue
print(name1,name2)
#绑定到交换机,而且一定要绑定routing_key
channel.queue_bind(exchange=exchangename,queue=name1,routing_key=colors[0])
channel.queue_bind(exchange=exchangename,queue=name2,routing_key=colors[1])
channel.queue_bind(exchange=exchangename,queue=name2,routing_key=colors[2])
def callback(channel,method,properties,body):
print("{}\n{}".format(channel,method))
print("获取了一个消息get a message = {}".format(body))
print()
with connection:
channel.basic_consume(
name1,#队列名
callback, #消息回调函数
True #不回应
)
channel.basic_consume(name2,callback,True)
print("等待消息,退出按CTRL+C;Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
注意:如果routing_key设置一样,绑定的时候指定routing_key=‘black’,如下图。和fanout就类似了,都是1对多,但是不同。
- 因为fanout时,exchange不做数据过滤,1个消息,所有绑定的queue都会拿到一个副部。
- direct时候,要按照routing_key分配数据,上图的black有2个queue设置了,就会把1个消息分发给这2个queue。
5.Topic话题
- Topic就是更加高级的路由,支持模式匹配而已。
- Topic的routing_key必须使用
.
点号分割的单词组成。最多255个字节。 - 支持使用通配符:
*
表示严格的一个单词#
表示0个或多个单词
- 如果queue绑定的routing_key只是一个
#
,这个queue其实可以接收所有的消息。 - 如果没有使用任何通配符,效果类似于direct,因为只能和字符串匹配了。
- 生产者代码
# send.py生产者代码
import time
import pika
import random
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
exchangename = "products"
#产品和颜色搭配
colors = ("orange","black","green")
topics = ("phone.*","*.red") #两种话题
product_type = ("phone","pc","tv") #3种产品
#建立连接
connection:BlockingConnection = pika.BlockingConnection(pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test"))
channel:BlockingChannel = connection.channel()
#指定交换机为话题模式
channel.exchange_declare(exchange=exchangename,exchange_type="topic")
with connection:
for i in range(40):
rk = "{}.{}".format(random.choice(product_type),random.choice(colors))
msg = "{}-data-{:02}".format(rk,i)
channel.basic_publish(
exchange=exchangename,#使用指定的exchange
routing_key=rk,#指定routing_key
body=msg #消息
)
print(msg,"-----")
time.sleep(0.5)
print("消息发送完成 Sent ok")
- 消费者代码
# recieve.py 消费者代码
import time
import pika
import random
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
# 虚拟机名称
exchangename = "products"
#建立连接
connection:BlockingConnection = pika.BlockingConnection(pika.URLParameters("amqp://gdy:gdy@192.168.61.108:5672/test"))
channel:BlockingChannel = connection.channel()
#指定虚拟机,交换机为话题模式
channel.exchange_declare(exchange=exchangename,exchange_type="topic")
# 生成队列,名称随机,exclusive=True断开删除该队列
q1 = channel.queue_declare(queue="",exclusive=True)
q2 = channel.queue_declare(queue="",exclusive=True)
name1 = q1.method.queue #查看队列名
name2 = q2.method.queue
print(name1,name2)
#绑定到交换机,而且一定绑定routing_key
#q1只收集phone开头的routing_key的消息,也就是说只管收集类型信息
channel.queue_bind(exchange=exchangename,queue=name1,routing_key="phone.*")
# q2只收集red结尾的routing_key的消息,也就是说只管红色的信息
channel.queue_bind(exchange=exchangename,queue=name2,routing_key="*.red")
def callback(channel,method,properties,body):
print("{}\n{}".format(channel,method))
print("获取了一个消息Get a message = {}".format(body))
print()
with connection:
channel.basic_consume(
name1,#队列名
callback,#消息回调函数
True #不回应
)
channel.basic_consume(name2,callback,True)
print("等待消息,退出按CTRL+C;Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
观察消费者拿到的数据,注意观察phone.red的数据出现次数。
由此,可以知道交换机在路由消息的时候,只要和queue的routing_key匹配,就把消息发给该queue。
RPC远程过程调用
- RabbitMQ的RPC的应用场景较少,因为有更好的RPC通信框架。
消息队列的作用
- 系统间解耦
- 解决生产者、消费者速度匹配
- 由于稍微上规模的项目都会分层、分模块开发,模块间或系统间尽量不要直接耦合,需要开放公共接口提供给别的模块或系统调用,而调用可能触发并发问题,为了缓冲和解耦,往往采用中间件技术。
- RabbitMQ只是消息中间件中的一种应用程序,也是较常用的中间件服务。