并行算法设计 - Pthreads

实验效果不行,想快也快不了,论文写不下去了。只能写篇博客分散一下注意力。另外也是好久没写博客了,对最近网课学的Pthreads进行简要的总结。

参考资料

  1. 维基百科 - POSIX线程
  2. POSIX Threads Programming

Pthreads简介

参照维基百科,Pthreads定义了创建和操纵线程的一套API,一般用于类Linux系统。

Pthreads中的API,大致可以分为4类:

  • 线程管理,例如创建线程,等待(join)线程等
  • 互斥锁(Mutex):创建、摧毁、锁定、解锁、设置属性等操作
  • 条件变量(Condition Variable):创建、摧毁、等待、通知、设置与查询属性等操作
  • 使用了互斥锁的线程间的同步管理

并行程序通用模板

学习一门新的编程语言,第一步就是学习如何输出Hello World。这里学习pthreads,第一件事情,也是写一个并行程序输出Hello Worldhello.c代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <pthread.h> 
//pthread.h中包含一些线程创建,同步等函数。
// 全局变量:线程数量
int thread_count;

void *Hello(void * rank){//定义线程函数 rank用于区分不同的线程
   long my_rank = (long)rank;

   printf("Hello World from thread %ld of %d\n", my_rank, thread_count);

   return NULL;
}
int main(int argc, char* argv[]) {
   long thread; //使用long 用于64位系统
   pthread_t* thread_handles;

   thread_count = atol(argv[1]);//从命令行中读取进程数量
   thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));

   for(thread = 0; thread < thread_count; thread++){
       pthread_create(&thread_handles[thread], NULL, Hello, (void *)thread);
       //pthread_create用于创建线程
   }

   printf("Hello from the main thread\n");

//  加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。
//    线程间同步操作
   for(thread = 0; thread < thread_count; ++thread){
       pthread_join(thread_handles[thread], NULL);
   }
   free(thread_handles);//释放开辟空间,避免内存泄漏
   return 0;
}

创建一个并行程序的一般步骤是:

  1. 定义线程函数,每个线程的具体操作
  2. 创建线程
  3. 同步线程
  4. 销毁线程

这里对代码中用到的几个函数进行简单介绍。

  • pthread_create (thread,attr,start_routine,arg). 初始情况下,main函数包含一个单独的,默认的线程,其它的线程可以通过pthread_create由coder显式的创建。thread是子进程的句柄,唯一的标识符;arr用于设置线程属性,一般为NULL;start_routine创建线程后将执行的线程函数;arg单一参数传给线程函数,一般传的是空指针类型,后再强制转换,不需要传递参数,则为NULL。
  • 线程函数的返回值就是void*,没有其它类型,不是奢求它返回其它值,形参是由pthread_cread的arg参数传过来的
  • pthread_join. int pthread_join(pthread_t thread, void **retval).以阻塞的方式等待thread指定的线程结束,实现线程同步。

下面运行hello.c,执行命令gcc hello.c -o hello -lphread-o用来指定生成可执行文件的名字,-lpthread用来链接ptheard库。
在这里插入图片描述
执行命令后,在目录下生成hello文件,运行hello文件,线程数量设置为4,即./hello 4,运行结果如图所示。

避免访问冲突

估算π \piπ,可以通过公式p i = 4 ( 1 − 1 3 + 1 5 + ⋯ + ( − 1 ) n 1 2 n + 1 + ⋯ ) pi = 4(1 - \frac{1}{3} + \frac{1}{5} + \cdots+(-1)^n\frac{1}{2n + 1} + \cdots)pi=4(131+51++(1)n2n+11+),设计串行的程序很简单,并行程序设计的大体思路是为每个线程分配计算任务,然后将线程计算结果相加,但是相加的时候出现访问冲突,如何避免多个线程同时访问π \piπ的估算值sum呢?这里有忙等待,互斥锁,信号量三种方式来避免访问冲突。

  • 忙等待(busy-waiting)
    estimate_pai_busy_waiting.c代码如下
