并行程序设计整理(三)—— Pthreads

共享内存程序 VS 分布式内存程序

共享内存程序

在共享内存程序中,变量可以是共享的,也可以是私有的。

共享变量可以被任何线程读写,私有变量通常只能被一个线程访问。线程之间的通信通常通过共享变量完成,因此此种方式下通信是隐式的。

动态线程

在这个范例中,通常有一个主线程,并且在任何给定时刻都有一个(可能是空的)工作线程集合。

主线程通常等待工作请求,当一个新的请求到达时,它会fork一个工作线程,该线程执行请求,当该线程完成工作时,它终止并加入主线程。

这种范例有效地利用了系统资源,因为线程所需要的资源只在线程实际运行时才会被使用。

静态线程

在这个范例中,在主线程进行任何必要的设置之后会fork出所有的线程,这些线程一直运行直到所有的工作完成。在线程加入主线程后,主线程可能会做一些清理(例如,释放内存),这样被fork出来的线程也会终止。

在资源使用方面效率较低,如果一个线程是空闲的,它的资源(例如,堆栈,程序计数器,等等)都不能被释放。

优点是,它更接近于最广泛使用的分布式内存编程范例。

线程安全

如果一个函数或库在被多个同时执行的线程调用时能够“正确”操作,那么它就是线程安全的。

由于多个线程通过共享内存进行通信和协调,因此线程安全代码会使用适当的同步来修改共享内存的状态。

Pthreads

POSIX: P ortable O perating S ystem I nterface for UNI X,是UNIX的可移植操作系统接口,提供操作系统实用程序的接口。

PThreads: POSIX线程接口。系统调用来创建和同步线程

PThreads支持创建并行性、同步,但不支持显式通信,因为共享内存是隐式的。

创建线程

pthread_create函数用来启动线程,其签名为:

int pthread_create(
    pthread_t* 				thread_p 				/* out */,
    const pthread_attr_t* 	attr_p 					/* in */,
    void* 					(*start_routine)(void*) /* in */,
    void* 					arg_p 					/* in */);

参数列表:

thread_p:指向某一个pthread_t对象。*⚠️注意:对象不是通过调用pthread create来分配的;它必须在调用之前分配。

attr_p:不会使用这个参数,函数调用中传递参数NULL

(*start_routine)(void*):线程要运行的函数。

arg_p:一个指针,指向应该传递给函数start_routine的参数。

返回值:大多数Pthreads函数的返回值表示函数调用中是否有错误。在这个函数中,如果创建操作失败,返回值将被设置为非零(nonzero)

终止线程

主线程为某一线程调用pthread_join函数来完成终止,函数签名为:

int pthread_join(
    pthread_t 	thread 		/* in */,
    void** 		ret_val_p 	/* out */);

thread:代表了一个pthread_t对象。

ret_val_p:接收由线程计算的任何返回值。

如果将主线程视为一条线,如下图所示,那么,当调用pthread_create时,主线程上会创建一个分支或分叉。对pthread_create的多次调用将导致多个分支或分叉。然后,当由pthread_create启动的线程终止时,这些分支会并入主线程。

在这里插入图片描述

一个例子

int main() {
    pthread_t threads[16];
    int tn;
    for(tn = 0; tn < 16; tn++) {
        pthread_create(&threads[tn], NULL, ParFun, NULL);
    }
    for(tn = 0; tn < 16; tn++) {
        pthread_join(threads[tn], NULL);
    }
    return 0;
}

在上面这段代码中,一共创建了16 1616个线程来执行函数ParFun

*⚠️注意:线程创建的成本很高,因此ParFun并行地做大量工作以分摊这些成本是很重要的。

关于共享数据的问题:

  • main函数外声明的变量是共享的
  • 在堆上分配的对象可以被共享(如需要共享可以通过指针传递)
  • 堆栈上的变量是私有的:将指向这些变量的指针传递给其他线程可能会导致问题。

Hello World

书上实例:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

/* Global variable: accessible to all threads */
int thread_count;

void* Hello(void* rank); /* Thread function */
    
int main(int argc, char* argv[]) {
    long thread; /* Use long in case of a 64−bit system */
    pthread_t* thread_handles;
    
    /* Get number of threads from command line */
    thread_count = strtol(argv[1], NULL, 10);
    
    thread_handles = malloc (thread count*sizeof(pthread t));
    
    for (thread = 0; thread < thread_count; thread++)
    	pthread_create(&thread handles[thread], NULL, Hello, (void*) thread);
    
    printf("Hello from the main thread\n");
    
    for (thread = 0; thread < thread_count; thread++)
    	pthread_join(thread_handles[thread], NULL);
   
    free(thread_handles);
    return 0;
} /* main */
void* Hello(void* rank) {
    long my_rank = (long) rank;
    /* Use long in case of 64−bit system */
        
    printf("Hello from thread %ld of %dn\n", my_rank, thread_count);
    return NULL;
} /* Hello */

