消息队列(生产消费者模型)--结合策略模式

概述
  • 消息队列(生产者-消费者模型)是Java多线程编程中的一个经典问题,主要描述的是生产者和消费者在同一时间段内共用一块存储空间(通常也称为缓冲区),工作时,生产者向缓冲区存放数据,而消费者则从缓冲区中取走数据。

  • 策略模式 是一种行为型模式,定义了一系列的算法,并将每一个算法封装,并且使它们可相互替换。主要解决在有多种算法相似的情况下,使用 if…else 所带来的复杂体现且难以维护的问题。
    抽象策略类(Strategy):定义一个所有算法都支持的通用接口,内容类使用这个接口调用由具体策略实现的算法。
    具体策略类(OperationA、OperationB、OperationC…):实现相应的算法。
    内容类(Context):封装具体的算法实现,提供外部调用方式(上层的调用需要与接口之间有一定的交互)。

  • 本文demo主要功能及角色
    消息队列主要功能包括:消息的发送、缓存和接收;主要角色为MsgBox、Producer、Consumer
    策略模式主要功能包括:消息策略的封装,提供上层调用的方式;主要角色为Context、Strategy、Message(对应具体策略类)

  • 消息队列简单关系模型
    在这里插入图片描述

  • 策略模式uml图
    在这里插入图片描述

  • 项目结构
    在这里插入图片描述

为方便理解,博主将分为策略模式和消息队列两部分进行讲解
  • 策略模式部分
    定义接口Strategy
package com.me.msg;

public interface Strategy {

	void doStrategyWork();
}

定义四个具体策略类(用于模拟不同的消息类型,以PhotoMessageStrategy为例,余下三种大体相同稍作修改即可)

package com.me.msg;

import java.text.SimpleDateFormat;
import java.util.Date;

public class PhotoMessageStrategy implements Strategy{
	
	SimpleDateFormat df =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	@Override
	public void doStrategyWork() {
		System.out.println("查阅一则图片消息!\t"+df.format(new Date()));
	}
}

消息类Message(对应四种不同消息类型)

package com.me.msg;

public class Message {
	private String cmd;
	
	public String getCmd() {
		return cmd;
	}
	public void setCmd(String cmd) {
		this.cmd = cmd;
	}
}

内容类Context(策略上下文,拥有对策略接口的引用)

package com.me.msg;

import java.util.HashMap;
import java.util.Map;

public class Context {

	private Strategy strategy;
	
	//使用Map接口下的集合定义键值对存储数据
	private Map<String, Strategy> map = new HashMap<String, Strategy>();
	
	public Context() {
		map.put("A", new PhotoMessageStrategy());
		map.put("B", new EmotionMessageStrategy());
		map.put("C", new VideoMessageStrategy());
		map.put("D", new VoiceMessageStrategy());
	}
	//获取key值所对应的消息类型
	public void invokingStrategy(Message message) {
		strategy = map.get(message.getCmd());
		strategy.doStrategyWork();
	}
}
  • 消息队列部分
    定义一个消息盒子MsgBox(消息处理中心,存储类,实现消息同步处理)
package com.me.msg;

import java.util.LinkedList;
/**
 * wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,并且释放锁资源,使得自己处于等待(阻塞)状态,让其他线程执行
 * notify()方法:当生产者/消费者向缓冲区中装载/查阅一条消息时,向其他等待的线程发出可执行的通知,同时释放锁资源,使得自己处于等待状态 
 */
public class MsgBox {
	
	//定义消息盒子缓存容量
	private static final int MAX_SIZE = 5;
	//定义存储消息的载体
	private static LinkedList<Object> list = new LinkedList<Object>();

	/*
	 * 存消息-同步方法
	 */
	public synchronized void producer(Message message) {
		while (list.size() == MAX_SIZE) {
			System.err.println("盒子装不下消息啦小老弟");
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		list.add(message);
		System.out.println("【未读消息】" + list.size());
		notifyAll();
	}

	/*
	 * 取消息-同步方法
	 */
	public synchronized Message consumer() {
		while (list.size() == 0) {
			System.err.println("没人给你发消息,好惨哦小老弟!");
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		return (Message) list.poll();
	}
}

生产者类Producer(往消息盒子中存放消息)

package com.me.msg;

import java.util.Random;

public class Producer implements Runnable {

	private MsgBox msgbox;

	public Producer(MsgBox msgbox) {
		this.msgbox = msgbox;
	}

	@Override
	public void run() {
		while (true) {
			try {
				//随机获取key值
				String[] arr = { "A", "B", "C", "D" };
				Message message = new Message();
				Random r = new Random();
				int i = r.nextInt(4);
				message.setCmd(arr[i]);
				msgbox.producer(message);
				Thread.sleep(1500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

消费者类Consumer(从消息盒子中取出消息)

package com.me.msg;

public class Consumer implements Runnable {

	private MsgBox msgbox;

	public Consumer(MsgBox msgbox) {
		this.msgbox = msgbox;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep(2500);
				Message message = msgbox.consumer();
				//实例化策略上下文并调用,返回message
				Context context = new Context();
				context.invokingStrategy(message);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

主程序类PolicyClient(开启多线程,执行生产消费方法)

package com.me.msg;

public class PolicyClient {
	public static void main(String[] args) {
	
		MsgBox msgbox = new MsgBox();

		Producer producer = new Producer(msgbox);
		for (int i = 0; i < 2; i++) {
			new Thread(producer).start();
		}
		Consumer consumer = new Consumer(msgbox);
		for (int i = 0; i < 3; i++) {
			new Thread(consumer).start();
		}
	}
}

  • 至此,消息队列结合策略模式实现demo完成(附部分运行结果↓)
    在这里插入图片描述
  • 注:代码中更改生产消费的休眠时间、缓存容量的大小或者开启的线程数量都会影响运行结果;另外,除本demo使用wait/notify来实现消息队列之外,读者还可自行尝试await()/signal()方法或BlockingQueue阻塞队列实现。

版权声明:本文为weixin_43849864原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。