进程间通信之同步

关于UNIX环境高级编程和UNIX网络编程的学习记录和总结

前言

在线程或进程间共享数据需要进行同步,互斥锁和条件变量是同步的基本组成部分。

首先介绍一下锁:

互斥锁

互斥锁就是相互排斥,有你没我,有我没你。互斥锁用来保护临界区,保证在任何时刻只能有一个进程或线程访问临界区资源。访问前加锁,访问后解锁。

锁的使用

由于共享、竞争而没有加任何同步机制,导致产生于时间有关的错误,会造成数据混乱
使用mutex(互斥量、互斥锁)一般步骤:
​ pthread_mutex_t mutex。 两种取值:0或1
​1. pthread_mutex_t lock(pthread_mutex *mptr); 创建锁
​ 2 pthread_mutex_init; 初始化 1
3. pthread_mutex_trylock(pthread_mutex *mptr); 尝试加锁
​ 4. pthread_mutex_lock;加锁 1-- --> 0
​ 5. 访问共享数据(stdout)
​ 6. pthrad_mutext_unlock();解锁 0++ --> 1
​ 7. pthead_mutex_destroy;销毁锁

如果尝试给一个已经被上锁的锁上锁,那么pthread_mutex_lock将阻塞到该互斥锁解锁为止。

pthread_mutex_lock()是非阻塞函数,若互斥锁已被锁住,返回EBUSY错误

当互斥锁解锁时,可设置优先级调度,同步函数将唤醒优先级最高的被阻塞进程/线程。

int pthread_mutex_init(pthread_mutex_t *restrict mutex, 
const pthread_mutexattr_t *restrict attr)

restrict关键字:
​ 用来限定指针变量。被该关键字限定的指针变量所指向的内存操作,必须由本指针完成。

初始化互斥量:
​ pthread_mutex_t mutex;
​ 1. pthread_mutex_init(&mutex, NULL); 动态初始化。
​ 2. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 静态初始化。

读写锁使用方法相同(lock–>rwlock)

示例:利用多个线程对同意全局数据读写(线程共享进程内资源,方便易用)

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
int counter;
pthread_rwlock_t rwlock;
/* 3 个线程不定时写同一全局资源, 5 个线程不定时读同一全局资源 */
void *th_write(void *arg)
{
	int t, i = (int)arg;
	while (1) {
		pthread_rwlock_wrlock(&rwlock);
		t = counter;
		usleep(1000);
		printf("=======write %d: %lu: counter=%d ++counter=%d\n", i, pthread_self(), t, ++counter);
		pthread_rwlock_unlock(&rwlock);
		usleep(10000);
		}
	return NULL;
}
void *th_read(void *arg)
{
	int i = (int)arg;
	while (1) {
		pthread_rwlock_rdlock(&rwlock);
		printf("----------------------------read %d: %lu: %d\n", i, pthread_self(), counter);
		pthread_rwlock_unlock(&rwlock);
		usleep(2000);
	}
	return NULL;
}
int main(void)
{
	int i;
	pthread_t tid[8];
	pthread_rwlock_init(&rwlock, NULL);
	for (i = 0; i < 3; i++)
		pthread_create(&tid[i], NULL, th_write, (void *)i);
	for (i = 0; i < 5; i++)
		pthread_create(&tid[i+3], NULL, th_read, (void *)i);
	for (i = 0; i < 8; i++)
		pthread_join(tid[i], NULL);
	pthread_rwlock_destroy(&rwlock);
	return 0;
}

条件变量:等待与信号发送

pthread_cond_t cond; 条件变量

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

参 2: attr 表条件变量属性,通常为默认值,传 NULL 即可

​初始化条件变量:
​ 1. pthread_cond_init(&cond, NULL); 动态初始化。
​ 2. pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 静态初始化。

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

函数作用:阻塞等待一个条件变量

  1. 阻塞等待条件变量 cond(参 1)满足
  2. 释放已掌握的互斥锁(解锁互斥量) 相当于 pthread_mutex_unlock(&mutex);
    1.2.两步为一个原子操作。
  3. 当被唤醒, pthread_cond_wait 函数返回时,解除阻塞并重新申请获取互斥锁 pthread_mutex_lock(&mutex)
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec*restrict abstime);
	//参 3: 参看 man sem_timedwait 函数,查看 struct timespec 结构体。
		struct timespec {
			time_t tv_sec; /* seconds */long tv_nsec; /* nanosecondes*/ 纳秒
		}

形参 abstime:绝对时间。time(NULL)返回的就是绝对时间,而 alarm(1)是相对时间。
struct timespec t = {1, 0};
pthread_cond_timedwait (&cond, &mutex, &t); 只能定时到 1970 年 1 月 1 日 00:00:01 秒
正确用法:
time_t cur = time(NULL); 获取当前时间。
struct timespec t; 定义 timespec 结构体变量 t
t.tv_sec = cur+1; 定时 1 秒
pthread_cond_timedwait (&cond, &mutex, &t); 传参

//唤醒至少一个阻塞在条件变量上的线程
int pthread_cond_signal(pthread_cond_t *cond);
//唤醒全部阻塞在条件变量上的线程
int pthread_cond_broadcast(pthread_cond_t *cond);

//销毁一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond);

经典案例–生产者消费者模型

假定有两个线程,一个模拟生产者行为,一个模拟消费者行为。两个线程同时操作一个共享资源,生产者向其中添加产品,消费者从中消费掉产品,利用条件变量阻塞等待条件满足。

逻辑如下:

创建锁—>初始化

生产者:加锁—>放产品—> 解锁 —> 通知阻塞线程 —> 生产产品

消费者:尝试加锁—>阻塞等待条件变量满足—>解锁 —>满足条件变量 —>加锁 —>拿产品 —>解锁