//
// Created by Onwaier Lee on 2020-03-19.
//

#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <pthread.h>

// 全局变量:线程数量
int thread_count;
int m; //个数
int now_rank = 0;
double res = 0.0;
void *estimate_pai(void * rank){//定义线程函数
    long my_rank = (long)rank;

    int i, j;
    int local_m = m / thread_count;
    int beg = my_rank * local_m;//根据my_rank来分配计算任务
    int end = (my_rank + 1) * local_m - 1;
    double factor = 1.0;
    while(now_rank != my_rank){//忙等待,当my_rank与now_rank相同才计算
    }

    for(i = beg; i <= end; ++i){
        if(i & 1){
            res -= factor / (2 * i + 1);
        }
        else{
            res += factor / (2 * i + 1);
        }
    }
    ++now_rank;
    if (now_rank == thread_count){//最后一个线程计算完后,输出估算结果
        printf("estimate pai:%.9f\n", res * 4);
    }

    return NULL;
}
int main(int argc, char* argv[]) {
    long thread; //使用long 用于64位系统
    pthread_t* thread_handles;

    thread_count = atol(argv[1]);//线程数
    m = atol(argv[2]);//分块个数
    thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));

    for(thread = 0; thread < thread_count; thread++){
        pthread_create(&thread_handles[thread], NULL, estimate_pai, (void *)thread);
    }

//    printf("Hello from the main thread\n");

//  加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。
//    线程间同步操作
    for(thread = 0; thread < thread_count; ++thread){
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);
    return 0;
}

编译gcc estimate_pai_busy_waiting.c -o estimate_pai_busy_waiting -lpthread, 运行./estimate_pai_busy_waiting 2 1000000,运行结果如图所示。
在这里插入图片描述
对于忙等待,线程函数还可以优化,先定义一个全局变量sum,每个线程可以先计算分配的那部分的和,然后再用忙等待方式,判断是否轮到自己将计算结果加到sum上。

  • 互斥锁(Mutexes)

用忙等待的方式,显然效率不高,这里可以使用互斥锁,在访问之前,判断变量是否加锁,没有,则加锁,然后访问。访问结束后,再解锁。如果已上锁,则等待解锁。
这里的加锁使用的是pthread_mutex_lock(),解锁使用的是pthread_mutex_unlock()

estimate_pai_mutex.c代码如下:

//
// Created by Onwaier Lee on 2020-03-19.
//

//
// Created by Onwaier Lee on 2020-03-19.
//

#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <pthread.h> 

// 全局变量:线程数量
int thread_count;
int m; //个数
double res = 0.0;
pthread_mutex_t mutex ;//定义mutex变量
void *estimate_pai(void * rank){//定义线程函数
    long my_rank = (long)rank;

    int local_m = m / thread_count;
    int beg = my_rank * local_m;
    int end = (my_rank + 1) * local_m - 1;
    double factor = 1.0, tmp_sum = 0.0;
    for(int i = beg; i <= end; ++i){
        if(i & 1){//奇数位 减
            tmp_sum -= factor / (2 * i + 1);
        }
        else{//偶数位  加
            tmp_sum += factor / (2 * i + 1);
        }
    }
    pthread_mutex_lock(&mutex);//占有互斥锁
    res += tmp_sum;
    printf("estimate_pai:%.9f\n", res * 4);
    pthread_mutex_unlock(&mutex);//释放互斥锁


    return NULL;
}
int main(int argc, char* argv[]) {
    long thread; //使用long 用于64位系统
    pthread_t* thread_handles;
    pthread_mutex_init(&mutex,NULL);//初始化mutex
    thread_count = atol(argv[1]);
    m = atol(argv[2]);
    thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));

    for(thread = 0; thread < thread_count; thread++){
        pthread_create(&thread_handles[thread], NULL, estimate_pai, (void *)thread);
    }

