我们知道关联性map有多种实现方式,一般有顺序表和二叉树以及哈希表;
如果我们要设计一个线程安全的map,且要求并发性要高,那么我们可以对上面的实现方法做一个评价;
1:二叉树,稍微想一下就会不合适,拿AVL树来讲
:效率底下,每次访问树都需要经过根节点,这说明我们要拿一个互斥量来保护根节点,这很不好,因为访问根节点的次数太多了;
:容易死锁,当有多个线程进行着向下遍历树和向上遍历数,因为每次遍历可能需要获取邻近节点的锁,所以遍历方向的不同会导致死锁的发声
【这些毛病是树结构通有的,因为树型数据结构每个数据的之间的依赖性太强了,这说明树型结构可能不适合多线程并发】
2:顺序表,这个数据结构有着比二叉树更操蛋的问题
:会导致死锁,和二叉树一样的理由
:效率底下,不是因为互斥量的原因,而是因为数据结构本身的原因,一次查询就需要o(n)的时间,因为这个,即使在单线程中,也几乎没有人会拿顺序表作为map的实现
3:哈希表,这个数据结构就非常适合并发了,而且是高并发!
:对于每一个桶之间都是一个独立的个体,之间没有依赖关系,这说明我们可以同时访问两个桶,更普遍地说,如果情况是理想的,我们可以同时访问所有桶!只需要给每个桶一个互斥量,以保证对单独的一个桶的访问是串行的【我们也可以用boost::shared_lock来做到对一个桶多并发读访问,但是修改访问是串行的】
综上所述,我选择哈希表作为map的实现:
看下列代码:
#include<memory>
#include<iterator>
#include<thread>
#include<mutex>
#include<vector>
#include<list>
#include<atomic>
#include<algorithm>
using namespace std;
//类拷贝方案错误
class class_construct_error :public exception {};
template<typename key,typename value,typename hash=std::hash<key>>
class hash_map {
private:
class bucket {
private:
typedef pair<key, value> date;
typedef typename std::list<date>::iterator date_iterator;
typedef std::list<date> date_store;
//拷贝策略接口:
struct strategy_construct {
virtual void operator()(date_iterator it, const value& val) = 0;
};
//两种策略拷贝策略
//移动拷贝策略
struct move_construct :public strategy_construct {
void operator()(date_iterator it, const value& val) {
it->second = std::move(val);
}
};
//赋值拷贝策略
struct copy_construct :public strategy_construct {
void operator()(date_iterator it, const value& val) {
it->second = val;
}
};
date_store dates;
std::mutex mtx;
//寻找迭代器
date_iterator find_for_iterator(const key& k) {
return find_if(dates.begin(), dates.end(),
[&](const date& it)
{return k == it.first; }
);
}
public:
strategy_construct *strategy;
bucket() {
if (std::is_move_constructible<value>::value) {
strategy = new move_construct();
return;
}
else if (std::is_copy_constructible<value>::value) {
strategy = new copy_construct();
return;
}
//如果类没有合适的拷贝构造函数,那么就直接中止程序
}
//修改、添加值
//如果添加,返回true,否则返回false
bool add_or_update(const key& k, const value& val) {
lock_guard<mutex> lock_g(mtx);
date_iterator it = find_for_iterator(k);
if (it == dates.end()) {//没找到,则添加值
dates.push_back(date(k, val));
return true;
}
else {
(*strategy)(it, val);//要求value类要有拷贝构造函数
return false;
}
}
/* 删除值
* return 如果值存在并删除,那么return true 否则 return false
*/
bool remove(const key& k) {
lock_guard<mutex> lock_g(mtx);
date_iterator it = find_for_iterator(k);
if (it != dates.end()) {
dates.erase(it);
return true;
}
return false;
}
/*寻找k对应的值
return 如果找到,则return对应的value,否则return 默认val
*/
value find_value(const key& k, const value& default_val) {
lock_guard<mutex> lock_g(mtx);
date_iterator it = find_for_iterator(k);
if (it != dates.end()) {
return it->second;
}
return default_val;
}
size_t size() {
return dates.size();
}
//这三个操作是为扩充哈希表而实现的,在这段时间内任何对哈希表的请求都会被阻塞,所以不需要锁保护
void push(date& new_date) {
this->dates.push_back(std::move(new_date));
}
date_iterator begin() {
return dates.begin();
}
date_iterator end() {
return dates.end();
}
};
std::vector<std::unique_ptr<bucket>> buckets;//桶管理器
hash hasher;//哈希器
size_t cnt;//元素个数
atomic<bool> flag{ 1 };//是否将刚调用哈希表的线程阻塞
atomic<size_t> consumer;//当前时刻访问的哈希表的线程量
condition_variable cond_check;//让check函数等待
condition_variable cond_enter;//让正在扩充哈希表的这段时间内访问哈希表的线程等待
std::mutex mtx_check, mtx_enter;
bucket& find_bucket(const key& k) {
std::size_t const bucket_index = hasher(k) % buckets.size();
return *buckets[bucket_index];
}
void reorder() {
size_t old_size = buckets.size();
size_t index;
std::vector<std::unique_ptr<bucket>> new_bucket(old_size * 10);
for (auto& t : new_bucket) {
t = std::make_unique<bucket>();
}
for (auto& t : buckets) {
for (auto it = t.get()->begin(); it != t.get()->end(); it++) {
index = hasher(it->first) % new_bucket.size();
new_bucket[index].get()->push(*it);
}
}
buckets = std::move(new_bucket);
//哈希表扩充完毕,唤醒等待的线程;
flag = 1;
cond_enter.notify_all();
}
void check() {
if (double(cnt) / buckets.size() > 16) {
if (consumer > 0) {
flag=false;//开始扩充哈希表,阻止线程访问
unique_lock<std::mutex> lock_u(mtx_check);
cond_check.wait(lock_u);//等待所有线程从哈希表中退出
}
flag = false;
reorder();
}
}
void enter() {
if (flag == false) {
unique_lock<std::mutex> lock_u(mtx_enter);
cond_enter.wait(lock_u);
lock_u.unlock();//记得要unlock,要不然这些线程会串行
}
consumer++;
}
void exit() {
consumer--;
if (flag == 0 && consumer == 0) {
//此刻哈希表的访问线程为0,哈希表正在等待扩充
cond_check.notify_all();//开始扩充
}
}
public:
hash_map(unsigned int size = 19)
:buckets(size) {
for (unsigned int i = 0; i < size; i++) {
buckets[i].reset(new bucket());
}
}
hash_map(const hash_map&) = delete;
value find(const key& k, const value& default_val = value()) {
enter();
return this->exit(),find_bucket(k).find_value(k, default_val);
}
//如果添加,返回true,否则返回false
bool modify_or_add(const key& k,const value& val) {
enter();
if (find_bucket(k).add_or_update(k, val)) {
this->exit();
cnt++; check();
return true;
}
else {
this->exit();
return false;
}
}
bool remove(const key& k) {
enter();
if (find_bucket(k).remove(k)) {
cnt--;
this->exit();
return true;
}
else {
this->exit();
return false;
}
}
size_t size() { return cnt; }
size_t bucket_size() { return buckets.size(); }
};这边的思路是用一个互斥量来保护一个桶,这里没什么好说的;
但是要注意,当我们的哈希表扩张时,要保证此刻哈希表没有线程进行访问,且要保证在进行扩展的这段时间,线程能够进行访问的,为此我利用了两个环境变量和两个原子对象来控制这个流程,下面简单描述一下逻辑;
atmoic<size_t> consumer;此刻访问哈希表的线程数
atomic<bool> flag;是否允许线程对哈希表进行访问,当开始扩充时,将其设置为false
1:当一个线程开始访问哈希表时,先进入enter函数,判断是否能进行访问,如果不能则线程进入阻塞状态;
2:当一个线程结束访问哈希表是,进入exit函数,判断是否能哈希表是否能进行访问,如果不能,则说明哈希表正在等待扩张,这再判断此刻是否已经没有线程对哈希表进行访问,如果是的话,则唤醒哈希表,进行扩张;
3:当要开始扩张哈希表的时候,见flag置为false,并进入阻塞状态,等待线程将其唤醒;
4:当扩张完毕时,将flag设置为true,以开放哈希表,然后唤醒在扩展时进入阻塞状态的线程;