生产者与消费者模型
一个场所,两种模型,三种关系
场所:线程安全的队列
模型:生产者、消费者
关系:生产者与生产者之间(互斥)、生产者与消费者(同步与互斥)、消费者与消费者(互斥)的关系
功能:解耦合、支持忙闲不均、支持并发
耦合度:相互之间的依赖关系越紧密,耦合度越高
使用条件变量实现生产者消费者模型
// 使用条件变量生产者与消费者模型
#include <iostream>
#include <queue>
#include <pthread.h>
#define MAX_QUEUE 10
#define MAX_SIZE 4
class BlockQueue {
public:
BlockQueue(int cap = MAX_QUEUE)
: _capacity(cap)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_producter, NULL);
pthread_cond_init(&_cond_consumer, NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_producter);
pthread_cond_destroy(&_cond_consumer);
}
private:
std::queue<int> _queue;
int _capacity; // 队列节点最大数量
pthread_mutex_t _mutex;
pthread_cond_t _cond_producter;
pthread_cond_t _cond_consumer;
public:
void QueuePush(int data) // 线程安全的入队操作
{
QueueLock();
while (QueueIsFull()) {
ProducterWait();
}
_queue.push(data);
ConsumerWakeUp();
QueueUnlock();
}
void QueuePop(int* data) // 线程安全的出队操作adddic:
{
QueueLock();
while (QueueIsEmpty()) {
ConsumerWait();
}
*data = _queue.front();
_queue.pop();
ProducterWakeUp();
QueueUnlock();
}
private:
// 辅助函数
void QueueLock() {
pthread_mutex_lock(&_mutex);
}
void QueueUnlock() {
pthread_mutex_unlock(&_mutex);
}
void ConsumerWakeUp() {
pthread_cond_signal(&_cond_consumer);
}
void ConsumerWait() {
pthread_cond_wait(&_cond_consumer, &_mutex);
}
void ProducterWakeUp() {
pthread_cond_signal(&_cond_producter);
}
void ProducterWait() {
pthread_cond_wait(&_cond_producter, &_mutex);
}
bool QueueIsFull() {
return (_capacity == _queue.size());
}
bool QueueIsEmpty() {
return _queue.empty();
}
};
int i = 0;
pthread_mutex_t mutex; // 保护生产产品量
void* producter_route(void* arg) {
BlockQueue *q = (BlockQueue*)arg;
while (1) {
pthread_mutex_lock(&mutex);
q->QueuePush(i++);
std::cout << "producter put data: " << i << std::endl;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void* consumer_route(void* arg) {
BlockQueue *q = (BlockQueue*)arg;
while (1) {
int data;
q->QueuePop(&data);
std::cout << "consumer get data:" << data << std::endl;
}
return NULL;
}
int main() {
BlockQueue q;
pthread_t consumer_tid[MAX_SIZE], producted_tid[MAX_SIZE];
pthread_mutex_init(&mutex, NULL);
for (int i = 0; i < MAX_SIZE; ++i) {
int ret = pthread_create(&producted_tid[i], NULL, producter_route, (void*)&q);
if (ret != 0) {
std::cout << "pthread create error\n";
}
ret = pthread_create(&consumer_tid[i], NULL, consumer_route, (void*)& q);
if (ret != 0) {
std::cout << "pthread create error\n";
}
}
for (int i = 0; i < MAX_SIZE; ++i) {
pthread_join(producted_tid[i], NULL);
pthread_join(consumer_tid[i], NULL);
}
return 0;
}
信号量:计数器 + 等待队列 + 等待与唤醒功能
通过自身计数器,实现条件判断:
当条件满足时则可以获取数据直接返回,计数减一;
条件不满足时,阻塞
当产生资源后,通过信号的唤醒功能唤醒等待并且计数加一
sem_t // 定义
sem_init // 初始化
sem_wait // 条件判断是否阻塞,计数-1
sem_post // 计数加1,唤醒阻塞
sem_destroy // 销毁
信号量初始化
int sem_init(sem_t* sem, int pshared, unsigned int value);
sem “信号量”
pshared “0 用于线程间同步与互斥 !0 用于进程间同步与互斥”
value “信号量初值”
信号量的使用
// 进行条件判断
// 如果计数 <= 0 ,阻塞;否则立即返回,计数减一
int sem_wait(sem_t* sem);
// 计数加一,唤醒阻塞
int sem_post(sem_t* sem);
信号量的销毁
int sem_destroy(sem_t* sem);
使用信号量实现生产者与消费者模型
// 使用信号量实现生产者消费者模型
#include <iostream>
#include <queue>
#include <pthread.h>
#include <semaphore.h>
#define MAX_QUEUE 10
#define MAX_SIZE 4
class RingQueue{
private:
std::vector<int> _queue;
int _capacity;
int _write; // 写指针
int _read; // 读指针
sem_t _lock; // 互斥信号量
sem_t _data; // 数据资源信号量
sem_t _space; // 空闲空间信号量
public:
RingQueue (int cap = MAX_QUEUE)
: _capacity(cap)
, _queue(cap)
, _write(0)
, _read(0)
{
sem_init(&_lock, 0, 1);
sem_init(&_space, 0, cap);
sem_init(&_data, 0, 0);
}
~RingQueue ()
{
sem_destroy(&_lock);
sem_destroy(&_space);
sem_destroy(&_data);
}
private:
// 辅助函数
void QueueLock() {
sem_wait(&_lock);
}
void QueueUnlock() {
sem_post(&_lock);
}
void ProducterWait() {
sem_wait(&_space);
}
void ProducterPost() {
sem_post(&_space);
}
void ConsumerWait() {
sem_wait(&_data);
}
void ConsumerPost() {
sem_post(&_data);
}
public:
void QueuePush(int data) {
ProducterWait(); // 判断是否有空闲空间,有则立即返回,没有则等待
QueueLock();
_queue[_write] = data;
_write = (_write + 1) % _capacity;
QueueUnlock();
ConsumerPost(); // 唤醒消费者
}
void QueuePop(int* data) {
ConsumerWait(); // 判断是否有数据资源,有则立即返回,没有则等待
QueueLock();
*data = _queue[_read];
_read = (_read + 1) % _capacity;
QueueUnlock();
ProducterPost(); // 唤醒生产者
}
};
void* producter_route(void* arg){
RingQueue *q = (RingQueue*)arg;
int i = 0;
while (1) {
q->QueuePush(i);
std::cout << pthread_self()<< "\t" <<"put data: " << i++ << std::endl;
}
return NULL;
}
void* consumer_route(void* arg){
RingQueue *q = (RingQueue*)arg;
while (1) {
int data;
q->QueuePop(&data);
std::cout << pthread_self()<< "get data:" << data << std::endl;
}
return NULL;
}
int main(){
RingQueue q;
pthread_t producter_tid[MAX_SIZE], consumer_tid[MAX_SIZE];
for (int i = 0; i < MAX_SIZE; ++i) {
int ret = pthread_create(&producter_tid[i], NULL, producter_route, (void*)&q);
if (ret != 0) {
std::cout << "create error\n";
return -1;
}
}
for (int i = 0; i < MAX_SIZE; ++i) {
int ret = pthread_create(&consumer_tid[i], NULL, consumer_route, (void*)&q);
if (ret != 0) {
std::cout << "create error\n";
return -1;
}
}
for (int i = 0; i < MAX_SIZE; ++i) {
pthread_join(producter_tid[i], NULL);
pthread_join(consumer_tid[i], NULL);
}
return 0;
}
版权声明:本文为Q_feifeiyu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。