Linux下生产者与消费者实现的问题详解

什么是生产者消费者问题?

背景

假设进程中有一个有限缓冲区和两个线程:生产者和消费者,它们分别不停地把产品放入缓冲区和从缓冲区中拿走产品,一个生产者在缓冲区满的时候必须等待,一个消费者在缓冲区空的时候也必须等待。另外,因为缓冲区是临界资源,所以生产者和消费者之间必须互斥执行。

问题的核心

1. 要保证不让生产者在缓存还是满的时候仍然要向缓冲区写入数据
2. 不让消费者从没有数据的缓冲区中读取数据。

由此可见,生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,他们也是同步关系。

解决思路

对于生产者,如果缓冲区满了话就让他停下来歇一会,但是注意不要让他睡着了。消费者从缓冲区取走数据后就让生产者起来干活,让他再次生产出产品。如果消费者发现缓冲区空了,已经没有东西可以消费了,就停下来自己休息一会,然后等看到生产者把东西生产出来了就重新站起来干活。But需要注意一点的是,如果没有处理好两者间的工作关系,那么就有可能会产生生产者和消费者都在休息,都在等着对方生产/消费产品,就会造成“死锁”的现象。

单生产者和单消费者

只有生产者和消费者两个线程,正好是这两个线程之间存在同步和互斥的关系,那么需要解决的是互斥和同步PV操作的位置。使用“线程间通信”,利用信号就可以解决唤醒的问题。

同步信号的使用

伪代码(逻辑)

pthread_mutex_t mutex = 1;//临界区的互斥信号量
sem_t full = 0;//缓冲区初始化为空
sem_t empty = N;//空闲缓冲区
/*生产者线程*/
void *producer()
{
	for(;;)
	{
		produce an product in nextput;//生产者生产数据
		P(empty);//获取空缓冲区单元
		P(mutex);//进入临界区
		add nextput to buffer;//将数据存放到缓冲区里面
		V(mutex);//离开临界区,释放互斥信号量
		V(full);//满缓冲区数加1
	}
}
void *consumer()
{
	for(;;)
	{
		P(full);//获取满缓冲区单元
		P(mutex);//进入临界区
		remove an product from buffer;//从缓冲区中取出数据
		V(mutex);//离开临界区,释放互斥信号
		V(empty);//空缓冲区数加1
		consume the product;//消费数据
	}
}
/*需要注意的是:要注意对缓冲区大小为N的处理,当缓冲区中有空时便可对empty变量执行P操作,
一旦取走一个产品便要执行V操作以释放空闲区。对empty和full变量的P操作必须放在对mutex的P操作之前。*/

解释上述伪代码

  1. 这里我使用了两个信号:full和empty信号。互斥信号量为mutex,它用于控制互斥访问缓冲区,互斥信号量的初始值为1;信号量full适用于记录当前缓冲区中满缓冲区的数量,初始值为0;信号量empty用于记录当前缓冲区中”空“缓冲区数,初值为N。新的数据添加到缓冲区后,full在增加,empty就减少。如果生产者试图在empty为0时减少它的值,那么生产者就会被告知要休息啦。下一轮中如果有数据被消费掉时,empty就会增加,生产者就会被叫起来要工作啦。
  2. 对于empty和full变量的P操作必须放在对mutex的P操作之前,是因为如果生产者进程已经将缓冲区放满,消费者线程并没有取产品,即empty=0,当下次仍然是生产者线程在工作时,他先执行P(mutex)封锁信号量,再执行P(empty)时将被阻塞,希望消费者取出产品后再将其唤醒。当轮到消费者线程工作时,他先执行P(mutex),然而由于生产者线程已经封锁住了mutex信号量,消费者线程也会工作不了,被阻塞住了,这样一来生产者和消费者线程都被阻塞了,都希望对方叫醒自己,于是化身望夫石陷入了无休止的等待中。
  3. 如果消费者线程已经将缓冲区取空,即full=0,下次如果还是消费者先运行,也会出现类似的死锁。不过需要注意的是:生产者释放信号量时,mutex、full先释放哪一个都没关系,消费者先释放mutex还是empty都可以。

多生产者和多消费者

与单生产者和单消费者的区别