释放条件变量—>释放锁

#include<errno.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>


void err_thread(int ret,char *str)
{
    if(ret!=0){
        fprintf(stderr,"%s:%s\n",str,strerror(ret));
            pthread_exit(NULL);
            }
 }


//链表作为共享数据,互斥保护
struct msg{
    struct msg *next;
    int num;
};

struct msg *head;

//静态初始化一个条件变量和一个互斥量
pthread_cond_t has_product=PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock =PTHREAD_MUTEX_INITIALIZER;

void *consumer()
{
    struct msg *mp;

    for(;;){
        pthread_mutex_lock(&lock);
        while(head==NULL){//一个消费者也可用if(head==NULL)
            pthread_cond_wait(&has_product,&lock);//等待条件满足。解锁
        }
        mp=head;
        head=mp->next;//模拟消费一个产品
        pthread_mutex_unlock(&lock);
        printf("consume %lu---%d\n",pthread_self(),mp->num);
        free(mp);
        sleep(rand()%3);
        }
	//return NULL; //未写时编译也没问题
}

void *producer(void *p)  //这里的*p是空的,与不写等价
{
    while(1){
        struct msg *mp;

        mp=malloc(sizeof(struct msg));
        mp->num= rand()%1000+1;		//模拟生产一个产品
        printf("produce ----------%d\n",mp->num);

        pthread_mutex_lock(&lock);
        mp->next=head;
        head=mp;
        pthread_mutex_unlock(&lock);

        pthread_cond_signal(&has_product); //唤醒该条件变量上的进程
        sleep(rand()%3);
        }
    return NULL;
}



    
int main(int argc ,char *argv[]){

    pthread_t pid,cid;
    srand(time(NULL));
    int ret;
    ret=pthread_create(&pid,NULL,producer,NULL);
    if(ret!=0)
        err_thread(ret,"pthread_create producer error");
    ret=pthread_create(&cid,NULL,consumer,NULL);
    if(ret!=0)
        err_thread(ret,"pthread_create consumer error");

    pthread_join(pid,NULL);  //第二个参数回收线程的退出值
    pthread_join(cid,NULL);  //第二个参数回收线程的退出值


    return 0;
}

条件变量优点:
相较于 mutex 而言,条件变量可以减少竞争。如使用mutex,即使临界区没有资源,消费者之间也会产生竞争互斥量。

信号量

信号量是一种用于提供不同进程间或一个给定进程的不同线程间同步手段的原语
相当于 初始化值为 N 的互斥量。 N值,表示可以同时访问共享数据区的线程数。
在《UNIX网络编程》中记录了3种信号量:

  • Posix有名信号量:使用Posix IPC名字标识,不必再内核中维护,也可能由与文件系统中路劲名对应的名字来标识(但不要求它们真正存放在文件系统内的某个文件中)。
  • Posix基于内存的信号量:存放在共享内存区中。
  • System V信号量:在内核中维护。

信号量的初值,决定了占用信号量的线程的个数。
sem_t sem; 定义类型

初始化一个信号量:

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:
sem: 信号量

pshared: 0: 用于线程间同步
1: 用于进程间同步

value:N值。(指定同时访问的线程数)

int sem_wait(sem_t *sem);

  1. 信号量大于 0,则信号量-- (类比 pthread_mutex_lock)
  2. 信号量等于 0,造成线程阻塞

int sem_post(sem_t *sem);
将信号量++,同时唤醒阻塞在信号量上的线程 (类比 pthread_mutex_unlock)
但,由于 sem_t 的实现对用户隐藏,所以所谓的++、 --操作只能通过函数来实现,而不能直接++、 --符号。

限时尝试对信号量加锁 --:
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
参 2: abs_timeout 采用的是绝对时间。

定时 1 秒:
time_t cur = time(NULL); 获取当前时间。
struct timespect; 定义 timespec 结构体变量 t
t.tv_sec = cur+1; 定时 1 秒
t.tv_nsec = t.tv_sec +100;
sem_timedwait(&sem, &t); 传参

信号量–生产者消费者模型

生产者:
sem_wait(共享区空_num); // >0共享区-1,=0阻塞
生产产品
sem_post(共享区满_num); //产品++

消费者:
sem_wait(共享区满_num); // num–
消费产品
sem_post(共享区空_num);

#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>
#include<semaphore.h>

#define NUM 5

int queue[NUM];             //全局数组实现唤醒队列
sem_t blank_number,product_number;      //

void *producer(void *arg)
{
    int i=0;
    while(1){
        sem_wait(&blank_number);        //--
        queue[i]=rand()%1000+1;         //生产一产品
        printf("-----Produce-----%d\n",queue[i]);
        sem_post(&product_number);      //产品++

        i=(i+1)%NUM;  //借助下标实现环形
        sleep(rand()%1);
        }
}

void *consumer(void *arg)
{
    int i=0;
    while(1){
        sem_wait(&product_number);      //--
        printf("-----Consumer----%d\n",queue[i]);

        sem_post(&blank_number);      //产品++

        i=(i+1)%NUM;  //借助下标实现环形
        sleep(rand()%3);
    }        
}
int main(){

    pthread_t pid,cid;
    sem_init(&blank_number,0,NUM); //0--线程间同步,NUM--同时访问的线程数
    sem_init(&product_number,0,0);

    pthread_create(&pid,NULL,producer,NULL);
    pthread_create(&cid,NULL,consumer,NULL);

    pthread_join(pid,NULL);//回收线程返回值null
    pthread_join(cid,NULL);//回收线程返回值null

    sem_destroy(&blank_number);
    sem_destroy(&product_number);

    return 0;
}
   

版权声明:本文为qq_43460315原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。