生产者与消费者模型

生产消费者模型

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。

在这里插入图片描述

采用synchronized锁以及wait notify方法实现

代码源自菜鸟教程

//生产者消费者模型
public class ProducerConsumerTest {
    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);
        p1.start();
        c1.start();
    }
}
class CubbyHole {
    private int contents;
    private boolean available = false;
    public synchronized int get() {
        while (available == false) {
            try {
                wait();
            }
            catch (InterruptedException e) {
            }
        }
        available = false;
        notifyAll();
        return contents;
    }
    public synchronized void put(int value) {
        while (available == true) {
            try {
                wait();
            }
            catch (InterruptedException e) {
            }
        }
        contents = value;
        available = true;
        notifyAll();
    }
}

class Consumer extends Thread {
    private CubbyHole cubbyhole;
    private int number;
    public Consumer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }
    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("消费者 #" + this.number+ " got: " + value);
        }
    }
}

class Producer extends Thread {
    private CubbyHole cubbyhole;
    private int number;

    public Producer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            cubbyhole.put(i);
            System.out.println("生产者 #" + this.number + " put: " + i);
            try {
                sleep((int)(Math.random() * 100));
            } catch (InterruptedException e) { }
        }
    }
}

运行结果:

消费者 #1 got: 0
生产者 #1 put: 0
生产者 #1 put: 1
消费者 #1 got: 1
生产者 #1 put: 2
消费者 #1 got: 2
生产者 #1 put: 3
消费者 #1 got: 3
生产者 #1 put: 4
消费者 #1 got: 4
消费者 #1 got: 5
生产者 #1 put: 5
生产者 #1 put: 6
消费者 #1 got: 6
消费者 #1 got: 7
生产者 #1 put: 7
消费者 #1 got: 8
生产者 #1 put: 8
生产者 #1 put: 9
消费者 #1 got: 9

采用阻塞队列实现

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private volatile boolean FLAG = true;//默认开启,进行生产+消费。volatile为了让所有线程都可见
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;
    public MyResource(BlockingQueue<String> blockingQueue) //传接口,可扩展性强
    {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());//我要知道传过来的是什么
    }

    public void myProd() throws Exception{
        String data = null;
        boolean retValue;
        while(FLAG){
            data = atomicInteger.incrementAndGet()+"";
            retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS);//2秒取一次
            if(retValue){
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"\t生产停止");
    }

    public void myConsumer() throws Exception{
        String result = null;
        while(FLAG){
            result = blockingQueue.poll(2L,TimeUnit.SECONDS);//2秒取不到返回null
            if(null==result || result.equalsIgnoreCase("")){
                FLAG = false;
                System.out.println(Thread.currentThread().getName()+"\t 超过2秒,消费退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
        }
    }

    public void stop() throws Exception{
        this.FLAG = false;
    }
}

/*
 * volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用
 * */

public class ProdConsumer_BlockQueueDemo {
    public static void main(String[] args) throws Exception{
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
            System.out.println();
            System.out.println();
            try{
                myResource.myProd();
            }catch (Exception e){
                e.printStackTrace();
            }
        },"Prod").start();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
            try{
                myResource.myConsumer();
            }catch (Exception e){
                e.printStackTrace();
            }
        },"Consumer").start();

        try{
            TimeUnit.SECONDS.sleep(5);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("5秒钟到,main停止");
        myResource.stop();
    }
}

运行结果:

java.util.concurrent.ArrayBlockingQueue
Prod	 生产线程启动


Prod	插入队列1成功
Consumer	 消费线程启动
Consumer	消费队列1成功
Prod	插入队列2成功
Consumer	消费队列2成功
Prod	插入队列3成功
Consumer	消费队列3成功
Prod	插入队列4成功
Consumer	消费队列4成功
Prod	插入队列5成功
Consumer	消费队列5成功



5秒钟到,main停止
Prod	生产停止
Consumer	 超过2秒,消费退出

版权声明:本文为QAQ123666原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。