概述
消息队列(生产者-消费者模型)是Java多线程编程中的一个经典问题,主要描述的是生产者和消费者在同一时间段内共用一块存储空间(通常也称为缓冲区),工作时,生产者向缓冲区存放数据,而消费者则从缓冲区中取走数据。
策略模式 是一种行为型模式,定义了一系列的算法,并将每一个算法封装,并且使它们可相互替换。主要解决在有多种算法相似的情况下,使用 if…else 所带来的复杂体现且难以维护的问题。
抽象策略类(Strategy):定义一个所有算法都支持的通用接口,内容类使用这个接口调用由具体策略实现的算法。
具体策略类(OperationA、OperationB、OperationC…):实现相应的算法。
内容类(Context):封装具体的算法实现,提供外部调用方式(上层的调用需要与接口之间有一定的交互)。本文demo主要功能及角色
消息队列主要功能包括:消息的发送、缓存和接收;主要角色为MsgBox、Producer、Consumer
策略模式主要功能包括:消息策略的封装,提供上层调用的方式;主要角色为Context、Strategy、Message(对应具体策略类)消息队列简单关系模型

策略模式uml图

项目结构

为方便理解,博主将分为策略模式和消息队列两部分进行讲解
- 策略模式部分
定义接口Strategy
package com.me.msg;
public interface Strategy {
void doStrategyWork();
}
定义四个具体策略类(用于模拟不同的消息类型,以PhotoMessageStrategy为例,余下三种大体相同稍作修改即可)
package com.me.msg;
import java.text.SimpleDateFormat;
import java.util.Date;
public class PhotoMessageStrategy implements Strategy{
SimpleDateFormat df =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void doStrategyWork() {
System.out.println("查阅一则图片消息!\t"+df.format(new Date()));
}
}
消息类Message(对应四种不同消息类型)
package com.me.msg;
public class Message {
private String cmd;
public String getCmd() {
return cmd;
}
public void setCmd(String cmd) {
this.cmd = cmd;
}
}
内容类Context(策略上下文,拥有对策略接口的引用)
package com.me.msg;
import java.util.HashMap;
import java.util.Map;
public class Context {
private Strategy strategy;
//使用Map接口下的集合定义键值对存储数据
private Map<String, Strategy> map = new HashMap<String, Strategy>();
public Context() {
map.put("A", new PhotoMessageStrategy());
map.put("B", new EmotionMessageStrategy());
map.put("C", new VideoMessageStrategy());
map.put("D", new VoiceMessageStrategy());
}
//获取key值所对应的消息类型
public void invokingStrategy(Message message) {
strategy = map.get(message.getCmd());
strategy.doStrategyWork();
}
}
- 消息队列部分
定义一个消息盒子MsgBox(消息处理中心,存储类,实现消息同步处理)
package com.me.msg;
import java.util.LinkedList;
/**
* wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,并且释放锁资源,使得自己处于等待(阻塞)状态,让其他线程执行
* notify()方法:当生产者/消费者向缓冲区中装载/查阅一条消息时,向其他等待的线程发出可执行的通知,同时释放锁资源,使得自己处于等待状态
*/
public class MsgBox {
//定义消息盒子缓存容量
private static final int MAX_SIZE = 5;
//定义存储消息的载体
private static LinkedList<Object> list = new LinkedList<Object>();
/*
* 存消息-同步方法
*/
public synchronized void producer(Message message) {
while (list.size() == MAX_SIZE) {
System.err.println("盒子装不下消息啦小老弟");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(message);
System.out.println("【未读消息】" + list.size());
notifyAll();
}
/*
* 取消息-同步方法
*/
public synchronized Message consumer() {
while (list.size() == 0) {
System.err.println("没人给你发消息,好惨哦小老弟!");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return (Message) list.poll();
}
}
生产者类Producer(往消息盒子中存放消息)
package com.me.msg;
import java.util.Random;
public class Producer implements Runnable {
private MsgBox msgbox;
public Producer(MsgBox msgbox) {
this.msgbox = msgbox;
}
@Override
public void run() {
while (true) {
try {
//随机获取key值
String[] arr = { "A", "B", "C", "D" };
Message message = new Message();
Random r = new Random();
int i = r.nextInt(4);
message.setCmd(arr[i]);
msgbox.producer(message);
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者类Consumer(从消息盒子中取出消息)
package com.me.msg;
public class Consumer implements Runnable {
private MsgBox msgbox;
public Consumer(MsgBox msgbox) {
this.msgbox = msgbox;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(2500);
Message message = msgbox.consumer();
//实例化策略上下文并调用,返回message
Context context = new Context();
context.invokingStrategy(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
主程序类PolicyClient(开启多线程,执行生产消费方法)
package com.me.msg;
public class PolicyClient {
public static void main(String[] args) {
MsgBox msgbox = new MsgBox();
Producer producer = new Producer(msgbox);
for (int i = 0; i < 2; i++) {
new Thread(producer).start();
}
Consumer consumer = new Consumer(msgbox);
for (int i = 0; i < 3; i++) {
new Thread(consumer).start();
}
}
}
- 至此,消息队列结合策略模式实现demo完成(附部分运行结果↓)

- 注:代码中更改生产消费的休眠时间、缓存容量的大小或者开启的线程数量都会影响运行结果;另外,除本demo使用wait/notify来实现消息队列之外,读者还可自行尝试await()/signal()方法或BlockingQueue阻塞队列实现。
版权声明:本文为weixin_43849864原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。