c++11 多线程 条件变量(condition_variable)

前言

在C++11中,对于多线程的同步问题,我们可以使用条件变量condition_variable;当条件不满足时,相关线程被一直阻塞,直到某种条件出现,该线程才会被唤醒condition_variable类的成员函数如下:

一、条件变量condition_variable

条件变量是利用线程间共享全局变量进行同步的一种机制,主要包括:

  1. 一个线程因等待条件变量的条件成立”而挂起;
  2. 其他线程使“条件成立”,给出信号,从而唤醒被等待的线程。

1、应用场景

当需要死循环判断某个条件成立与否时【true or false】,如果多开一个线程死循环来判断,这样非常消耗CPU。使用条件变量,可以让当前线程wait,释放CPU,如果条件改变时,我们再notify退出线程,再次进行判断,这样就省去了CPU轮询空耗资源

2、具体动作

即使共享变量是原子性的,它也必须在mutex的保护下被修改,这是为了能够将改动正确发布到正在等待的线程。

任意要等待std::condition_variable的线程必须:

  1. 获取std::unique_lock<std::mutex>,这个mutex正是用来保护共享变量(即“条件”)的
  2. 执行wait, wait_for或者wait_until. 这些等待动作原子性地释放mutex,并使得线程的执行暂停
  3. 当获得条件变量的通知,或者超时,或者一个虚假的唤醒,那么线程就会被唤醒,并且获得mutex. 然后线程应该检查条件是否成立,如果是虚假唤醒,就继续等待。

【注: 所谓虚假唤醒,就是因为某种未知的罕见的原因,线程被从等待状态唤醒了,但其实共享变量(即条件)并未变为true。因此此时应继续等待】

二、具体函数说明

1、wait

1、wait(unique_lock <mutex>&lck)
当前线程的执行会被阻塞,直到收到 notify 为止。
2、wait(unique_lock <mutex>&lck,Predicate pred) 重载版本
当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞

wait() 可依次拆分为三个操作:释放互斥锁等待在条件变量上,暂停线程、再次获取互斥锁

2、notify_one:

notify_one():没有参数、没有返回值。

解除阻塞当前正在等待此条件的线程之一。如果没有线程在等待,则还函数不执行任何操作。如果超过一个,则选择任意一个等待线程唤醒。

3、wait_for

  1. cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time)
  2. bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred) 重载版本

与 wait() 类似,不过 wait_for 可以指定一个时间段,在当前线程收到通知或者指定的时间 rel_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_for 返回,剩下的处理步骤和 wait() 类似。

4、wait_until

与 wait_for 类似,但是 wait_until 可以指定一个时间点,在当前线程收到通知或者指定的时间点 abs_time 之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_until 返回,剩下的处理步骤和 wait_until() 类似

三、消费者和生产者模型

std::deque<int> q;
std::mutex mu;
std::condition_variable cond;

void function_1() //生产者
{
    int count = 10;
    while (count > 0) 
    {
        std::unique_lock<std::mutex> locker(mu);
        q.push_front(count);
        locker.unlock(); //手动释放锁,防止唤醒后别的线程,还阻塞在锁上
        cond.notify_one();  // 通知一个消费者消费,.
        std::this_thread::sleep_for(std::chrono::seconds(1));
        count--;
    }
}

void function_2() //消费者
{
    int data = 0;
    while (data != 1) 
    {
        std::unique_lock<std::mutex> locker(mu);  上锁
        while (q.empty())
            cond.wait(locker); // Unlock mutex 和 wait 阻塞,释放cpu,最后醒来lock
        data = q.back();
        q.pop_back();
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}
int main() 
{
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

有边界的 生产者和消费者模型

#include<bits/stdc++.h>
#include <unistd.h>

using namespace std;


queue<int> q;
mutex mtx;
condition_variable not_empty,not_full;

int max_size =20;

void producer(int id){
    while(true){
        sleep(1);
        unique_lock<mutex> lck(mtx);
        while(q.size()==max_size){
            not_full.wait(lck);
        }
        cout << "-> producer " << this_thread::get_id() << ": ";
        q.push(id);
        cout << "q size "<<q.size() << '\n';
        lck.unlock();
        not_empty.notify_all();
    }
}

void consumer(){
    while(true){
        sleep(1);
        unique_lock<mutex> lck(mtx);
        while(q.size()==0){
            not_empty.wait(lck);   // Unlock mutex 和 wait 阻塞,释放cpu,最后醒来lock
        }
        cout << "consumer " << this_thread::get_id() << ": ";
        q.pop();
        cout << "q size "<<q.size() << '\n';
        lck.unlock();
        not_full.notify_all();
    }
}

int main(){
    thread consumers[2],produces[2];
    for(int i=0;i<2;++i){
        consumers[i]=thread(consumer);
        produces[i] = thread(producer,i+1);
    }
    for(int i=0;i<2;++i){
        consumers[i].join();
        produces[i].join();
    }
    return 0;
}




版权声明:本文为h2517956473原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。