矩阵向量乘法

A = ( a i j ) A = (a_{ij})A=(aij)是一个m × n m\times nm×n的矩阵,x = ( x 0 , x 1 , ⋯ , x n − 1 ) T \textbf{x} = (x_0,x_1,\cdots,x_n-1)^Tx=(x0,x1,,xn1)T是一个n nn维行向量,那么A x = y A\textbf{x}=\textbf{y}Ax=y是一个m mm维列向量。计算示意如下:

在这里插入图片描述

即对于y = ( y 0 , y 1 , ⋯ , y m − 1 ) \textbf{y} = (y_0,y_1,\cdots,y_m-1)y=(y0,y1,,ym1)来说,y i = ∑ k = 0 n − 1 a i k x k y_i = \sum_{k=0}^{n-1}a_{ik}x_kyi=k=0n1aikxk

实现上述矩阵乘法的串行程序的伪代码如下:

/* For each row of A */
for (i = 0; i < m; i++) {
    y[i] = 0.0;
    /* For each element of the row and each element of x */
    for (j = 0; j < n; j++)
    	y[i] += A[i][j] * x[j];
}

对于上述操作要实现并行化,一种做法是在线程间分配外循环的迭代任务,也就是让每个线程计算一部分y的值。

m = n = 6 m=n=6m=n=6,同时假设有3 33个线程。那么分配如下:

在这里插入图片描述

此时对于线程 i ii来说,1 ≤ i ≤ thread_num , i ∈ Z ) 1\leq i\leq \texttt{thread\_num}, i\in \mathbb{Z})1ithread_num,iZ),计算y[i]的代码如下:

y[i] = 0.0;
for (j = 0; j < n; j++)
	y[i] += A[i][j] * x[j];

在这段程序中,x是被Ay共享的数据。

那么,并行实现矩阵向量乘法的代码如下:

void* Pth_mat_vect(void* rank) {
    long my_rank = (long) rank;
    int i, j;
    int local_m = m / thread_count;
    int my_first_row = my_rank * local_m;
    int my_last_row = (my_rank + 1) * local_m - 1;
    
    for(i = my_first_row; i<= my_last_row; i++) {
        y[i] = 0.0;
        for(j = 0; j < n; j++) 
            y[i] = A[i][j] * x[j];
    }
    return NULL;
} /* Pth_mat_vect */

临界区——估算π \piπ的值

在矩阵向量乘法这个问题中,并行实现的程序以非常理想的方式访问共享内存位置,但实际情况下会遇到更加麻烦的问题,参考如下例子:通过公式
π = 4 ( 1 − 1 3 + 1 5 − 1 7 + ⋯ + ( − 1 ) n 1 2 n + 1 ) \pi = 4(1-\dfrac{1}{3}+\dfrac{1}{5}-\dfrac{1}{7}+\cdots+(-1)^n\dfrac{1}{2n+1})π=4(131+5171++(1)n2n+11)
来估算π \piπ的值。

求解这个问题的串行伪代码如下:

double factor = 1.0;
double sum = 0.0;
for (i = 0; i < n; i++, factor = −factor) {
	sum += factor / (2 * i + 1);
}
pi = 4.0 * sum;

和求解矩阵向量乘法时的思路一样,可以将for循环分隔开,分配给不同的线程,这样sum就成为了一个共享变量。依照这个思路,实现代码如下:

void* Thread_sum(void* rank) {
    long my_rank = (long) rank;
    double factor;
    long long i;
    long long my_n = n/thread_count;
    long long my_first_i = my_n*my_rank;
    long long my_last_i = my_first_i + my_n;
    if (my_first_i % 2 == 0) /* my_first_i is even */
    	factor = 1.0;
    else /* my first i is odd */
    	factor =1.0;
    for (i = my_first_i; i < my_last_i; i++, factor = −factor) {
    	sum += factor/(2*i+1);
    }
    return NULL;
} /* Thread_sum */

上述代码看起来实现了并行,但是并不正确。把问题化简,假设并行执行如下代码:

y = Compute(my_rank);
x = x + y;

在这里插入图片描述

根据给定的条件可以计算出,预期的正确答案应为3 33,但程序并不一定能给出这个正确答案,比如上图中,最终得到的答案就是2 22

