Work Queues 工作队列
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成的情况。
相反我们安排任务在之后执行。把任务封装为消息并将其发送给到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
3.1 轮训分发消息
在这个案例中我们会启动两个线程,一个消息发送线程,我们来看看这两个工作线程是如何工作的
3.1.1 抽取工具类
将单一不变的操作提取出来,成为工具类,方便使用
代码如下:
package com.example.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author qie ting feng yin
* @version 1.0
* @description: 连接工程创建信道的工具类
* @date 2022/4/12 0012 9:53
*/
public class RabbitUtils {
//得到一个连接的channel
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//工厂ip 连接RabbitMQ队列
connectionFactory.setHost("47.98.241.39");
//用户名
connectionFactory.setUsername("admin");
//密码
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道(生产者通过信道连接队列)
return connection.createChannel();
}
}
3.1.2 启动两个工作线程
编写程序,并行执行两次程序,来模拟两次工作线程,执行两次的方式:
IDEA左上角打开Edit Configurations
找到Modify options并点击
选择Allow multiple instances,并点击apply
代码如下:
package com.example.two;
import com.example.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author qie ting feng yin
* @version 1.0
* @description: 一个工作线程(相当于消费者)
* @date 2022/4/12 0012 10:09
*/
public class Work01 {
//队列的名称
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitUtils.getChannel();
//声名 消费者成功消费(接收)消息的回调(消息接收成功时执行)
DeliverCallback deliverCallback = (consumerTag, message) -> {
//打印消息体
System.out.println("消费者成功消费消息,消息体为:"+new String(message.getBody()));
};
//声名 消费者取消消费消息时的回调(消息接收被取消时执行)
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag+"消息消费被中断");
};
/**
* 消费者消费(接收)消息,参数含义如下:
* 1.消费哪个队列
* 2.消费之后是否要自动应答
* 3.消费者成功消费消息的回调
* 4.消费者取消消费消息的回调
*/
/* 并行执行模拟两个工作线程,轮训(彼此交互)接收大量消息
第一次执行
System.out.println("work01等待接收消息...");*/
//第二次执行
System.out.println("work02等待接收消息...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
执行结果如下:
此时没有发送消息,所以两个工作线程只是等待消息
3.1.3 启动一个发送线程
编写生产者代码,模拟发送大量消息
代码如下:
package com.example.two;
import com.example.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @author qie ting feng yin
* @version 1.0
* @description: 生产者 发送大量消息
* @date 2022/4/12 0012 11:18
*/
public class Task01 {
//队列名称
public static final String QUEUE_NAME = "hello";
//发送消息
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitUtils.getChannel();
//声名 消费者成功消费(接收)消息的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
//打印消息体
System.out.println("消费者成功消费消息,消息体为:"+new String(message.getBody()));
};
//声名 消费者取消消费消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/**
* 生成一个队列,参数的含义如下:
* 1.队列名称
* 2.队列里面的消息是否持久化(存储在磁盘上),默认情况false(存储在内存中)
* 3.该队列是否进行消息共享,默认false
* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息,从控制台中接收消息并发送
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
执行代码并模拟发送大量消息:
3.1.4 结果展示
查看两个工作线程的接收消息的情况,结果如下:
第一个工作线程的接收情况:
第二个工作线程的接收情况
小结:由两个工作线程接收消息的情况来看,首个消息的接受是随机的,但是后面的消息会轮流被线程接收,这就是 Work Queues(工作队列)的分发消息的方式——轮训分发消息
版权声明:本文为qq_52567278原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。