C++11:借助C++11特性简单高效实现线程池

难得有半天的闲工夫,在C++11下实现了一个简单的线程池模板。以前使用线程很麻烦,Windows/Linux代码不通用,需要借助boost库,但boost线程库做不到真正的header-only。好在C++11有了thread、mutex和条件变量(condition_variable),实现这样一个简单线程池可以做到简洁高效,客户端借助lambda用起来也很灵活。

客户端例子有空再给。


#ifndef _SIMPLE_THREAD_POOL_H
#define _SIMPLE_THREAD_POOL_H

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace util {

// SimpleDataQueue is a mutex-protected FIFO queue whose Get() never blocks:
// it returns false immediately when the queue is empty.
// Every member function takes the internal mutex for its whole duration.
template<class T>
struct SimpleDataQueue
{
    // Appends a copy of `data` to the back of the queue.
    void Add(const T& data)
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        queue_.push_back(data);
    }

    // Appends `data` to the back of the queue by moving from it.
    // This overload also makes the queue usable with move-only element types.
    void Add(T&& data)
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        queue_.push_back(std::move(data));
    }

    // Moves the front element into `data` and removes it from the queue.
    // Returns false (leaving `data` untouched) when the queue is empty.
    bool Get(T& data)
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        if (queue_.empty()) {
            return false;
        }
        data = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    // Returns the number of queued elements at the time of the call.
    size_t Size() const
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.size();
    }

    // Returns true when no element is queued at the time of the call.
    bool Empty() const
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.empty();
    }

private:
    std::deque<T> queue_;
    // mutable so that the const observers (Size/Empty) can lock it.
    mutable std::mutex queue_mutex_;
};


// BlockingDataQueue is a mutex-protected FIFO queue whose Get() blocks while
// the queue is empty. Destory() deactivates the queue and wakes every waiter,
// after which Get() always returns false.
template<class T>
struct BlockingDataQueue
{
    BlockingDataQueue()
    {}

    // Appends a copy of `data` and wakes one waiting consumer.
    void Add(const T& data)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        queue_.push_back(data);
        // Release the lock before notifying so the woken thread can take it
        // right away.
        lock.unlock();
        condition_var_.notify_one();
    }

    // Appends `data` by moving from it and wakes one waiting consumer.
    void Add(T&& data)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        // `data` is a named rvalue reference, i.e. an lvalue here; std::move
        // is required to actually move (and to support move-only types).
        queue_.push_back(std::move(data));
        lock.unlock();
        condition_var_.notify_one();
    }

    // Blocks until an element is available or the queue has been destroyed.
    // On success moves the front element into `data`, removes it and returns
    // true. Returns false (leaving `data` untouched) once Destory() has been
    // called.
    bool Get(T& data)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        // The predicate is re-evaluated after every wakeup. This guards
        // against spurious wakeups and against another consumer having taken
        // the element first, either of which would otherwise lead to calling
        // front() on an empty deque.
        condition_var_.wait(lock, [this] {
            return !active_ || !queue_.empty();
        });
        if (!active_) {
            return false;
        }
        data = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    // Returns the number of queued elements at the time of the call.
    size_t Size() const
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.size();
    }

    // Returns true when no element is queued at the time of the call.
    bool Empty() const
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.empty();
    }

    // Destroy the queue: drop all queued elements, mark the queue inactive and
    // notify all the waiting threads to wake up (their Get() returns false).
    void Destory()
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        queue_.clear();
        active_ = false;
        lock.unlock();
        condition_var_.notify_all();
    }

private:
    std::deque<T> queue_;
    // mutable so that the const observers (Size/Empty) can lock it.
    mutable std::mutex queue_mutex_;
    std::condition_variable condition_var_;
    // Set to false by Destory(); only read or written with queue_mutex_ held.
    bool active_ = {true};
};