可见,当多个线程尝试更新共享资源时,结果可能无法预测。当执行的结果取决于两个或多个事件的先后顺序时,就存在竞态条件(race condition)。临界区(critical section)就是一个代码块,用于更新一次只能由一个线程更新的共享资源。(x = x + y;就是一个临界区。)

为了避免发生混乱,可以使用flag变量来维护临界区。

y = Compute(my_rank);
While(flag != my_rank);
x = x + y;
flag++;

一旦加上了这两行代码,在线程i ii执行完临界区的代码之前,线程i + 1 i+1i+1就无法访问临界区。

忙等

flag变量维护了临界区的同时,while循环语句带来了忙等(busy-waiting)的问题。

在忙等的过程中,线程会反复测试一个条件,直到该条件成立才会进行下一步操作。但要注意的是,使用忙等时,应该关闭编译器优化,否则编译器可能会做出一些改变,从而影响忙碌等待的正确性。一旦优化器将代码调整成了如下顺序:

y = Compute(my_rank);
x = x + y;
While(flag != my_rank);
flag++;

对于单线程运行的程序来说,其运行结果与未交换顺序的代码运行得到的结果相同。但是改成多线程运行,那么临界区将不在起作用,运行结果将无法预测。

对于估算π \piπ的值这个问题,加上忙等后的核心代码如下:

for (i = my_first_i; i < my_last_i; i++, factor = −factor) {
    While(flag != my_rank);
    sum += factor/(2*i+1);
    flag = (flag + 1) % thread_count;
}

虽然这么做可以避免运行结果出错,但其在很大程度上牺牲了性能。事实上,忙等并不是保护临界区的唯一方案。要想保证性能,就应该尽量减少执行临界段代码的次数。因此,提高sum函数性能的一种方法是让每个线程使用一个私有变量来存储其对sum的总贡献。同时通过这种方式来保护临界区。此时核心代码改成:

for(i = my_first_i; i < my_last_i; i++,factor  = -factor) {
    my_sum+=factor/(2*i+1);
} 
while(flag != my_rank):
sum += my_sum;
flag=(flag+1)% thread_count;

Mutexes

由于忙等的线程可能会持续使用CPU,因此忙等通常不是限制访问临界区问题的理想解决方案。更好的方案是利用mutex和semaphore。(互斥量和信号量)

Mutex(互斥锁)可以保证一个线程在执行临界区时“排除”所有其他线程,即斥保证了对临界区的互斥访问。

Pthreads标准包括一个特殊类型的互斥锁:pthread_mutex_t

  • 在使用pthread_mutex_t类型的变量之前先要调用如下函数进行初始化:

    int pthread_mutex_init(
        pthread_mutex_t* 			mutex_p /* out */,
        const pthread_mutexattr_t* 	attr_p 	/* in */);
    

    其中第二个参数不会被使用,所以只传递NULL即可。

  • 当一个Pthreads程序使用完互斥锁时,应调用:

    int pthread_mutex_destroy(pthread_mutex_t* mutex_p /* in/out */);   
    
  • 要访问临界区,线程可以调用:

    int pthread_mutex_lock(pthread_mutex_t* mutex_p /* in/out */);
    
  • 当线程在临界区中执行完代码后,应调用:

    int pthread_mutex_unlock(pthread_mutex_t* mutex_p /* in/out */);   
    

使用mutex保护临界区,估算π \piπ的值这一问题中,核心代码为:

pthread_mutex_lock(&mutex);
sum += my_sum;
pthread_mutex_unlock(&mutex);

临界区中线程执行代码的顺序或多或少是随机的。

对比使用忙等和使用互斥量的性能,结果如下:

在这里插入图片描述

  • 当程序运行的线程少于内核时,总体运行时的差别并不大。
  • 增加线程数量使之超过内核数量,使用互斥的版本的性能几乎保持不变,而忙等版本的性能会下降。
  • 使用忙等时,如果线程多于内核,性能会下降。

生产者-消费者同步和信号量

生产者-消费者同步是指一个线程只有在另一个线程采取了某些操作后才能继续执行。

Semaphore(信号量)是避免冲突访问临界区的第三种方法。

信号量可以被认为是一种特殊类型的无符号整型,所以它们的值可以是0 , 1 , 2 , ⋯ 0,1,2,\cdots0,1,2,。大多数情况下,只取值为0和1时,这样的信号量称为二进制信号量0 00对应一个锁定的互斥锁,而1 11对应一个未锁定的互斥锁。要使用二进制信号量作为互斥量,需要将其初始化为…1 11,也就是“解锁的”。对应两种操作:sem_waitsem_post