//    printf("Hello from the main thread\n");

//  加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。
//    线程间同步操作
    for(thread = 0; thread < thread_count; ++thread){
        pthread_join(thread_handles[thread], NULL);
    }
    pthread_mutex_destroy(&mutex);
    free(thread_handles);
    return 0;
}

运行结果如图所示。
在这里插入图片描述

  • 信号量(semaphore)

互斥锁无法保证执行顺序,忙等待可以,但效率不高。引入了信号量,信号量相关操作函数在semaphore.h头文件 中,需添加。

sem_post(semphore) 释放共享资源,由0变成1
sem_wait(semphore) 申请共享资源,由1变成0

estimate_pai_sem.c 代码如下:


//
// Created by Onwaier Lee on 2020-03-19.
//

#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <pthread.h> 
#include <semaphore.h>

// 全局变量:线程数量
int thread_count;
int m; //个数
double res = 0.0;
pthread_mutex_t mutex ;//定义mutex变量
sem_t binSem;//定义信息号
void *estimate_pai(void * rank){//定义线程函数
    long my_rank = (long)rank;

    int local_m = m / thread_count;
    int beg = my_rank * local_m;
    int end = (my_rank + 1) * local_m - 1;
    double factor = 1.0, tmp_sum = 0.0;
    for(int i = beg; i <= end; ++i){
        if(i & 1){//奇数位 减
            tmp_sum -= factor / (2 * i + 1);
        }
        else{//偶数位  加
            tmp_sum += factor / (2 * i + 1);
        }
    }
    sem_wait(&binSem);//申请共享资源
    res += tmp_sum;
    printf("estimate_pai:%.9f\n", res * 4);
    sem_post(&binSem);//释放共享资源

    return NULL;
}
int main(int argc, char* argv[]) {
    long thread; //使用long 用于64位系统
    pthread_t* thread_handles;
    sem_init(&binSem, 0, 1);//信号量默认初始为1,即可以申请
    thread_count = atol(argv[1]);
    m = atol(argv[2]);
    thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));

    for(thread = 0; thread < thread_count; thread++){
        pthread_create(&thread_handles[thread], NULL, estimate_pai, (void *)thread);
    }

//    printf("Hello from the main thread\n");

//  加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。
//    线程间同步操作
    for(thread = 0; thread < thread_count; ++thread){
        pthread_join(thread_handles[thread], NULL);
    }
    pthread_mutex_destroy(&mutex);
    free(thread_handles);
    return 0;
}

编译运行如图所示。
在这里插入图片描述

同步障(barriers)

如果每个线程在完成分配给自己的任务后, 需要先将自己的计算结果打印出来,等所有线程都打印完自己的计算结果后,再继续将计算结果加到sum。上述问题涉及到同步的问题,即所有线程运行同一个点时,再往下继续执行。实现这个,需要用到同步障,同步障常见的实现的方式也有3种:忙等待+信号量,信号量和条件变量。

  • 忙等待+mutex

实现需要定义一个计数器counter,来判断是否所有线程都输出部分和。而counter作为临界资源需要互斥访问,加上mutex来实现。

总的来说,需要额外的两个变量countermutex,其中counter不可重复使用(不能很好的初始化),而mutex可以。
estimate_pai_sem_barrier1.c代码如下:

//忙等待+mutex实现同步障
//
// Created by Onwaier Lee on 2020-03-19.
//

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

