生产者-消费者问题(C++信号量实现)

生产者-消费者问题是很经典的线程同步问题,这段代码给出的是多生产者、多消费者、有界缓冲区的一个C++实现。
常见的生产者消费者实现一般是Unix或Linux下的使用Pthread完成的,这里使用WINDOWS环境下的API实现。

#include<iostream>
#include<queue>
#include<Windows.h>
#include<unordered_map>

using namespace std;
static const int QUEUE_SIZE = 10;
static HANDLE semEmpty = NULL;
static HANDLE semFull = NULL;

static const int PRODUCER_SIZE = 3;
static const int CONSUMER_SIZE = 10;
static HANDLE event = NULL;
static CRITICAL_SECTION queueMutex;//临界区锁,用于维护临界区,保护bufferQueue
static queue<int> bufferQueue;
static int goodsNum = 0;//当前生产商品的序号
unordered_map<DWORD,int> id;//hashmap用来记录线程id与进程序号的映射

DWORD WINAPI ProducerThread()
{
	while (true)
	{
		if (WAIT_TIMEOUT != WaitForSingleObject(event, 0))
			break; 

		WaitForSingleObject(semEmpty, INFINITE);
		EnterCriticalSection(&queueMutex);
		bufferQueue.push(goodsNum++);
		cout << "生产者"<< id[GetCurrentThreadId()] <<"正在生产" << bufferQueue.back()<<"号商品"<< endl;
		LeaveCriticalSection(&queueMutex);
		ReleaseSemaphore(semFull, 1, NULL);
	}
	return 0;
}

DWORD WINAPI ConsumerThread()
{
	while (true)
	{
		if (WAIT_TIMEOUT != WaitForSingleObject(event, 0))
			break; 

		WaitForSingleObject(semFull, INFINITE);
		EnterCriticalSection(&queueMutex);
		cout << "\t\t\t消费者" << id[GetCurrentThreadId()] << "正在消费" << bufferQueue.front() << "号商品" << endl;
		bufferQueue.pop();
		LeaveCriticalSection(&queueMutex);
		ReleaseSemaphore(semEmpty, 1, NULL);
	}
	return 0;
}

int main(void)
{
	HANDLE ThreadArray[CONSUMER_SIZE+PRODUCER_SIZE];
	InitializeCriticalSection(&queueMutex); // queue读写同步
	event = CreateEvent(NULL,true,false,NULL); // 手动控制、初始为非激发状态
	
	semEmpty = CreateSemaphore(NULL, QUEUE_SIZE, QUEUE_SIZE, NULL); 
	semFull = CreateSemaphore(NULL, 0, QUEUE_SIZE, NULL);
	
	for (int i = 0; i < PRODUCER_SIZE; i++)
	{
		DWORD tid;
		ThreadArray[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProducerThread, NULL, 0, &tid);
		pair<DWORD, int> pa;
		pa.first = tid;
		pa.second = i+1;
		id.insert(pa);
	}
	for (int i = 0; i < CONSUMER_SIZE; i++)
	{
		DWORD tid;
		ThreadArray[PRODUCER_SIZE+i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ConsumerThread, NULL, 0, &tid);
		pair<DWORD, int> pa;
		pa.first = tid;
		pa.second = i+1;
		id.insert(pa);
	}
	Sleep(10000);
	SetEvent(event);//10秒后退出生产者、消费者线程
	WaitForMultipleObjects(PRODUCER_SIZE+CONSUMER_SIZE, ThreadArray, TRUE, INFINITE); 
	for (int i = 0; i < PRODUCER_SIZE+CONSUMER_SIZE; i++)
		CloseHandle(ThreadArray[i]);

	CloseHandle(semEmpty);
	CloseHandle(semFull);
	CloseHandle(event);

	DeleteCriticalSection(&queueMutex);
	system("pause");
	return 0;
}

版权声明:本文为lxx909546478原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。