Pthreads实现任务队列

编写一个Pthreads程序实现一个“任务队列”。主线程启动用户指定数量的线程,这些线程进入条件等待状态。主线程生成一些任务(一定计算量),每生成一个新的任务,就用条件变量唤醒一个线程,当这个唤醒线程执行完任务时,回到条件等待状态。当主线程生成完所有任务,设置全局变量表示再没有要生成的任务了,并用一个广播唤醒所有线程。为了清晰起见,建议任务采用链表操作。

相关阅读

OpenMP实现求矩阵均值最大值以及最小值
OpenMP实现数据统计
Pthreads实现任务队列
Pthreads实现梯形积分
visual studio 2019配置Pthreads和OpenMP
CodeBlocks 17.12配置Pthreads和OpenMP

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <windows.h>
typedef struct QueueNode {
    int id;
    struct QueueNode* next;
}QueueNode;
typedef struct TaskQueue {
    QueueNode* front;
    QueueNode* rear;
}TaskQueue;
int InitQueue(TaskQueue* Qp) {
    Qp->rear = Qp->front = (QueueNode*)malloc(sizeof(QueueNode));
    Qp->front->id = 2018;
    Qp->front->next = NULL;
    return 1;
}
int EnQueue(TaskQueue* Qp, int e) {
    QueueNode* newnode = (QueueNode*)malloc(sizeof(QueueNode));
    if (newnode == NULL)
        return 0;
    newnode->id = e;
    newnode->next = NULL;
    Qp->rear->next = newnode;
    Qp->rear = newnode;
    return 1;
}
int DeQueue(TaskQueue* Qp, int* ep, int threadID) {
    QueueNode* deletenode;
    if (Qp->rear == Qp->front)
        return 0;
    deletenode = Qp->front->next;
    if (deletenode == NULL) {
        return 0;
    }
    *ep = deletenode->id;
    Qp->front->next = deletenode->next;
    free(deletenode);
    return 1;
}
int GetNextTask();
int thread_count, finished = 0;
pthread_mutex_t mutex, mutex2;
pthread_cond_t cond;
void* task(void* rank);
TaskQueue Q;
int main()
{
    int n;
    InitQueue(&Q);
    pthread_t* thread_handles;
    thread_count = 8;
    thread_handles = malloc(thread_count * sizeof(pthread_t));
    pthread_mutex_init(&mutex, NULL);
    pthread_mutex_init(&mutex2, NULL);
    pthread_cond_init(&cond, NULL);
    printf("Task Number:");
    scanf_s("%d", &n);
    for (int i = 0; i < thread_count; i++)
        pthread_create(&thread_handles[i], NULL, task, (void*)i);
    for (int i = 0; i < n; i++) {
        pthread_mutex_lock(&mutex2);
        EnQueue(&Q, i);
        Sleep(1);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex2);
    }
    finished = 1;
    pthread_cond_broadcast(&cond);
    for (int i = 0; i < thread_count; i++)
        pthread_join(thread_handles[i], NULL);
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    free(thread_handles);
    return 0;
}
void* task(void* rank) {
    int my_rank = (long)rank;
    int my_task;
    QueueNode** p = &(Q.front->next);
    while (1) {
       pthread_mutex_lock(&mutex2);
        if (finished) {
            if (*p == NULL) {
                pthread_mutex_unlock(&mutex2);
                break;
            }
            DeQueue(&Q, &my_task, my_rank);
            pthread_mutex_unlock(&mutex2);
            printf("From thread %ld: Task no.%-3d result->%5d\n", my_rank, my_task, my_task * 10);

        }
        else {
            while(pthread_cond_wait(&cond, &mutex2)!=0);
            //pthread_mutex_lock(&mutex2);
            DeQueue(&Q, &my_task, my_rank);
            pthread_mutex_unlock(&mutex2);
            Sleep(2);
            printf("From thread %ld: Task no.%-3d result->%5d\n", my_rank, my_task, my_task * 10);
        }
    }
}


版权声明:本文为Lhw_666原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。