// SimpleThreadPool owns a fixed number of thread slots that all run the same
// function. The slots are created by the constructor; the threads themselves
// are started by SetThreadFunction() (or by the constructor overload that
// takes a function).
//
// NOTE: the class defines no destructor, so a started thread that has not been
// joined is destroyed together with its last owner. Call JoinAll() before the
// pool goes away.
class SimpleThreadPool
{
public:
    // Per-slot bookkeeping: the thread object (null until a thread function
    // has been set) and a back pointer to the pool that created the slot.
    struct ThreadInfo
    {
        explicit ThreadInfo()
        {}

        explicit ThreadInfo(SimpleThreadPool* p_pool)
            : p_pool_(p_pool)
        {}

        // Returns the first whitespace-delimited token of the streamed thread
        // id. When no thread has been started for this slot, the
        // default-constructed std::thread::id is formatted instead.
        std::string IdToStr() const
        {
            std::stringstream ss;
            ss << (p_thread_ ? p_thread_->get_id() : std::thread::id());
            std::string str;
            ss >> str;
            return str;
        }

        // Returns the underlying thread object.
        // Precondition: a thread has been started for this slot; the pointer
        // is dereferenced without a check.
        const std::thread& GetThread() const
        {
            return *p_thread_;
        }

    private:
        std::shared_ptr<std::thread> p_thread_;
        SimpleThreadPool* p_pool_ = {};
        friend class SimpleThreadPool;
    };

public:
    // Creates `thread_num` empty slots; no thread is started yet.
    explicit SimpleThreadPool(size_t thread_num)
    {
        InitSlots(thread_num);
    }

    // Creates `thread_num` slots and immediately starts one thread per slot,
    // each running f(args...).
    template<class Function, class... Args>
    explicit SimpleThreadPool(size_t thread_num, Function&& f, Args&&... args)
    {
        InitSlots(thread_num);
        SetThreadFunction(std::forward<Function>(f), std::forward<Args>(args)...);
    }

    // Starts one thread per slot, each running its own copy of f(args...).
    // The callable and its arguments must be copy-constructible.
    // NOTE: no synchronization protects the slots while they are being
    // assigned here, so a worker that inspects the pool (for example through
    // GetCurrentThreadInfo()) right after starting may observe slots that are
    // still being filled in.
    template<class Function, class... Args>
    void SetThreadFunction(Function&& f, Args&&... args)
    {
        for (auto& t : threads_) {
            // Deliberately pass lvalues: std::thread stores decayed copies of
            // what it is given, so every thread gets an intact copy.
            // Forwarding here would move from `f`/`args` on the first
            // iteration and hand moved-from objects to the remaining threads.
            t.p_thread_ = std::make_shared<std::thread>(f, args...);
        }
    }

    // Joins every started thread that is still joinable. Slots without a
    // thread and threads that were already joined are skipped, so calling
    // this before SetThreadFunction() or more than once is harmless.
    void JoinAll()
    {
        for (auto& t : threads_) {
            if (t.p_thread_ && t.p_thread_->joinable()) {
                t.p_thread_->join();
            }
        }
    }

    // Returns the number of slots the pool was created with.
    size_t ThreadNum() const
    {
        return threads_.size();
    }

    // Get the pointer to ThreadInfo structure whose thread ID is the current thread ID.
    // Return nullptr if the current thread is not in the pool.
    const ThreadInfo* GetCurrentThreadInfo() const
    {
        const std::thread::id self = std::this_thread::get_id();
        for (auto& t : threads_) {
            // Slots without a started thread can never match.
            if (t.p_thread_ && t.p_thread_->get_id() == self) {
                return &t;
            }
        }
        return nullptr;
    }

private:
    // Creates `thread_num` empty slots that point back at this pool.
    void InitSlots(size_t thread_num)
    {
        threads_.reserve(thread_num);
        for (size_t i = 0; i < thread_num; ++i) {
            threads_.push_back(ThreadInfo(this));
        }
    }

    std::vector<ThreadInfo> threads_;
};

}
#endif //_SIMPLE_THREAD_POOL_H




版权声明:本文为dynuaa原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。