与单生产者和单消费者不同的是,在多个生产者和多个消费者同时出现的情况下就会造成你挤我我挤你的情况,就会导致两个或多个线程同时向一个通道写入或读取数据。如果线程有多个生产者并发执行那么就会出现以下几种情况:

  1. 两个生产者为empty减值(sem_wait(&empty))。
  2. 一个生产者在判断缓存中是否有可用的通道。
  3. 第二个生产者与第一个生产者一样在判断缓存中是否有可用通道。
  4. 两个生产者同时向同一个通道写入数据。

理解多生产者和多消费者

多个生产者向一个缓冲区存入数据,多个消费者从缓冲区读取数据。这是有界缓冲区问题,队列改写,生产者与生产者之间、消费者与消费者之间、生产者与消费者之间互相互斥。共享缓冲区作为一个环形缓冲区,存数据到尾时再从头开始。

解决方法

  1. 我们可以使用一个互斥量来保护生产者向缓冲区中存入数据,但是由于有多个生产者,所以我们需要记住当前生产者向缓冲区存入的位置。
  2. 我们可以使用另外一个互斥量来保护缓冲区中消息的数目,这个生产的数据数目作为生产者和消费者之间通信的信号标。
  3. 我们还可以使用一个条件变量来唤醒消费者。同样,因为有多个消费者,所以消费者也需要记住每次提取数据的位置。

部分代码展示

//function:producer thread
void *producer(void *param)
{
#if MODE == SINGLEROLE
    static int t = 0;
    buffer_item item;
    while (TRUE)
    {

        // t = rand () % MAX_INTERVAL + 1;
        sleep(1); // sleep for a random period of time
        // item = rand() % MAX_NUM + 1; // produce an item
        item = str[i];
        if (insert_item(item))
            printf("Insert item %c failed\n", item);
        else
            printf("Producer produced %c\n", item);
        i++;
        t++;
        if (t == 31)
            pthread_exit(0);
    }
// exit(0);
#elif MODE == MANYROLE
    //多个生产者
    for (;;)
    {
        sleep(1);
        pthread_mutex_lock(&put.mutex);
        //检测是否已满
        if (put.nval >= NUMOFNAME)
        {
            pthread_mutex_unlock(&put.mutex);
            return NULL;
        }
        buffer[in] = str[put.nval];
        printf("Buffer[%d]: ", in);
        in = (in + 1) % BUFFER_SIZE;
        if (str[put.nval] == '2')
            idflag++;
        if (idflag >= 2)
        {
            idbuffer[idline] = str[put.nval];
            idline++;
        }
        printf("Producer produced %c\n", str[put.nval]);
        //判断是否数组下标溢出
        if (++put.nput >= BUFFER_SIZE)
        {
            put.nput = 0;
        }
        ++put.nval;
        pthread_mutex_unlock(&put.mutex);

        //当生产了数据后通知条件变量,应该使临界区尽量短,宁愿使用多个互斥量
        pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0)
        {
            pthread_cond_signal(&nready.cond);
        }
        ++nready.nready;
        pthread_mutex_unlock(&nready.mutex);
    }
    return NULL;
#endif
}
// function: consumer thread
void *consumer(void *param)
{
#if MODE == SINGLEROLE
    static int t = 0;
    buffer_item item;

    while (TRUE)
    {
        // t = rand () % MAX_INTERVAL + 1;
        sleep(1); // sleep for a random period of time
        // if (item == '\n')
        //     pthread_exit(0);
        //  break;
        if (remove_item(&item))
            printf("Remove item failed\n");
        else
            printf("Consumer consumed %c\n", item);
        t++;
        if (t == 31)
        {
            pthread_exit(0);
        }
    }
// exit(0);
#elif MODE == MANYROLE
    //多个消费者
    for (;;)
    {
        sleep(1);
        pthread_mutex_lock(&nready.mutex);
        // 确保线程唤醒
        while (nready.nready == 0)
        {
            pthread_cond_wait(&nready.cond, &nready.mutex);
        }
        if (++nready.nget >= NUMOFNAME)
        {
            if (nready.nget == NUMOFNAME)
            {
                printf("Buffer[%d]: Consumer consumed %c\n", nready.nget - 1, buffer[(nready.nget - 1) % BUFFER_SIZE]);
            }
            pthread_cond_signal(&nready.cond);
            pthread_mutex_unlock(&nready.mutex);
            return NULL;
        }
        --nready.nready;
        pthread_mutex_unlock(&nready.mutex);
        if (str[nready.nget] == 'j')
            nameflag++;
        if (nameflag)
        {
            Consume_Buffer[nameline] = str[nready.nget];
            nameline++;
            if (str[nready.nget] == 'n')
                nameflag++;
            if (nameflag == 3)
                nameflag = 0;
        }
        printf("Buffer[%d]: Consumer consumed %c\n", nready.nget - 1, buffer[(nready.nget - 1) % BUFFER_SIZE]);
    }
    return NULL;
#endif
}

