目录
一. 线程说明
1. 概念
线程是进程中的一个实体,是被系统独立分配和调度的基本单位。也有说,线程是CPU可执行调度的最小单位。
2. 线程与进程
线程是进程的可执行单元,是计算机分配CPU机时的基本单元。一个进程可以包含一个或多个线程,进程是通过线程去执行代码的。同一个进程的多个线程共享该进程的资源和操作系统分配给该进程的内存空间。每个进程必须有一个主线程,主线程退出之后该进程也就退出了。一个进程的主线程是由系统创建的。
3. 执行
在单CPU中,表面上看好像是多个进程中的多个线程共同执行,实际上是操作系统根据调度规则、依次的 将一个一个的线程可执行代码加载进CPU中执行的;即,CPU在同一时刻只能执行一段代码,由于CPU的频率非常快,迅速的在各个线程间进行切换,所以给人的感觉就好像是多个线程共同执行。
不过在多核CPU的电脑中,确实是多个线程共同执行……
所以,通过多线程,一个进程的应用程序可以充分利用CPU资源,但什么事情都是物极必反,如果线程开的太多,系统就会增加额外的开销去进行线程的调度,反而降低了CPU的使用效率。
二. windows下的各种线程
1. win32线程
(1) CreateThread
具体的函数使用可以配合msdn,这里不作函数说明,后面的都是如此。下面只说下线程回调函数:只能是全局或静态成员函数。来看看具体实现
void CthreadTestDlg::DoThread(void* pvPrm)
{
CthreadTestDlg* pobjThreadTest = static_cast<CthreadTestDlg*>(pvPrm);
pobjThreadTest->RunThread();
}
void CthreadTestDlg::RunThread()
{
int iNum = 0;
while (100 > iNum)
{
printf("thread num: %d \n", iNum++);
}
}
void CthreadTestDlg::OnBnClickedBtnWin32()
{
if (nullptr == m_hThread)
m_hThread = CreateThread(nullptr, 0, (LPTHREAD_START_ROUTINE)DoThread, (LPVOID)this, 0, 0);
}
... ...
//清理
if (nullptr != m_hThread) CloseHandle(m_hThread);(2) _beginthreadex
还是要说下线程回调函数:全局或静态成员函数,且需要添加_stdcall限制。下面看看实现:
unsigned _stdcall CthreadTestDlg::DoThread2(void* pvPrm)
{
CthreadTestDlg* pobjThreadTest = static_cast<CthreadTestDlg*>(pvPrm);
pobjThreadTest->RunThread();
return 0;
}
void CthreadTestDlg::RunThread()
{
int iNum = 0;
while (100 > iNum)
{
printf("thread num: %d \n", iNum++);
}
}
void CthreadTestDlg::OnBnClickedBtnWin322()
{
if (nullptr == m_hThread2)
m_hThread2 = (HANDLE)_beginthreadex(nullptr, 0, DoThread2, this, 0, 0);
}
... ...
//清理
if (nullptr != m_hThread2) CloseHandle(m_hThread2);(3) 如何选择使用哪个线程函数
C运行库中有许多全局变量,如错误码等。多个线程同时操作这些全局变量,就会出错(没同步)。为了解决未同步问题,微软想出了个办法,就是给每个线程划分了自己存全局变量的空间,TLS(thread local storage)。这样,各用各的,就不会出错了。奇怪的是,CreateThread压根没TLS的概念,就不会创建这个内存。当在CreateThread创建的线程中要读写那些全局变量时,发现没有TLS,就会创建一个给自己用。可恨的是,CloseHandle中不会释放TLS。这样,内存就泄漏了。所以,为了保险起见,创建线程使用C运行库中的_beginthreadex。
2. mfc线程
(1) 工作线程:AfxBeginThread
本质上是对CreateThread进行封装,且AfxBeginThread可设置线程优先级,同时对比win32线程,不需要CloseHandle。线程回调函数:全局或静态成员变量。下面看看实现:
UINT CthreadTestDlg::DoThreadAfx(void* pvPrm)
{
CthreadTestDlg* pobjThreadTest = static_cast<CthreadTestDlg*>(pvPrm);
pobjThreadTest->RunThread();
return 0;
}
void CthreadTestDlg::RunThread()
{
int iNum = 0;
while (100 > iNum)
{
printf("thread num: %d \n", iNum++);
}
}
//AfxBeginThread不需要CloseHandle
void CthreadTestDlg::OnBnClickedBtnMfc()
{
if (nullptr == m_pThreadAfx)
m_pThreadAfx = AfxBeginThread(DoThreadAfx, this, THREAD_PRIORITY_NORMAL);//可以设置线程优先级
}(2) 界面线程
① 从 CWinThread 类派生自己的子类:CUIThreadApp;
② 重载 InitInstance(必须重载) 与 ExitInstance(可选重载函数),系统帮我们重载了,这一步可以省略;
③ 在 InitInstance 函数中进行界面的创建,且要return FALSE, 否则线程退出不了;
④ 开启界面线程:AfxBeginThread(RUNTIME_CLASS(CUIThread))。
// 继承了CWindThread派生类CUIThread
... ...
#define DLG_MODAL (1)
BOOL CUIThread::InitInstance()
{
#if DLG_MODAL //模态对话框的创建
CDlgTest objDlgTest;
objDlgTest.DoModal();
return FALSE;
#elif //非模态对话框的创建
m_pobjDlgTest = new CDlgTest;
m_pobjDlgTest->Create(IDD_DLG_THREAD, nullptr);
m_pobjDlgTest->ShowWindow(SW_SHOW);
m_pobjDlgTest->RunModalLoop();
return FALSE;
//return TRUE; //这里return true,则线程不会退出
#endif
return TRUE;
}
int CUIThread::ExitInstance()
{
#if DLG_MODAL
#elif
if (m_pobjDlgTest)
{
m_pobjDlgTest->DestroyWindow();
delete m_pobjDlgTest;
}
#endif
return CWinThread::ExitInstance();
}
... ...
AfxBeginThread(RUNTIME_CLASS(CUIThread));
(3) 使用说明
虽然mfc定义了界面线程,但实际项目中还是不建议在线程里操作界面,原因:
① 同一进程中的多线程是资源共享的,在多线程中操作界面会发生意想不到的结果,特别是像窗口的销毁等。
② 我们在设计代码时,是希望界面层与业务层是弱耦合关系,以实现单层模块的"高内聚",所以如果线程需要与界面层通信,可以使用PostMessage进行关联。
3. C++11 线程
(1) std:: thread
① 普通线程:还是说下线程回调函数,可以是全局或静态函数,也可以类成员函数,lambda表达式, 且函数类型不固定。
void CthreadTestDlg::OnBnClickedBtnC11()
{
//sth::thread的线程函数可以是全局或静态函数,也可以类成员函数,lambda表达式, 且函数类型不固定
//std::thread objThread(DoThread, this);//方式1:this是函数参数
//std::thread objThread(&CthreadTestDlg::RunThread, this);//方式2
std::thread objThread([]()//方式3 lambda表达式
{
printf("c++11 thread\n");
});
objThread.join();//这里是阻塞等待
}② 分离线程:一旦分离,则线程的回收不再由用户控制,而是转交给系统来回收。也就是说,不需要再使用.join();下面的例子程序结束也不会造成内存泄漏。
void DoThreadDetach()
{
while (1)
{
printf("Detach thread\n");
}
}
void CthreadTestDlg::OnBnClickedBtnC11Detach()
{
//分离线程:不用再join, 由系统结束和回收线程资源
std::thread objThread(DoThreadDetach);
objThread.detach();//
}(2) std::async
std::async 更高层次的线程创建,不关心线程具体的构建过程,且可以获取线程函数的执行结果。实际上,std::async 实际是对std::future,std::promise和std::packaged_task(可以在MSDN上查看相关内容)作了进一步封装,让他们结合起来工作。
int DoThreadAsync()
{
int iNum = 0;
int iSum = 0;
while (100 > iNum)
{
iSum += iNum++;
}
return iSum;
}
void CthreadTestDlg::OnBnClickedBtnAsync()
{
//std::async 更高层次的线程创建,不关心线程具体的构建过程,且可以获取线程函数的执行结果
//std::async 实际是对std::future,std::promise和std::packaged_task作了进一步封装,让他们结合起来工作
auto atThread = std::async(std::launch::async, DoThreadAsync);
printf("Sum: %d\n", atThread.get());// 这里的get会等待线程结束
}4. 总结
如果没有特别的要求,建议使用std::async,一方面构建简单,线程回调函数没有限制,另外还可以获取线程函数的执行结果,在多线程中算法和图像处理使用特别合适。
三. 线程优先级
1. 概念
如果学过linux的POSIX线程,应该知道系统cpu对线程的调度策略有三种,其中一种是“时间片轮转”,也是windows默认的调度方式,上面的概念里讲到过线程执行过程,这里就不概述了。所以在这种调度策略下,高优先级的线程在单位时间内被执行的次数比低优先级的多,注意这里低优先级不是不会执行。
如果跟你预想的不一样,那就再延展一下,做过底层开发,eg,stm32, arm(无系统),则程序执行一般采用“抢占式”的方式,eg,中断优先级低的响应函数A正在执行,此时产生一个高优先级的中断,则程序立马跳转到高优先级的中断响应函数B里去执行,执行结束后,再返回到A原来的位置继续执行。
2. 实现
设置优先级的关键函数:SetThreadPriority。其中AfxBeginThread对其进行了封装,可以直接通过参数设置线程优先级,这里就不详述了,下面看看C++11的线程优先级,使用两个线程在同一时间同时往两个文件里写入数据,查看文件大小。
void CthreadTestDlg::DoThreadPrioritySum1()
{
CFile objFile;
if (objFile.Open(_T("e:\\a.txt"), CFile::modeCreate | CFile::modeWrite))
{
while (1)
{
objFile.Write(_T("1111111\n"), _tcslen(_T("1111111\n")));
if (m_bExitThread) break;
}
objFile.Close();
}
}
void CthreadTestDlg::DoThreadPrioritySum2()
{
CFile objFile;
if (objFile.Open(_T("e:\\b.txt"), CFile::modeCreate | CFile::modeWrite))
{
while (1)
{
objFile.Write(_T("2222222\n"), _tcslen(_T("2222222\n")));
if (m_bExitThread) break;
}
objFile.Close();
}
}
void CthreadTestDlg::OnBnClickedBtnPriority()
{
std::thread objThread1(&CthreadTestDlg::DoThreadPrioritySum1, this);
std::thread objThread2(&CthreadTestDlg::DoThreadPrioritySum2, this);
SetThreadPriority(objThread1.native_handle(), THREAD_PRIORITY_NORMAL);
SetThreadPriority(objThread2.native_handle(), THREAD_PRIORITY_HIGHEST);
std::this_thread::sleep_for(std::chrono::seconds(3));
m_bExitThread = TRUE;
objThread1.join();
objThread2.join();
}
四. 线程通信
1. 概念
一般而言,应用程序中的一个某一个线程总是为另一个线程执行特定的任务,这样,这两个线程之间必定有一个信息传递的渠道。
线程通信一般采用:全局变量(需要加锁)和自定义消息。
2. 自定义消息
#define WM_THREAD_MSG (WM_USER + 100)
... ...
DoThread1: PostThreadMessage(iThreadID, WM_THREAD_MSG, 10, 0);
... ...
DoThread2: while (GetMessage(&stMsg, 0, 0, 0)) {if (WM_THREAD_MSG == stMsg.message)}3. C++11通信
c++11新新特性中提出了std::promise和std::future,它们可以搭配使用来达到单次数据交换的目的,这里就不作说明了。由于std::future或std::async里.get能获取到线程执行结果,且线程回调函数自由无拘束,所以线程间通信变得异常简单。下面看看闭包的例子:
void CthreadTestDlg::OnBnClickedBtnCom()
{
//闭包的线程通信
int iNum = 0;
auto atThread1 = std::async(std::launch::async, [&]()
{
while (100 > iNum++)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
auto atThread2 = std::async(std::launch::async, [&]()
{
while (1)
{
if (50 == iNum)
{
printf("iNum: %d\n", iNum);
break;
}
}
});
atThread1.wait();
atThread2.wait();
}五. 线程同步
1. 概念
线程同步是指同一进程中的多个线程互相协调工作从而达到一致性。之所以需要线程同步,是因为多个线程同时对一个数据对象进行修改操作时,可能会对数据造成破坏。下面先看看线程不同步的例子,我们预期是希望m_iSum一直是1,事实如此嘛?
void CthreadTestDlg::DoThreadSync()
{
for (int iNum = 0; iNum < 100; iNum++)
{
//std::unique_lock<std::mutex> lock(m_mutex);
m_iSum += 1;
printf("%d ", m_iSum);
m_iSum -= 1;
}
}
void CthreadTestDlg::OnBnClickedBtnMutex()
{
for (int iNum = 0; iNum < 30; iNum++)
{
std::async(std::launch::async, &CthreadTestDlg::DoThreadSync, this);
}
} 
2. 临界区
注意"初始化临界区"和"销毁临界区",同时,线程执行时间不宜太长。
InitializeCriticalSection(&m_cs);//初始化临界区 可以放到构造函数
DeleteCriticalSection(&m_cs);//删除临界区 可以放到析构函数
... ...
void CthreadTestDlg::DoThreadCritical()
{
for (int iNum = 0; iNum < 100; iNum++)
{
EnterCriticalSection(&m_cs);//进入临界区
m_iSum += 1;
printf("%d ", m_iSum);
m_iSum -= 1;
LeaveCriticalSection(&m_cs);//离开临界区
}
}
void CthreadTestDlg::OnBnClickedBtnCritical()
{
for (int iNum = 0; iNum < 30; iNum++)
{
std::async(std::launch::async, &CthreadTestDlg::DoThreadCritical, this);
}
}
3. 事件
注意SetEvent(): 设置事件信号, ResetEvent(): 清空事件信号, waitforsigleobject(): 等待单个事件, WaitForMulitpleObjects(): 等待多个事件,也就这些要说明一下的,下面看例子, 结果是1秒后打印消息,跟我们的预期一样
m_hEvent = CreateEvent(nullptr, FALSE, FALSE, nullptr); //可以放到构造里
... ...
void CthreadTestDlg::DoThreadEvent1()
{
if (WAIT_OBJECT_0 == WaitForSingleObject(m_hEvent, INFINITE))//INFINITE 阻塞等待
{
ResetEvent(m_hEvent);//清空事件
printf("thread event\n");
}
}
void CthreadTestDlg::DoThreadEvent2()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
SetEvent(m_hEvent);
}
void CthreadTestDlg::OnBnClickedBtnEvent()
{
std::async(std::launch::async, &CthreadTestDlg::DoThreadEvent1, this);
std::async(std::launch::async, &CthreadTestDlg::DoThreadEvent2, this);
}
4. 互斥量
(1) 互斥锁
mutex, recursive_mutex, time_mutex, resursive_time_mutex四种锁,详细信息可以MSDN或百度,一般操作:.lock(), trylock(), unlock();
(2) lock_guard
与 Mutex RAII 相关,方便线程对互斥量上锁;
(3) std::unique_lock
与 Mutex RAII 相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。
根据上面的说明,显然,我们要使用std::uniqu_lock进行说明,简单好用。
void CthreadTestDlg::DoThreadSync()
{
for (int iNum = 0; iNum < 100; iNum++)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_iSum += 1;
printf("%d ", m_iSum);
m_iSum -= 1;
}
}
void CthreadTestDlg::OnBnClickedBtnMutex()
{
//std::vector<std::thread> m_vecThreads;
//for (int iNum = 0; iNum < 30; iNum++)
//{
// m_vecThreads.push_back(std::thread(&CthreadTestDlg::DoThreadSync, this));
//}
//for (auto &atThread : m_vecThreads)
//{
// atThread.join();
//}
for (int iNum = 0; iNum < 30; iNum++)
{
std::async(std::launch::async, &CthreadTestDlg::DoThreadSync, this);
}
}
5. 信号量
(1) 说明
std::condition_variable 只接受std::unique_lock的参数,开销小,std::condition_variable_any可以接受任何lockable参数,但开销更大,所以,一般还是建议使用std::conditon_variable。std::condition_variable 使用wait来阻塞线程,notify_xx来唤醒线程。
(2) wait
① wait :无条件阻塞
std::unique_lock<std::mutex> lock(m_mutex);
m_Sem.wait(lock);② wait :条件阻塞
std::unique_lock<std::mutex> lock(m_mutex);
m_Sem.wait(lock, [this](){ return m_bSem; });③ wait_for:超时等待(时间段)
④ wait_until:指定等待到某个时间
(3) notify_xx
notify_once:唤醒某个线程 notiry_all: 唤醒所有线程
(4) 实现
void CthreadTestDlg::DoThreadSem1()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_Sem.wait(lock, [this](){ return m_bSem; });
printf("thread sem \n");
}
void CthreadTestDlg::DoThreadSem2()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
m_Sem.notify_one();
m_bSem = TRUE;
}
void CthreadTestDlg::OnBnClickedBtnSem()
{
std::async(std::launch::async, &CthreadTestDlg::DoThreadSem1, this);
std::async(std::launch::async, &CthreadTestDlg::DoThreadSem2, this);
}
六. 线程池(C++11)
1. 概念
为什么要引入线程池,前面有提到过,当线程开启过多时,系统就会增加额外的开销去进行线程的调度,反而降低了CPU的使用效率。
线程池可以高效的处理任务,线程池中开启多个线程,等待同步队列中的任务到来,任务到来多个线程会抢着执行任务,当到来的任务太多,达到上限时需要等待片刻,任务上限保证内存不会溢出。线程池的效率和cpu核数相关,多核的话效率更高,线程数一般取cpu数量+2比较合适,否则线程过多,线程切换频繁反而会导致效率降低。
2. 同步任务队列
先说说线程池的基本设计思路,一个任务队列,一个线程池(预先创建好线程等待),一个调度管理,当任务队列里有任务时,则从任务队列里取出任务,同时从线程池里唤醒一个线程用来执行任务,执行结束后继续回到线程池里等待。
队列的设计,一方面必须设定队列元素的最大值,否则会增加内存压力,同时,增加一次性取出任务队列,减少锁的访问以提高性能。
#pragma once
#include <condition_variable>
#include <list>
#include <mutex>
#include <iostream>
template<typename T>
class CSyncQueue
{
private:
std::list<T> m_listQueue; //任务队列
std::mutex m_Mutex;
std::condition_variable m_semNotEmpty;//队列是否为空
std::condition_variable m_semNotFull; //队列是否满了
UINT m_uiMaxSize; //队列最大任务数
bool m_bNeedStop; //是否停止任务
private:
bool NotFull() const
{
bool bFull = m_listQueue.size() >= m_uiMaxSize;
if (bFull)
std::cout << "缓冲区满了,需要等待..." << std::endl;
return !bFull;
}
bool NotEmpty() const
{
bool bEmpty = m_listQueue.empty();
if (bEmpty)
std::cout << "缓冲区空了, 需要等待...,异步层的线程ID:" << std::this_thread::get_id() << std::endl;
return !bEmpty;
}
template<typename F>
void Add(F &&task)
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_semNotFull.wait(lock, [this]()
{
return (m_bNeedStop || NotFull());
});
if (m_bNeedStop) return;
m_listQueue.push_back(std::forward<F>(task));
m_semNotEmpty.notify_one();//通知一个线程去处理任务
}
public:
CSyncQueue(UINT uiMaxSize) : m_uiMaxSize(uiMaxSize), m_bNeedStop(false)
{}
//添加任务
void Put(const T& task)
{
Add(task);
}
void Put(T&& task)
{
Add(std::forward<T>(task));
}
//取出任务
void Take(std::list<T>& list)
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_semNotEmpty.wait(lock, [this]()
{
return m_bNeedStop || NotEmpty();
});
if (m_bNeedStop) return;
list = std::move(m_listQueue);
m_semNotFull.notify_one();//通知同步队列任务取出了,可以继续添加任务了
}
void Take(T& task)
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_semNotEmpty.wait(lock, [this]()
{
return m_bNeedStop || NotEmpty();
});
if (m_bNeedStop) return;
task = m_listQueue.front();
m_listQueue.pop_front();
m_semNotFull.notify_one();
}
void Stop()
{
{
std::lock_guard<std::mutex> lock(m_Mutex);
m_bNeedStop = true;
}
//这里不需要加锁
m_semNotFull.notify_all();
m_semNotEmpty.notify_all();
}
bool Empty()
{
std::lock_guard<std::mutex> lock(m_Mutex);
return m_listQueue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> lock(m_Mutex);
return (m_listQueue.size() == m_iMaxSize);
}
int Count()
{
return m_listQueue.size();
}
};
3. 线程池
用来管理任务队列与线程池,添加任务执行接口, 这里的任务接口函数可以根据具体的情况进行设置,同时支持右值,减少参数副本的内存开销,同时,特别情况下可以结束整个线程池。
#pragma once
#include <list>
#include <memory>
#include <thread>
#include "SyncQueue.hpp"
#include <functional>
#include <atomic>
class CThreadPools
{
public:
using Task = std::function<void()>;
CThreadPools(UINT uiThreadNum = std::thread::hardware_concurrency());
~CThreadPools();
void AddTask(const Task& task);
void AddTask(Task&& task);//右值
void Stop();
private:
void Start(UINT uiThreadNum);
void RunInThread();
void StopThreadList();
private:
std::list<std::shared_ptr<std::thread>> m_listThreads;
CSyncQueue<Task> m_objSyncQueue;
std::atomic_bool m_bRunning;
std::once_flag m_flag;
};
#include "stdafx.h"
#include "ThreadPools.h"
const int g_iMaxTaskCount = 100;//最大任务数
CThreadPools::CThreadPools(UINT uiThreadNum) : m_objSyncQueue(g_iMaxTaskCount)
{
Start(uiThreadNum);
}
CThreadPools::~CThreadPools()
{
//如果用户没有停止线程则由系统处理
Stop();
}
void CThreadPools::Start(UINT uiThreadNum)
{
m_bRunning = true;
//创建线程
for (UINT uiNum = 0; uiNum < uiThreadNum; uiNum++)
{
m_listThreads.push_back(std::make_shared<std::thread>(&CThreadPools::RunInThread, this));
}
}
void CThreadPools::RunInThread()
{
while (m_bRunning)
{
//取出任务,分别执行任务
std::list<Task> listTasks;
m_objSyncQueue.Take(listTasks);//从任务队列取出任务
for (auto &atTask : listTasks)
{
if (!m_bRunning) return;
atTask();
}
}
}
void CThreadPools::StopThreadList()
{
m_objSyncQueue.Stop();
m_bRunning = false;//线程停止标志
//等待所有线程结束
for (auto atThread : m_listThreads)
{
if (atThread) atThread->join();
}
//清理资源
m_listThreads.clear();
}
void CThreadPools::AddTask(const Task& task)
{
m_objSyncQueue.Put(task);
}
void CThreadPools::AddTask(Task&& task)
{
m_objSyncQueue.Put(std::forward<Task>(task));
}
void CThreadPools::Stop()
{
//保证多线程的情况下只调用一次StopThreadList
std::call_once(m_flag, [this]()
{
StopThreadList();
});
}4. 实现
线程池实现:https://www.cnblogs.com/qicosmos/archive/2013/05/30/3108010.html写得很好,实际项目中有用到,看看执行结果:
void CthreadTestDlg::OnBnClickedBtnThreadpool()
{
CThreadPools objThreaadPools(3);
for (int iNum = 0; iNum < 2; iNum++)
{
std::async(std::launch::async, [&objThreaadPools, iNum]()
{
for (int iCount = 0; iCount < 100; iCount++)
{
auto atThreadId = std::this_thread::get_id();
objThreaadPools.AddTask([=]()
{
//printf("同步线程%d的线程ID: %d\n", iNum, atThreadId);
std::cout << "同步线程" <<iNum << "的ID:" << atThreadId << std::endl;
});
}
});
}
std::this_thread::sleep_for(std::chrono::seconds(3));//等待线程池执行结束
objThreaadPools.Stop();
}
测试代码地址:https://download.csdn.net/download/zhoumin4576/20660216?spm=1001.2014.3001.5501