各种信号量函数的语法为:

在这里插入图片描述

注意,semaphore并不来自Pthreads,要使用它需要头文件#include <semaphore.h>

Barriers

同步线程以确保它们都在程序中的同一点,这个称为barrier。在所有线程都到达该屏障之前,没有线程可以跨越该barrier。

Barrier多应用,其中之一是可以确保所有线程在同一时刻执行同一段代码,还有一个非常重要的用途就是调试。在并行程序中很难确定错误发生在哪里,此时可以借助barrier:

point in program we want to reach;
barrier;
if (my_rank == 0) {
    printf("All threads reached this point\n");
    fflush(stdout);
}

要(动态地)初始化一个barrier,使用如下的代码(这里将线程数设置为3):

pthread_barrier_t b;
pthread_barrier_init(&b,NULL,3);

pthread_barrier_init中第二个参数指向一个对象属性;使用NULL将生成默认属性。

要在barrier处等待,进程执行:

pthread_barrier_wait(&b);

Barrier也可以通过分配一个通过宏创建的初始值来静态初始化。

PTHREAD_BARRIER_INITIALIZER(3)

由于许多Pthreads的实现不提供barrier,因此如果代码要能够移植,需要自己的实现。接下来是三种实现方式。

忙等+互斥量

使用忙等和互斥量实现barrier的思路如下:维护一个由互斥量保护的共享计数器。当计数器指示每个线程已经进入临界区时,线程可以离开一个忙等循环。实现的伪代码如下:

/* Shared and initialized by the main thread */
int counter; /* Initialize to 0 */
int thread_count;
pthread_mutex_t barrier_mutex;

void Thread_work(...) {
    /* Barrier */;
    pthread_mutex_lock(&barrier_mutex);
    counter++;
    pthread_mutex_unlock(&barrier_mutex);
    while(counter < thread_count);
}

使用这种方法实现barrier,会遇到忙等面临的问题,即在很大程度上牺牲了性能。

同时,还有一个问题在于共享变量counter。在上述代码中,如果想实现第二个barrier并尝试重用计数器counter

int counter; /* Initialize to 0 */
int thread_count;
pthread_mutex_t barrier_mutex;

void Thread_work(...) {
    /* Barrier1 */;
    pthread_mutex_lock(&barrier_mutex1);
    counter++;
    pthread_mutex_unlock(&barrier_mutex1);
    while(counter < thread_count);
    
     /* Barrier2 */;
    pthread_mutex_lock(&barrier_mutex2);
    counter++;
    pthread_mutex_unlock(&barrier_mutex2);
    while(counter < thread_count);
}

当第一个barrier完成时,counter的值将变成thread_count。若不重置计数器,那么在第二个barrier中,while条件counter < thread_count将为false,这样barrier就不会导致线程阻塞。此外,安全地将计数器重置为零的几乎不可能。因此,每个barrier都要有一个自己的计数器变量(如果有需要的话)。

信号量

/* Shared variables */
int counter; /* Initialize to 0 */
sem_t count_sem; /* Initialize to 1 */
sem_t barrier_sem; /* Initialize to 0 */

void* Thread_work(...) {
    /* Barrier */
    sem_wait(&count_sem);
    if(counter == thread_count - 1) { // 此时进入的是最后一个线程
        counter = 0;
        sem_post(&count_sem);
        for(j = 0; j < thread_count - 1; j++) {
            sem_post(&barrier_sem);
        }
    } else {
        counter++;
        sem_post(&count_sem); // count_sem++;
        sem_wait(&barrier_sem); // barrier_sem--; 由于semaphore是无符号整型,barrier_sem一旦小于零,线程将会阻塞。
	}
}

这种情况下,计数器counter可以重用,因为在释放barrier中的任何线程之前,都小心地重置了它。另外,因为在任何线程离开barrier之前,count_sem都被重置为1 11,所以它可以重用。

条件变量

条件变量是一个数据对象,它允许线程挂起执行,直到发生某个事件或条件。当事件或条件发生时,另一个线程可以向该线程发出“唤醒”信号。条件变量总是与互斥对象相关联。

其使用的伪代码如下:

lock mutex;
if condition has occurred
	signal thread(s);
else {
    unlock the mutex and block;
    /* when thread is unblocked, mutex is relocked */
}
unlock mutex;

Pthreads中的条件变量的类型为pthread_cond_t

unblock一个被阻塞的线程的函数如下:

int pthread_cond_signal(pthread_cond_t* cond_var_p /* in/out */);