多线程使用信号量

信号量的主要函数

int sem_init(sem_t *sem,int pshared,unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t sem);
  • sem_init():用于对指定信号初始化,pshared为0,表示信号在当前进程的多个线程之间共享,value表示初始化信号的值。
  • sem_wait():可以用来阻塞当前线程,直到信号量的值大于0,解除阻塞。解除阻塞后,sem的值-1,表示公共资源被执行减少了。例如:如果你对一个值为2的信号量调用sem_wait(),线程将会继续执行,信号量的值将-1。当初始化value=0后,使用sem_wait()会阻塞这个线程,这个线程函数就会等待其它线程函数调用sem_post()增加了了这个值使它不再是0,才开始执行,然后value值减1。
  • sem_post():用于增加信号量的值+1,当有线程阻塞在这个信号量上时,调用这个函数会使其中的一个线程不在阻塞,选择机制由线程的调度策略决定。

线程主要函数

int pthread_create(pthread_t *thread,const pthread_attr_t *attr,void *(*start_routine)(void*),void *arg);
void pthread_exit(void *retval);
int pthread_join(pthread_t thread,void **thread_result);
int pthread_cancel(pthread_t thread)

一般来说,我们可以利用pthread_create()接口创建一个线程,同时利用pthread_join()接口阻塞主进程的执行,直到合并的线程执行结束。如果线程运行正常那么pthread_join的返回值为0。线程执行期间,如果我们想主动退出线程的执行,那么我们可以用pthread_exit()接口函数进行退出。

补充:线程的合并与分离

线程的合并:pthread_join()

线程的合并是一种主动回收线程资源的方案,当一个进程或线程调用了针对其它线程的pthread_join()接口,就是线程合并了。这个接口会阻塞调用进程或线程,直到被合并的线程结束为止。当被合并线程结束,pthread_join()接口就会回收这个线程的资源,并将这个线程的返回值返回给合并者。

线程的分离:pthread_detach()

线程分离是将线程资源的回收工作交由系统自动来完成,也就是说当被分离的线程结束之后,系统会自动回收它的资源。因为线程分离是启动系统的自动回收机制,那么程序也就无法获得被分离线程的返回值,这就使得pthread_detach()接口只要拥有一个参数就行了,那就是被分离线程句柄。

互斥锁

int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutexattr_t *mutexattr);//互斥锁初始化
int pthread_mutex_lock(pthread_mutex_t *mutex);//互斥锁上锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);//互斥锁解锁

在同一时刻只能有一个线程持有某个互斥锁,拥有互斥锁的线程能够对共享资源进行操作。若线程对一个已经被上锁的互斥锁加锁,该线程就会睡眠直到其他线程释放掉互斥锁为止。可以说,这把互斥锁保证了让每个线程对共享资源按顺序进行原子操作。

线程属性

int pthread_attr_init(pthread_attr_t **attr);
int pthread_attr_destory(pthread_attr_t *attr);

线程属性对象由pthread_attr_init()接口初始化,由pthread_attr_destory()来销毁

线程的绑定属性

int pthread_attr_setscope(pthread_attr_t *attr, int scope);

它有两个参数,第一个就是线程属性对象的指针,第二个就是绑定类型,拥有两个取值:PTHREAD_SCOPE_SYSTEM(绑定的)PTHREAD_SCOPE_PROCESS(非绑定的)

设置线程绑定属性
#include <stdio.h>
#include <pthread.h>
……
int main( int argc, char *argv[] )
{
pthread_attr_t attr;
pthread_t th;
……
pthread_attr_init( &attr );
pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM );
pthread_create( &th, &attr, thread, NULL );
……
}

