生产者-消费者问题是很经典的线程同步问题,这段代码给出的是多生产者、多消费者、有界缓冲区的一个C++实现。
常见的生产者消费者实现一般是Unix或Linux下的使用Pthread完成的,这里使用WINDOWS环境下的API实现。
#include<iostream>
#include<queue>
#include<Windows.h>
#include<unordered_map>
using namespace std;
static const int QUEUE_SIZE = 10;
static HANDLE semEmpty = NULL;
static HANDLE semFull = NULL;
static const int PRODUCER_SIZE = 3;
static const int CONSUMER_SIZE = 10;
static HANDLE event = NULL;
static CRITICAL_SECTION queueMutex;//临界区锁,用于维护临界区,保护bufferQueue
static queue<int> bufferQueue;
static int goodsNum = 0;//当前生产商品的序号
unordered_map<DWORD,int> id;//hashmap用来记录线程id与进程序号的映射
DWORD WINAPI ProducerThread()
{
while (true)
{
if (WAIT_TIMEOUT != WaitForSingleObject(event, 0))
break;
WaitForSingleObject(semEmpty, INFINITE);
EnterCriticalSection(&queueMutex);
bufferQueue.push(goodsNum++);
cout << "生产者"<< id[GetCurrentThreadId()] <<"正在生产" << bufferQueue.back()<<"号商品"<< endl;
LeaveCriticalSection(&queueMutex);
ReleaseSemaphore(semFull, 1, NULL);
}
return 0;
}
DWORD WINAPI ConsumerThread()
{
while (true)
{
if (WAIT_TIMEOUT != WaitForSingleObject(event, 0))
break;
WaitForSingleObject(semFull, INFINITE);
EnterCriticalSection(&queueMutex);
cout << "\t\t\t消费者" << id[GetCurrentThreadId()] << "正在消费" << bufferQueue.front() << "号商品" << endl;
bufferQueue.pop();
LeaveCriticalSection(&queueMutex);
ReleaseSemaphore(semEmpty, 1, NULL);
}
return 0;
}
int main(void)
{
HANDLE ThreadArray[CONSUMER_SIZE+PRODUCER_SIZE];
InitializeCriticalSection(&queueMutex); // queue读写同步
event = CreateEvent(NULL,true,false,NULL); // 手动控制、初始为非激发状态
semEmpty = CreateSemaphore(NULL, QUEUE_SIZE, QUEUE_SIZE, NULL);
semFull = CreateSemaphore(NULL, 0, QUEUE_SIZE, NULL);
for (int i = 0; i < PRODUCER_SIZE; i++)
{
DWORD tid;
ThreadArray[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProducerThread, NULL, 0, &tid);
pair<DWORD, int> pa;
pa.first = tid;
pa.second = i+1;
id.insert(pa);
}
for (int i = 0; i < CONSUMER_SIZE; i++)
{
DWORD tid;
ThreadArray[PRODUCER_SIZE+i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ConsumerThread, NULL, 0, &tid);
pair<DWORD, int> pa;
pa.first = tid;
pa.second = i+1;
id.insert(pa);
}
Sleep(10000);
SetEvent(event);//10秒后退出生产者、消费者线程
WaitForMultipleObjects(PRODUCER_SIZE+CONSUMER_SIZE, ThreadArray, TRUE, INFINITE);
for (int i = 0; i < PRODUCER_SIZE+CONSUMER_SIZE; i++)
CloseHandle(ThreadArray[i]);
CloseHandle(semEmpty);
CloseHandle(semFull);
CloseHandle(event);
DeleteCriticalSection(&queueMutex);
system("pause");
return 0;
}
版权声明:本文为lxx909546478原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。