unblock所有被阻塞的线程的函数如下:

int pthread_cond_broadcast(pthread_cond_t* cond_var_p /* in/out */);

对mutex_p引用的互斥锁进行解锁,导致执行线程阻塞的函数:

int pthread_cond_wait(
    pthread_cond_t* cond_var_p /* in/out */,
    pthread_mutex_t* mutex_p /* in/out */);

下面的代码实现了一个带有条件变量的barrier:

/* Shared */
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
. . .
void* Thread work(. . .) {
    . . .
    /* Barrier */
    pthread_mutex_lock(&mutex);
    counter++;
    if (counter == thread_count) {
        counter = 0;
        pthread_cond_broadcast(&cond_var);
    } else {
	    while (pthread_cond_wait(&cond_var, &mutex) != 0);
	}
    pthread_mutex_unlock(&mutex);
    . . .
}

读写锁

假设共享数据结构是一个排好序的int型链表,相关的操作有成Member, InsertDelete

结构体定义如下:

struct list_node_s {
    int data;
    struct list_node_s* next;
}

假设目前链表存储的数据如下:在这里插入图片描述

关于三种操作,其串行执行的方式如下:

  1. 查询操作:

    即遍历一遍链表,查询是否有满足条件的值。

    int Member(int value, struct list_node_s* head_p) {
        struct list_node_s* curr_p = head p;
        while (curr_p != NULL && curr_p−>data < value)
        curr_p = curr_p−>next;
        if (curr_p == NULL || curr_p−>data > value) {
            return 0;
        } else {
        	return 1;
        }
    } /* Member */
    
  2. 插入操作的思路:在这里插入图片描述

  3. 删除操作的思路:在这里插入图片描述

一个多线程的链表

现在需要在一个Pthreads程序中实现上述三个函数(查询、插入、删除)。为了共享对列表的访问,可以将head_p定义为一个全局变量。

多线程同时执行Member,InsertDelete这三个函数,可能会带来问题。假设如下场景:

Thread 0 执行 Member(5)
Thread 1 执行 Delete(5)

这两个线程同时执行的结果是不可预知的,其示意图如下:

在这里插入图片描述

  1. 有可能thread 0在thread 1删除5 55之前先读到了该值,并返回了结果。
  2. 还有可能,如果thread 0正在执行Member(8),thread 1可能在thread 0访问8 88所在的内存地址之前先将存放5 55的内存空间释放了。可能会导致thread 0的访问段异常。

对于个问题,一种解决方案是:在任何线程试图访问该列表时锁定该列表。例如添加如下代码:

Pthread_mutex_lock(&list_mutex);
Member(value);
Pthread_mutex_unlock(&list_mutex);

如果代码中绝大多数操作都是对Member的调用,这种方式将使得代码无法实现并行性。另一方面,如果大多数操作是插入和删除调用,那么这可能是最好的解决方案。

这种方案的另一种实现是增加“细粒度”锁定。我们可以尝试锁定单个节点,而不是锁定整个列表。例如,可以在列表节点结构中添加一个互斥量:

struct list_node_s {
    int data;
    struct list_node_s* next;
    pthread_mutex_t mutex;
}

这种实现比原来的成员函数复杂得多。总的来说,它的速度也要慢得多,因为每次访问一个节点时,都必须锁定和解锁互斥锁。此外,向每个节点添加一个互斥字段将大大增加列表所需的存储空间。

Pthreads read-write locks

Pthreads的读写锁提供了另一种选择。

读写锁有点像互斥锁,只是它提供了两个锁函数。第一个锁用于读,第二个锁用于写。

多个线程可以通过调用read-lock函数同时获得锁,而只有一个线程可以通过调用write-lock函数获得锁。因此,如果一个线程拥有用于读取的锁,那么任何希望获得用于写入的锁的线程都将在对write-lock函数的调用中阻塞。此外,如果一个线程拥有用于写入的锁,那么任何希望获得用于读写的锁的线程都将在各自的锁函数中阻塞。

代码示例:

pthread_rwlock_rdlock(&rwlock);
Member(value);
pthread_rwlock_unlock(&rwlock);
. . .
pthread_rwlock_wrlock(&rwlock);
Insert(value);
pthread_rwlock_unlock(&rwlock);
. . .
pthread_rwlock_wrlock(&rwlock);
Delete(value);
pthread_rwlock_unlock(&rwlock);

不同实现方式性能上的对比

在这里插入图片描述


//以上内容整理自《Introduction to Parallel Programming》一书


版权声明:本文为Aresix原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。