【设计并发代码】
8.1 在线程间划分工作的技术
8.1.1 处理开始前在线程间划分数据
划分数据最简单的方法就是将第一之组N个元素分配给一个线程,将下一组N个元素分配给另一个线程,以此类推,但是也可以使用别的模式。无论如何划分数据,每个线程只能处理分配给它的元素,并且直到它完成任务的时候才能与别的线程通信。这种结构与使用消息传递接口(Message Passing Intrerface, MPI)或者OpenMP框架编程的结构是类似的。
8.1.2 递归地划分数据
快速排序算法有两个基本步骤,基于其中一个元素(关键值)将数据划分为两部分,一部分在关键值之前,一部分在关键值之后,然后递归地排序这两部分。我们无法通过预先划分数据来实行并行,因为只有当处理元素的时候才知道这个元素是属于那一部分的,如果打算并行这个算法就需要把握递归的本质。每次递归的时候,会调用更多的quick_sort函数来排序关键点之前和关键点之后的元素。这些递归调用是完全独立的,因为它们读取完全不同的集合。
当对大规模的数据进行排序的时候,为每一个递归调用生成一个新线程会很快产生大量线程。在考虑性能的时候,就很快就会使线程数量达到上限。这里对任务进行划分只需要严格控制线程数量,我们只需要将此块存储到线程安全栈中,而不是递归调用创建一个薪线程。如果线程不在工作,就说明它已经处理完所有的块,或等待存储在栈中的块。此时可以从栈中得到一个块并将它排序。书里给出了使用待排序块的并行快速排序代码如下:
#include <iostream>
#include <algorithm>
#include <stack>
#include <list>
#include <vector>
#include <future>
#include <chrono>
#include "threadsafe_stack.h"
template<typename T>
class chunk_to_sort
{
public:
std::list<T> data;
std::promise<std::list<T> > promise;
chunk_to_sort() {}
chunk_to_sort(const chunk_to_sort &other)
{
data = other.data;
}
};
template<typename T>
class sorter
{
private:
threadsafe_stack<chunk_to_sort<T>> chunks;
std::vector<std::thread> threads;
unsigned const max_thread_count;
std::atomic<bool> end_of_data;
public:
sorter() :
max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false)
{}
~sorter()
{
end_of_data = true;
for (unsigned i = 0; i < threads.size(); ++i)
{
threads[i].join();
}
}
void try_sort_chunk()
{
if (chunks.empty())
return;
std::shared_ptr<chunk_to_sort<T> > chunk = chunks.pop();
if (chunk)
{
sort_chunk(chunk);
}
}
std::list<T> do_sort(std::list<T>& chunk_data)
{
if (chunk_data.empty())
{
return chunk_data;
}
std::list<T> result;
// 将chunk_data中第一个元素取出放入result
result.splice(result.begin(), chunk_data, chunk_data.begin());
// 将该元素值作为base值
T const& partition_val = *result.begin();
// 将chunk_data中剩余元素根据base值分区
typename std::list<T>::iterator divide_point =
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val) {return val < partition_val; });
// 小于base值的部分保存至new_lower_chunk,splice之后的chunk_data即为大于base值的部分
chunk_to_sort<T> new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(),
chunk_data, chunk_data.begin(),
divide_point);
// 小于base部分的list压入全局类型栈
std::future<std::list<T> > new_lower =
new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk));
// 线程数组添加线程 ??如果超过最大线程数会怎样??
if (threads.size() < max_thread_count)
{
threads.push_back(std::thread(&sorter<T>::sort_thread, this));
}
// 大于base值的部分递归调用自身,继续进行分区排序
std::list<T> new_higher(do_sort(chunk_data));
// 将排好序的大于base部分的list拼接到result
result.splice(result.end(), new_higher);
// 小于base部分的list如果未完成则调用try_sort_chunk
while (new_lower.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready)
{
try_sort_chunk();
}
// 将排好序的小于base部分list拼接到result,形成完整排好序的list
result.splice(result.begin(), new_lower.get());
// 返回结果
return result;
}
void sort_chunk(std::shared_ptr<chunk_to_sort<T> > const& chunk)
{
chunk->promise.set_value(do_sort(chunk->data));
}
void sort_thread()
{
while (!end_of_data)
{
try_sort_chunk();
std::this_thread::yield();
}
}
};
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input) //代表了sorter类的大部分功能
{
if (input.empty())
{
return input;
}
sorter<T> s;
return s.do_sort(input);
}
/** 测试 **/
int main(int argc, char const *argv[])
{
auto print = [](const int& n) {std::cout << " " << n; };
std::list<int> table = {52, 63, 8, -3, 0, 999, 66, 128, -60};
std::for_each(table.begin(), table.end(), print);
std::cout << std::endl;
std::list<int> res = parallel_quick_sort(table);
std::for_each(res.begin(), res.end(), print);
std::cout << std::endl;
return 0;
}
// 测试部分出现了问题如下:
/**
* 1.不修改chunk_to_sort类时(原文用的struct)编译时会发生报错,
* 报错内容为调用可不存在的赋值构造函数;
* 2.修改后出现抛出空栈异常的,程序出现段错误。。。本人功力不够,多线程gdb调试不明白;
* 3.初步判断可能原因有以下几点:
* -因为在线程池中有可能存在无限制的递归调用
* -如果 chunks 为空,则该函数将不断轮询 chunks 直到其非空,这可能会导致 CPU 占用过高。
*/
最近刚从新捡起来c++,需要熟悉一段时间,而且多线程这里本身也是刚学的所以不太熟悉,这个问题先放这里,回头再复习的时候再研究。这段大概的思想是先使用原来的递归方法,不过在划分区域时不断将小于目标值的部分压入线程安全栈中,大于目标值的部分递归调用,然后对栈里的片段也快排(这里看的我有点晕了,猪脑过载了),不过我看代码大概的原理是“优先”小的排序不断将返回的小的值插到结果list的最前边,最后形成一个递增序列,但是不知道代码哪里有问题。。。
8.2 影响并发代码性能的因素
8.2.1 处理器数量
处理器数量和结构是多线程程序的首要和关键的因素。可能会出现过度订阅的问题,为了线程数量能跟随硬件数量扩展,C++11提供了std::thread::hardware_concurrency() 方法查看当前硬件的线程支持情况,当然这个函数并没有考虑其他运行的程序所以在使用是仍然需要考虑具体情况。