第2章 线程同步精要
线程同步四项原则,依照重要性排序:
- 尽量不要共享对象,共享对象优先选择不可修改的对象
- 使用高级并发编程构件
- 使用低级同步原语,只用非递归(不可重入)的互斥量和条件变量,不用读写锁、信号量
- 除原子级整数外,不编写无锁(lock-free)的代码,不用内核级同步原语
互斥量(mutex)
作者陈硕建议的互斥量使用原则:
- 遵从RAII原则进行mutex的创建、销毁、加锁、解锁
- 使用非递归(不可重入)的mutex
- 不要手动调用lock()和unlock(),全部交给栈上的Guard对象,使得加锁期和临界区一样长
- 注意Guard加锁的顺序,防止死锁
次要原则:
- 不使用跨进程的mutex,进程之间只用tcp通信
- 加锁解锁只在同一线程(Guard可以保证)
- 可以用PTHREAD_MUTEX_ERRORCHECK进行排错
只用非递归的mutex
递归的mutex指的是,同一个线程可以对mutex重复加锁。
思考以下代码对vector的共享
void post(const Foo& f)
{
lock_guard lg(mutex_);
vec.push_back(f);
}
void traverse()
{
lock_guard lg(mutex_);
for (auto it = vec.begin(); it != vec.end(); ++it)
{
it->doSomeThing();
}
}
如果mutex是可重入的,两个函数可以同时执行,push_back可能导致迭代器失效;如果是不可重入的,如果doSomeThing调用了post。就会发生死锁。
一个功能函数可能分为加锁和不加锁版本,由此可能引起两个错误:
- 误用加锁版本,导致死锁;
- 误用不加锁版本,没有保护到数据。
对于++错误2++,可以在函数开头加入断言,保证当前线程没有被mutex加锁。
//该函数由muduo::MutexLock提供
assert(mutex_.isLockedByThisThread());
++错误1++单独讨论
死锁
死锁实例
class Request;
class Inventory
{
public:
void add(Request* req)
{
lock_guard<mutex> lock(mutex_);
requests_.insert(req);
}
void remove(Request* req)// __attribute__ ((noinline))
{
lock_guard<mutex> lock(mutex_);
requests_.erase(req);
}
void printAll() const;
private:
mutable mutex mutex_;
std::set<Request*> requests_;
};
Inventory g_inventory;
class Request
{
public:
void process() // __attribute__ ((noinline))
{
lock_guard<mutex> lock(mutex_);
g_inventory.add(this);
// ...
}
~Request()// __attribute__ ((noinline))
{
lock_guard<mutex> lock(mutex_);
this_thread::sleep_for(chrono::milliseconds(1000));
g_inventory.remove(this);
}
void print()// const __attribute__ ((noinline))
{
lock_guard<mutex> lock(mutex_);
// ...
}
private:
mutable mutex mutex_;
};
void Inventory::printAll() const
{
lock_guard<mutex> lock(mutex_);
this_thread::sleep_for(chrono::milliseconds(1000));
for (std::set<Request*>::const_iterator it = requests_.begin();
it != requests_.end();
++it)
{
(*it)->print();
}
printf("Inventory::printAll() unlocked\n");
}
void threadFunc()
{
Request* req = new Request;
req->process();
delete req;
}
int main()
{
thread thread(threadFunc);
this_thread::sleep_for(chrono::milliseconds(500));
g_inventory.printAll();
thread.join();
}
上述代码中,Inventory 的 printAll() 相比 Request 的 ~Requset() 晚启动0.5s,两者均在1s后调用函数,申请自己的mutex,造成环形等待。
条件变量
mutex只是用来加锁,条件变量用于等待某个条件达成(布尔表达式为真),学名管程(monitor)。条件变量用法只有一种。
对于wait端:
- 必须使用mutex保护布尔表达式。
- mutex上锁之后调用wait()。
- 布尔判断和*wait()*放入while中。
mutex mutex_;
deque<int> dq;
condition_variable cv;
int number = 0;
void consumer()
{
unique_lock<mutex> lock(mutex_);
cv.wait(lock, [] {return !dq.empty(); });
cout << dq.front() << endl;
dq.pop_front();
}
注意:
cv.wait(lock, [] {return !dq.empty(); });
会解锁mutex并阻塞当前线程,直到lambda返回true,才会终止阻塞并重新加锁mutex。等同于使用了while表达式。
unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用。
对于signal/broadcast端:
- 理论上,signal不一定需要上锁。
- 一般来说,signal之前要修改布尔表达式(使条件为真)。
- 修改布尔表达式需要mutex保护。
- signal和broadcast不同,前者唤醒一个线程,资源直接可用,后者唤醒所有线程,只能表示状态发生变化,因此要使用while不停判断条件,防止虚假唤醒。
void productor()
{
lock_guard<mutex> lock(mutex_);
dq.push_back(number++);
cv.notify_all();
}
条件变量时底层的同步原语,很少直接使用,一般被用于实现高层的同步措施。
倒计时是常用的同步手段,有两个用途:
- 主线程发起多个子线程,等到每个线程完成一定任务后,主线程继续执行。
- 主线程发起多个子线程,主线程完成一定任务后,子线程开始执行。
class CountDownLatch
{
public:
explicit CountDownLatch(int count) : cnt_(count) {};
void wait()
{
unique_lock<mutex> lock(mutex_);
cv.wait(lock, [=] {return cnt_ == 0; });
}
void countDown()
{
lock_guard<mutex> lock(mutex_);
cnt_--;
if (cnt_ == 0)
{
cv.notify_all();
}
}
private:
mutable mutex mutex_;
condition_variable cv;
int cnt_;
};
不要用读写锁和信号量
初学者遇到读很多,写很少的场景,就把 mutex 替换为 rwlock ,不见得正确:
- 在 read lock 中错误的对共享数据进行了写操作,常见于新增功能的时候。
- 如果临界区很小,锁竞争不大,mutex 可能比 rwlock 更高效,因为 rwlock 要更新reader数目。
- 有些 read lock 允许提升为 write lock 会引入一系列问题,比如数据损坏或者死锁。
- 有些 write lock 会阻塞 read lock,增加读取延迟。
信号量(Semaphore)可以被条件变量替代,作者没有使用经验。
线程安全的单例模式实现
template<typename T>
class Singleton : boost::noncopyable
{
public:
static T& instance()
{
pthread_once(&ponce_, &Singleton::init);
return *value_;
}
private:
Singleton();
~Singleton();
static void init()
{
value_ = new T();
::atexit(destroy);
}
static void destroy()
{
typedef char T_must_be_complete_type[sizeof(T) == 0 ? -1 : 1];
delete value_;
}
private:
static pthread_once_t ponce_;
static T* value_;
};
template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;
template<typename T>
T* Singleton<T>::value_ = NULL;
上述代码中为了实现在程序结束时的销毁功能,使用了atexit,该函数的解释如下:
函数名: atexit
功 能: 注册终止函数(即main执行结束后调用的函数)
用 法: int atexit(void (*func)(void));
注意:atexit()注册的函数类型应为不接受任何参数的void函数,exit调用这些注册函数的顺序与它们登记时候的顺序相反。
作者:Quinn0918
原文:https://blog.csdn.net/Quinn0918/article/details/70457370
pthread_once(&ponce_, &Singleton::init);
可以保证注册的init函数,无论多线程怎么调用pthread_once,init只被调用一次。
typedef char T_must_be_complete_type[sizeof(T) == 0 ? -1 : 1];
这句代码旨在编译期检查T类型是否是不完整类型(大小为0).其本质是利用typedef定义一个char数组别名,如果T类型是完整类型,char[1]数组别名定义为T_must_be_complete_type,否则编译器报错( char[-1] )。见github讨论:
使用shared_ptr实现copy-on-write
前文post和traverse死锁问题,解决思路:
- 读没有锁,写必须在临界区,避免重复加锁。
- 写的时候如何不阻塞读?使用shared_ptr观测是否被占用,如果占用就拷贝一份写进去。
- 读端如何表示占用?强行增加shared_ptr的引用计数。
mutex mutex_;
shared_ptr<vector<Foo>> g_foos;
void traverse()
{
shared_ptr<vector<Foo>> foos;
{
lock_guard<mutex> lock(mutex_);
foos = g_foos;//增加引用计数
assert(!g_foos.unique());//确保增加成功
}
for (auto it = foos->begin(); it != foos->end(); ++it)
{
it->doSomeThing();
}
}
void post(const Foo& f)
{
lock_guard<mutex> lock(mutex_);
if (!g_foos.unique())
{
g_foos.reset(new vector<Foo>(*g_foos));//正在读取,只能copy一份写进去
}
assert(g_foos.unique());//确保独占
g_foos->push_back(f);
}
前文Request和Inventory的死锁解决思路:
- 把print移除到printAll的临界区以外。
- printAll作为读端,增加引用计数表示占用。
- add、remove作为写端,检测到占用时,复制一份再写。
class Request;
class Inventory
{
public:
Inventory()
: requests_(new RequestList)
{
}
void add(Request* req)
{
muduo::MutexLockGuard lock(mutex_);//所有些过程,都在临界区之内
if (!requests_.unique())//检测到占用,复制一份
{
requests_.reset(new RequestList(*requests_));
printf("Inventory::add() copy the whole list\n");
}
assert(requests_.unique());
requests_->insert(req);
}
void remove(Request* req) // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
printf("Inventory::remove() copy the whole list\n");
}
assert(requests_.unique());
requests_->erase(req);
}
void printAll() const;
private:
typedef std::set<Request*> RequestList;
typedef boost::shared_ptr<RequestList> RequestListPtr;
RequestListPtr getData() const
{
muduo::MutexLockGuard lock(mutex_);
return requests_;
}
mutable muduo::MutexLock mutex_;
RequestListPtr requests_;
};
Inventory g_inventory;
class Request
{
public:
Request()
: x_(0)
{
}
~Request() __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
x_ = -1;
sleep(1);
g_inventory.remove(this);
}
void process() // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
g_inventory.add(this);
// ...
}
void print() const __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
printf("print Request %p x=%d\n", this, x_);
}
private:
mutable muduo::MutexLock mutex_;
int x_;
};
void Inventory::printAll() const
{
RequestListPtr requests = getData();//这一步使得引用计数加1
sleep(1);
for (std::set<Request*>::const_iterator it = requests->begin();
it != requests->end();
++it)
{
(*it)->print();//临界区之外调用
}
}
void threadFunc()
{
Request* req = new Request;
req->process();
delete req;
}
int main()
{
muduo::Thread thread(threadFunc);
thread.start();
usleep(500*1000);
g_inventory.printAll();
thread.join();
}