Linux详解 --- 多线程3: 线程同步、生产者消费者模型

线程同步

问题:什么是同步?
在保证数据安全的提前下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,这称之为同步。
#数据安全是靠互斥实现
#按照特定顺序访问是靠同步实现

问题:为什么要存在同步?
让多线程能协同高效的完成某些事情。
//不存在的同步的话,可能会出现,多个线程在抢占一个资源时,有一个先占每次都能抢到,导致其它线程一直处于饥饿。

问题:如何实现同步?

  1. 线程拿锁进入,当条件不满足时,释放锁,并在某个条件变量下等待
  2. 当等待的资源就绪了,通知正在等待的线程。

条件变量

条件变量:用来描述某种临界资源是否就绪的一种数据化描述
#条件变量通常需要配合mutex互斥锁一起使用!

· 条件变量的初始化

//静态初始化 (出了作用域自动释放)
pthread_cond_t t = PTHREAD_COND_INITIALIZER;

//动态初始化 (需要destroy)
int pthread_cond_init(pthread_cond_t* restrict cond, 
const pthread_condattr_t* restrict attr);

pthread_cond_init参数:
cond:要初始化的条件变量
attr:条件变量的属性,一般设为NULL

· 条件变量的销毁

int pthread_cond_destroy(pthread_cond_t* cond)

· 等待条件满足

int pthread_cond_wait(pthread_cond_t* restrict cond, 
pthread_mutex_t* restrict mutex);

参数:
cond:在这个条件变量下等待
mutex:当前持有的互斥锁
为什么pthread_cond_wait接口需要互斥锁?

· 唤醒等待

int pthread_cond_broadcast(pthread_cond_t* cond);//broadcast是唤醒等待队列的所有线程
int pthread_cond_signal(pthread_cond_t* cond);  //signal是唤醒等待队列的第一个线程

演示

  //创建2个线程,让一个线程控制另一个线程去活动Run
  #include <iostream>    
  #include <unistd.h>    
  #include <pthread.h>    
  using namespace std;    
      
  pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;    
  pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    
      
  void* Run(void* arg)    
  {    
    while(1)    
    {    
      pthread_cond_wait(&cond, &lock);    
      cout << "Running ~~~~~" << endl;    
    }    
  }    
      
  void* Control(void* arg)    
  {    
    while(1)    
    {    
      sleep(1);                                                                                                                                                                                                
      pthread_cond_signal(&cond);    
      cout << "Notify Run Thread!" << endl;    
    }    
  }    
      
  int main()    
  {    
    pthread_t tid1, tid2;    
      
    pthread_create(&tid1, NULL, Control, NULL);    
    pthread_create(&tid2, NULL, Run, NULL);    
      
    pthread_join(tid1, NULL);    
    pthread_join(tid2, NULL);    
    return 0;    
  } 

在这里插入图片描述

问题:为什么pthread_cond_wait需要互斥量参数?
我们可以先模拟出下面这个场景:
我们现在有2个线程,线程A和线程B。线程A因为某个条件不满足,而在cond条件下进行等待。我们需要线程B去唤醒线程A,而要去唤醒线程A,那就肯定要去改变让A不满足的条件,而改变这个条件肯定需要线程B进入到临界区访问临界资源!
线程A在进行cond等待之前,也是先加锁之后进入的,因为要进行条件的判断需要访问到临界资源。A在判断条件不满足后,就开始在cond下等待了。而线程B现在想要去改变这个条件,也需要申请到锁!!!
如果线程A在cond下等待的时候不释放锁,那么线程B会因为申请不到锁而被阻塞挂起,此时就会造成“死锁”。
解决:
为了不造成死锁,线程A在cond下等待时,必须先释放占有的锁。然后线程B再去访问临界资源,从而改变让A不满足的条件。然后在适当的时候去通知线程A,唤醒线程A!
唤醒线程A后,线程A会重新持有锁,再去访问临界资源…

· 惊群效应
惊群效应就是指多线程/多进程在同时阻塞等待一个事件的时候,如果等待的这个时间发生时引起所有线程/进程被唤醒,但是最终只能有一个线程/进程得到这个事件的控制权,而其它线程又只能去重新阻塞等待。这种情况就叫做惊群效应。
举个?:
当你往一群鸽子中间扔一块食物,虽然最终只有一个鸽子抢到食物,但所有鸽子都会被惊动来争夺,没有抢到食物的鸽子只好回去继续睡觉, 等待下一块食物到来。像这样的,每扔一块食物,就会惊动所有的鸽子,即为惊群。
刚才所介绍的pthread_cond_broadcast接口就是能够引起惊群效应的接口,我们应该注意这个接口的使用。