线程的分离属性

pthread_attr_setdetachstat(pthread_attr_t *attr, int detachstate); 

它的第二个参数有两个取值:PTHREAD_CREATE_DETACHED(分离的)PTHREAD_CREATE_JOINABLE(可合并的,也是默认属性)

设置线程分离属性
#include <stdio.h>
#include <pthread.h>
……
int main( int argc, char *argv[] )
{
pthread_attr_t attr;
pthread_t th;
……
pthread_attr_init( &attr );
pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM );
pthread_create( &th, &attr, thread, NULL );
……
}

调度属性

Linux提供的线程调度属性有三个:算法优先级继承权

//线程调度算法的接口
pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
/*它的第二个参数有三个取值:SCHED_RR(轮询)、SCHED_FIFO(先进先出)和SCHED_OTHER(其它)。*/

//设置线程优先级的接口
struct sched_param {  
    int sched_priority;  
}  
int pthread_attr_setschedparam(pthread_attr_t *attr, struct sched_param *param); 
//设置线程的继承权
int pthread_attr_setinheritsched(pthread_attr_t *attr, int inheritsched);
/*它的第二个参数有两个取值:PTHREAD_INHERIT_SCHED(拥有继承权)和PTHREAD_EXPLICIT_SCHED(放弃继承权)。
新线程在默认情况下是拥有继承权。*/

共享内存与消息队列的使用

常用的接口函数

shmget()创建共享内存
shmat():将创建的共享内存映射到具体的进程空间中。
shmdt()撤销映射。
在这里插入图片描述
msgget():创建或打开消息队列(消息队列的数量会受到系统消息队列数量的限制)
msgsnd():添加消息使用(添加到消息队列的末尾)
msgrcv():读取消息(把消息从消息队列取走)

消息队列和共享内存代码展示

/*
* set_shm 函数建立一个具有n 个字节的共享内存区
* 如果建立成功,返回一个指向该内存区首地址的指针shm_buf
* 输入参数:
* shm_key 共享内存的键值
* shm_val 共享内存字节的长度
* shm_flag 共享内存的存取权限
*/
/*实现过程:创建共享内存->映射到进程的指针并返回*/
char * set_shm(key_t shm_key,int shm_num,int shm_flg)
{
    int i,shm_id;
    char * shm_buf;
    //测试由shm_key 标识的共享内存区是否已经建立
    if((shm_id = get_ipc_id("/proc/sysvipc/shm",shm_key)) < 0 )
    {
    //shmget 新建一个长度为shm_num 字节的共享内存,其标号返回到shm_id
        if((shm_id = shmget(shm_key,shm_num,shm_flg)) <0)
        {
            perror("shareMemory set error");
            exit(EXIT_FAILURE);
        }
    //shmat 将由shm_id 标识的共享内存附加给指针shm_buf
        if((shm_buf = (char *)shmat(shm_id,0,0)) < (char *)0)
        {
            perror("get shareMemory error");
            exit(EXIT_FAILURE);
        }
        /*共享内存区初始化*/
        for(i=0; i<shm_num; i++)
        shm_buf[i] = 0; //初始为0
    }
    //shm_key 标识的共享内存区已经建立,将由shm_id 标识的共享内存附加给指针shm_buf
    if((shm_buf = (char *)shmat(shm_id,0,0)) < (char *)0)
    {
        perror("get shareMemory error");
        exit(EXIT_FAILURE);
    }
    return shm_buf;
}
/*
* set_msq 函数建立一个消息队列
* 如果建立成功,返回一个消息队列的标识符msq_id
* 输入参数:
* msq_key 消息队列的键值
* msq_flag 消息队列的存取权限
*/
/*实现过程:创建消息队列->返回ID*/
int set_msq(key_t msq_key,int msq_flg)
{
    int msq_id;
    //测试由msq_key 标识的消息队列是否已经建立
    if((msq_id = get_ipc_id("/proc/sysvipc/msg",msq_key)) < 0 )
    {
        //msgget 新建一个消息队列,其标号返回到msq_id
        if((msq_id = msgget(msq_key,msq_flg)) < 0)
        {
            perror("messageQueue set error");
            exit(EXIT_FAILURE);
        }
    }
    return msq_id;
}

版权声明:本文为dbqwcl原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。