// 全局变量:线程数量
int thread_count;
int m; //个数
double res = 0.0;
pthread_mutex_t mutex ;//定义mutex变量
long counter;//用于barrier中线程的计数
sem_t binSem;//定义信息号
void *estimate_pai(void * rank){//定义线程函数
    long my_rank = (long)rank;

    int local_m = m / thread_count;
    int beg = my_rank * local_m;
    int end = (my_rank + 1) * local_m - 1;
    double factor = 1.0, tmp_sum = 0.0;
    for(int i = beg; i <= end; ++i){
        if(i & 1){//奇数位 减
            tmp_sum -= factor / (2 * i + 1);
        }
        else{//偶数位  加
            tmp_sum += factor / (2 * i + 1);
        }
    }

    pthread_mutex_lock(&mutex); //上锁
    printf("thread %lld, sum:%.15f\n", my_rank, tmp_sum);
    ++counter; // 计数器+1
    pthread_mutex_unlock(&mutex);//解锁

    while(counter != thread_count); // 忙等待

    sem_wait(&binSem);//申请共享资源 1->0
    res += tmp_sum;
    printf("estimate_pai:%.9f\n", res * 4);
    sem_post(&binSem);//释放共享资源 0->1

    return NULL;
}
int main(int argc, char* argv[]) {
    long thread; //使用long 用于64位系统
    pthread_t* thread_handles;
    sem_init(&binSem, 0, 1);//信号量默认初始为1,即可以申请
    pthread_mutex_init(&mutex, NULL);
    thread_count = atol(argv[1]);
    m = atol(argv[2]);
    thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));

    for(thread = 0; thread < thread_count; thread++){
        pthread_create(&thread_handles[thread], NULL, estimate_pai, (void *)thread);
    }

//    printf("Hello from the main thread\n");

//  加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。
//    线程间同步操作
    for(thread = 0; thread < thread_count; ++thread){
        pthread_join(thread_handles[thread], NULL);
    }
    pthread_mutex_destroy(&mutex);
    free(thread_handles);
    return 0;
}

运行结果
在这里插入图片描述

  • 信号量

实现除了用到计数器counter,还用到2个信号量count_sembarrier_sem,其中count_sem可以重复使用,barrier_sem不可以重复使用,可能会导致部分进程无法通过barrier。

//
// Created by Onwaier Lee on 2020-03-19.
//

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

// 全局变量:线程数量
int thread_count;
int m; //个数
double res = 0.0;
sem_t binSem;//定义信息号
long counter;//用于barrier中线程的计数
sem_t count_sem;//cout信号量 初始化为1
sem_t barrier_sem;// barrieri信号量 初始化为0

void *estimate_pai(void * rank){//定义线程函数
    long my_rank = (long)rank;

    int local_m = m / thread_count;
    int beg = my_rank * local_m;
    int end = (my_rank + 1) * local_m - 1;
    double factor = 1.0, tmp_sum = 0.0;
    for(int i = beg; i <= end; ++i){
        if(i & 1){//奇数位 减
            tmp_sum -= factor / (2 * i + 1);
        }
        else{//偶数位  加
            tmp_sum += factor / (2 * i + 1);
        }
    }

    sem_wait(&count_sem);
    if(counter == thread_count - 1) {
        counter = 0;//初始化 counter可重复使用
        printf("thread %lld, sum:%.15f\n", my_rank, tmp_sum);
        sem_post(&count_sem);
        for (int j = 0; j < thread_count - 1; ++j) {//释放barrier_sem资源 thread_count-1次  相当于放行n-1次
            sem_post(&barrier_sem);
        }
    }
    else{
        counter++;
        printf("thread %lld, sum:%.15f\n", my_rank, tmp_sum);
        sem_post(&count_sem);
        sem_wait(&barrier_sem);
    }

    sem_wait(&binSem);//申请共享资源 1->0
    res += tmp_sum;
    printf("estimate_pai:%.9f\n", res * 4);
    sem_post(&binSem);//释放共享资源 0->1

    return NULL;
}
int main(int argc, char* argv[]) {
    long thread; //使用long 用于64位系统
    pthread_t* thread_handles;
    sem_init(&binSem, 0, 1);//信号量默认初始为1,即可以申请
    thread_count = atol(argv[1]);
    m = atol(argv[2]);

    counter = 0;
    sem_init(&count_sem, 0, 1);
    sem_init(&barrier_sem, 0, 0);

    thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));

    for(thread = 0; thread < thread_count; thread++){
        pthread_create(&thread_handles[thread], NULL, estimate_pai, (void *)thread);
    }