生产者消费者模型

生产着消费者模式就是借用一个容器去解决生产者和消费者之间强耦合的问题,通过这个容器,生产者不需要再和消费者直接进行通信,生产者只需要生产数据并把数据放到容器当中,消费者只需要从容器中领取数据并消化。

· 生产者消费者模型的优点
①:在代码层面实现解耦,提高代码的可维护性
②:支持并发
③:支持忙闲不均

· 深入理解生产者消费者模型
3种关系:生VS生(互斥)、生VS消(互斥+同步)、消VS消(互斥)
2种角色:生产者、消费者 (指特定的线程/进程)
1种交易场所:通常是内存中的一段缓冲区(自己通过某种方式组织起来的)

在这里插入图片描述

基于阻塞队列的生产者消费者模型

思路:
首先,我们的成员变量有:一个queue作为存储数据的结构;阻塞队列的大小cap,它用于与队列相比较看是否达到我们设置的阻塞队列大小cap。此外,我们还需要1个mutex锁,锁住向队列中进行操作那部分临界区。当queue满的时候,需要让生产者线程等待,需要一个条件变量full;当queue空的时候,需要让消费者线程等待,需要一个条件变量empty。

/***	BlockQueue.h	***/
#pragma once
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
#include <queue>
    
template <class T>
class BlockQueue
{
  public:
    BlockQueue(size_t num = 10)
      :cap(num)
    {
      pthread_mutex_init(&lock, NULL);
      pthread_cond_init(&full, NULL);
      pthread_cond_init(&empty, NULL);
    }
     
    void Put(T& data)
    {
      pthread_mutex_lock(&lock);
      while(q.size() >= cap)	//开始访问临界资源,在这之前要上锁
      {
        pthread_cond_wait(&full, &lock);
      }
      //设置水位线,当队列长度查过一半时就开始通知消费者去消费 
      if(q.size() >= cap / 2)
        pthread_cond_signal(&empty);
      q.push(data);
      pthread_mutex_unlock(&lock);
      std::cout << "Producer Push a data!" << std::endl;
    }
    
    void Get(T& data)
    {
      pthread_mutex_lock(&lock);
      while(q.empty())	//开始访问临界资源,在这之前要上锁
      {
        pthread_cond_wait(&empty, &lock);
      }
	  //设置水位线,当队列长度小于等于1/4是就通知生产者生产	
      if(q.size() <= cap / 4)
        pthread_cond_signal(&full);

      data = q.front();
      q.pop();
      pthread_mutex_unlock(&lock);
      std::cout << "Consumer Get a data~~~" << std::endl;
    }

    ~BlockQueue() 
    {
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&full);
      pthread_cond_destroy(&empty);
    }
  private:
    std::queue<T> q;
    size_t cap;		//队列最大容量
    pthread_mutex_t lock;	//互斥锁
    pthread_cond_t full;	//当队列满的时候,让生产者在full条件下等待
    pthread_cond_t empty;	//当队列空的时候,让消费者在empty条件下等待
};         
/***	main.cpp	***/
//这里就设定队列中存放一些整形数据,生产者通过rand函数生产数据,消费者去获取数据
#include "BlockQueue.h"
#include <time.h>
#define NUM 10
    
void* Producer(void* arg)    
{    
  BlockQueue<int>* bq = (BlockQueue<int>*)arg;    
  while(1)    
  {    
    int data = rand() % 100;
    bq->Put(data);
    sleep(1);	//自己给定,这里为了易于观察,让消费者sleep(1)
  }
}
 
void* Consumer(void* arg)    
{    
  BlockQueue<int>* bq = (BlockQueue<int>*)arg;    
  while(1)    
  {    
    usleep(10000);
    int data;
    bq->Get(data);
    std::cout << "Get a data :" << data << std::endl;
  }
}    
    
int main()
{
  BlockQueue<int>* bq = new BlockQueue<int>(NUM);
  pthread_t tid1, tid2;
  srand(time(NULL));
  
  pthread_create(&tid1, NULL, Producer, (void*)bq);
  pthread_create(&tid2, NULL, Consumer, (void*)bq);

  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  return 0;
}
#Makefile
main:main.cpp    
  g++ -o $@ $^ -lpthread    
    
.PHONY:clean    
clean:    
  rm -rf main  

在这里插入图片描述


版权声明:本文为weixin_51696091原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。