//    printf("Hello from the main thread\n");

//  加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。
//    线程间同步操作
    for(thread = 0; thread < thread_count; ++thread) {
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);
    return 0;
}

执行结果如图所示。
在这里插入图片描述

  • 条件变量

常用函数

pthread_create():创建一个线程
pthread_exit():终止当前线程
pthread_cancel():请求中断另外一个线程的运行。被请求中断的线程会继续运行,直至到达某个取消点(Cancellation Point)。取消点是线程检查是否被取消并按照请求进行动作的一个位置。POSIX 的取消类型(Cancellation Type)有两种,一种是延迟取消(PTHREAD_CANCEL_DEFERRED),这是系统默认的取消类型,即在线程到达取消点之前,不会出现真正的取消;另外一种是异步取消(PHREAD_CANCEL_ASYNCHRONOUS),使用异步取消时,线程可以在任意时间取消。系统调用的取消点实际上是函数中取消类型被修改为异步取消至修改回延迟取消的时间段。几乎可以使线程挂起的库函数都会响应CANCEL信号,终止线程,包括sleep、delay等延时函数。
pthread_join():阻塞当前的线程,直到另外一个线程运行结束
pthread_kill():向指定ID的线程发送一个信号,如果线程不处理该信号,则按照信号默认的行为作用于整个进程。信号值0为保留信号,作用是根据函数的返回值判断线程是不是还活着。
pthread_cleanup_push():线程可以安排异常退出时需要调用的函数,这样的函数称为线程清理程序,线程可以建立多个清理程序。线程清理程序的入口地址使用栈保存,实行先进后处理原则。由pthread_cancel或pthread_exit引起的线程结束,会次序执行由pthread_cleanup_push压入的函数。线程函数执行return语句返回不会引起线程清理程序被执行。
pthread_cleanup_pop():以非0参数调用时,引起当前被弹出的线程清理程序执行。
pthread_setcancelstate():允许或禁止取消另外一个线程的运行。
pthread_setcanceltype():设置线程的取消类型为延迟取消或异步取消。
线程属性函数
pthread_attr_init():初始化线程属性变量。运行后,pthread_attr_t结构所包含的内容是操作系统支持的线程的所有属性的默认值。
pthread_attr_setdetachstate():设置线程属性变量的detachstate属性(决定线程在终止时是否可以被joinable)
pthread_attr_getdetachstate():获取脱离状态的属性
pthread_attr_setscope():设置线程属性变量的__scope属性
pthread_attr_setschedparam():设置线程属性变量的schedparam属性,即调用的优先级。
pthread_attr_getschedparam():获取线程属性变量的schedparam属性,即调用的优先级。
pthread_attr_destroy():删除线程的属性,用无效值覆盖
mutex函数:
pthread_mutex_init() 初始化互斥锁
pthread_mutex_destroy() 删除互斥锁
pthread_mutex_lock():占有互斥锁(阻塞操作)
pthread_mutex_trylock():试图占有互斥锁(不阻塞操作)。即,当互斥锁空闲时,将占有该锁;否则,立即返回。
pthread_mutex_unlock(): 释放互斥锁
pthread_mutexattr_(): 互斥锁属性相关的函数
条件变量函数
pthread_cond_init():初始化条件变量
pthread_cond_destroy():销毁条件变量
pthread_cond_signal(): 发送一个信号给正在当前条件变量的线程队列中处于阻塞等待状态的线程,使其脱离阻塞状态,唤醒后继续执行。如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。一般只给一个阻塞状态的线程发信号。假如有多个线程正在阻塞等待当前条件变量,则根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但pthread_cond_signal在多处理器上可能同时唤醒多个线程,当只能让一个被唤醒的线程处理某个任务时,其它被唤醒的线程就需要继续wait。POSIX规范要求pthread_cond_signal至少唤醒一个pthread_cond_wait上的线程,有些实现为了简便,在单处理器上也会唤醒多个线程。所以最好对pthread_cond_wait()使用while循环对条件变量是否满足做条件判断。
pthread_cond_wait(): 等待条件变量的特殊条件发生;pthread_cond_wait() 必须与一个pthread_mutex配套使用。该函数调用实际上依次做了3件事:对当前pthread_mutex解锁、把当前线程挂起到当前条件变量的线程队列、被其它线程的信号唤醒后对当前pthread_mutex申请加锁。如果线程收到一个信号被唤醒,将被配套的互斥锁重新锁住,pthread_cond_wait() 函数将不返回直到线程获得配套的互斥锁。需要注意的是,一个条件变量不应该与多个互斥锁配套使用。
pthread_cond_broadcast(): 某些应用,如线程池,pthread_cond_broadcast唤醒全部线程,但我们通常只需要一部分线程去做执行任务,所以其它的线程需要继续wait.
pthread_condattr_(): 条件变量属性相关的函数
线程私有存储(Thread-local storage):
pthread_key_create(): 分配用于标识进程中线程特定数据的pthread_key_t类型的键
pthread_key_delete(): 销毁现有线程特定数据键
pthread_setspecific(): 为指定线程的特定数据键设置绑定的值
pthread_getspecific(): 获取调用线程的键绑定值,并将该绑定存储在 value 指向的位置中
同步屏障函数
pthread_barrier_init(): 同步屏障初始化
pthread_barrier_wait():
pthread_barrier_destory():
其它多线程同步函数:
pthread_rwlock_*(): 读写锁
工具函数
pthread_equal(): 对两个线程的线程标识号进行比较
pthread_detach(): 分离线程
pthread_self(): 查询线程自身线程标识号
pthread_once(): 某些需要仅执行一次的函数。其中第一个参数为pthread_once_t类型,是内部实现的互斥锁,保证在程序全局仅执行一次。
信号量函数,包含在semaphore.h中:
sem_open:创建或者打开已有的命名信号量。可分为二值信号量与计数信号量。命名信号量可以在进程间共享使用。
sem_close:关闭一个信号灯,但没有将它从系统中删除。命名信号灯是随内核持续的,即使当前没有进程打开着某个信号灯,它的值仍然保持。
sem_unlink:从系统中删除信号灯。
sem_getvalue:返回所指定信号灯的当前值。如果该信号灯当前已上锁,那么返回值或为0,或为某个负数,其绝对值就是等待该信号灯解锁的线程数。
sem_wait:申请共享资源,所指定信号灯的值如果大于0,那就将它减1并立即返回,就可以使用申请来的共享资源了。如果该值等于0,调用线程就被进入睡眠状态,直到该值变为大于0,这时再将它减1,函数随后返回。sem_wait操作必须是原子操作。
sem_trywait:申请共享资源,当所指定信号灯的值已经是0时,后者并不将调用线程投入睡眠。相反,它返回一个EAGAIN错误。
sem_post:释放共享资源。与sem_wait恰相反。
sem_init:初始化非命名(内存)信号量
sem_destroy:摧毁非命名信号量
共享内存函数,包含在sys/mman.h中,链接时使用rt库:
mmap:把一个文件或一个POSIX共享内存区对象映射到调用进程的地址空间。使用该函数的目的: 1.使用普通文件以提供内存映射I/O 2.使用特殊文件以提供匿名内存映射。 3.使用shm_open以提供无亲缘关系进程间的Posix共享内存区。
munmap: 删除一个映射关系
msync:文件与内存同步函数
shm_open:创建或打开共享内存区
shm_unlink:删除一个共享内存区对象的名字,删除一个名字仅仅防止后续的open,msq_open或sem_open调用取得成功。
ftruncate:调整文件或共享内存区大小
fstat来获取有关该对象的信息


版权声明:本文为happyeveryday62原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。