Threadpool based on C++11, simple and can take any number of parameters

Reprinted from https://www.cnblogs.com/lzpong/p/6397997.html

Ahem. C++ 11 added the thread library, bidding farewell to the history of the standard library not supporting concurrency. However, C++’s support for multi-threading is still relatively low-level, and slightly more advanced usage needs to be implemented by yourself, such as thread pools, semaphores, etc. Thread pool has been asked many times in interviews, and the general answer is: “Manage a task queue, a thread queue, and then assign one task to a thread at a time, and the cycle repeats. ” It seems there is no problem. But something went wrong when I started writing the program.

Not much nonsense, let’s implement it first, and then talk about it. (dont talk, show me ur code!)

Code implementation

There is a GitHub link at the end of the article, but no one looked at it. It is updated. It will be updated now — 2022/06/02

#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <atomic>
#include <future>
//#include <condition_variable>
//#include <thread>
//#include <functional>
#include <stdexcept>

namespace std
{
//The maximum capacity of the thread pool should be set as small as possible
#defineTHREADPOOL_MAX_NUM 16
//Whether the thread pool can grow automatically (if necessary, and not exceed THREADPOOL_MAX_NUM)
//#define THREADPOOL_AUTO_GROW

//Thread pool, you can submit variable parameter function or anonymous function execution of lambda expression, and you can get the execution return value
//Does not directly support class member functions, but supports class static member functions or global functions, Opteron() functions, etc.
class threadpool
{
    unsigned short _initSize; //Initialize the number of threads
    using Task = function<void()>; //Define type
    vector<thread> _pool; //Thread pool
    queue<Task> _tasks; //task queue
    mutex _lock; //task queue synchronization lock
#ifdef THREADPOOL_AUTO_GROW
    mutex _lockGrow; //Thread pool growth synchronization lock
#endif // !THREADPOOL_AUTO_GROW
    condition_variable _task_cv; //Conditional blocking
    atomic<bool> _run{ true }; //Whether the thread pool is executed
    atomic<int> _idlThrNum{ 0 }; //Number of idle threads

public:
    inline threadpool(unsigned short size = 4) { _initSize = size; addThread(size); }
    inline ~threadpool()
    {
        _run=false;
        _task_cv.notify_all(); // Wake up all threads for execution
        for (thread & amp; thread : _pool) {
            //thread.detach(); // Let the thread "fend for itself"
            if (thread.joinable())
                thread.join(); // Wait for the task to end, premise: the thread will definitely finish executing
        }
    }

public:
    //Submit a task
    // Calling .get() to obtain the return value will wait for the task to complete and obtain the return value.
    // There are two ways to call class members,
    // One is to use bind: .commit(std::bind( & amp;Dog::sayHello, & amp;dog));
    // One is to use mem_fn: .commit(std::mem_fn( & amp;Dog::sayHello), this)
    template<class F, class... Args>
    auto commit(F & amp; & amp; f, Args & amp; & amp;... args) -> future<decltype(f(args...))>
    {
        if (!_run) // stopped 
            throw runtime_error("commit on ThreadPool is stopped.");

        using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, the return value type of function f
        auto task = make_shared<packaged_task<RetType()>>(
            bind(forward<F>(f), forward<Args>(args)...)
        ); // Package (bind) the function entry and parameters
        future<RetType> future = task->get_future();
        { //Add task to queue
            lock_guard<mutex> lock{ _lock };//Lock the statement of the current block. lock_guard is the stack encapsulation class of mutex. It uses lock() during construction and unlock() during destruction.
            _tasks.emplace([task]() { // push(Task{...}) to the back of the queue
                (*task)();
            });
        }
#ifdef THREADPOOL_AUTO_GROW
        if (_idlThrNum < 1 & amp; & amp; _pool.size() <thREADPOOL_MAX_NUM)
            addThread(1);
#endif // !THREADPOOL_AUTO_GROW
        _task_cv.notify_one(); // Wake up a thread for execution

        return future;
    }

    //Number of idle threads
    int idlCount() { return _idlThrNum; }
    //Number of threads
    int thrCount() { return _pool.size(); }

#ifndef THREADPOOL_AUTO_GROW
private:
#endif // !THREADPOOL_AUTO_GROW
    //Add the specified number of threads
    void addThread(unsigned short size)
    {
#ifndef THREADPOOL_AUTO_GROW
        if (!_run) // stopped 
            throw runtime_error("Grow on ThreadPool is stopped.");
        unique_lock<mutex> lockGrow{ _lockGrow }; //Automatically grow lock
#endif // !THREADPOOL_AUTO_GROW
        for (; _pool.size() <thREADPOOL_MAX_NUM & amp; & amp; size > 0; --size)
        { //Increase the number of threads, but not exceeding the predefined number THREADPOOL_MAX_NUM
            _pool.emplace_back([this]{ //Worker thread function
                while (true) //Prevent _run==false from ending immediately, the task queue may not be empty at this time
                {
                    Task task; // Get a task to be executed
                    {
                        // The advantage of unique_lock compared to lock_guard is that it can unlock() and lock() at any time
                        unique_lock<mutex> lock{ _lock };
                        _task_cv.wait(lock, [this] { // wait until there is a task, or it needs to be stopped
                            return !_run || !_tasks.empty();
                        });
                        if (!_run & amp; & amp; _tasks.empty())
                            return;
                        _idlThrNum--;
                        task = move(_tasks.front()); // Take a task from the queue according to first in first out
                        _tasks.pop();
                    }
                    task();//execute task
#ifndef THREADPOOL_AUTO_GROW
                    if (_idlThrNum>0 & amp; & amp; _pool.size() > _initSize) //Support automatic release of idle threads to avoid a large number of idle threads after the peak
                        return;
#endif // !THREADPOOL_AUTO_GROW
                    {
                        unique_lock<mutex> lock{ _lock };
                        _idlThrNum + + ;
                    }
                }
            });
            {
                unique_lock<mutex> lock{ _lock };
                _idlThrNum + + ;
            }
        }
    }
};

}

#endif //https://github.com/lzpong/

It’s not much code, just hundreds of lines of code to complete Thread Pool, and, look at the commit, ha, it’s not a fixed parameter, there is no limit on the number of parameters! This is due to the variable parameter template.

How to use?

Look at the code below (expand to view)

#include "threadpool.h"
#include <iostream>

void fun1(int slp)
{
    printf(" hello, fun1 ! %d\
" ,std::this_thread::get_id());
    if (slp>0) {
        printf(" ======= fun1 sleep %d ========= %d\
",slp, std::this_thread::get_id());
        std::this_thread::sleep_for(std::chrono::milliseconds(slp));
    }
}

struct gfun {
    int operator()(int n) {
        printf("%d hello, gfun ! %d\
" ,n, std::this_thread::get_id() );
        return 42;
    }
};

class A {
public:
    static int Afun(int n = 0) { //The function must be static to use the thread pool directly
        std::cout << n << " hello, Afun ! " << std::this_thread::get_id() << std::endl;
        return n;
    }

    static std::string Bfun(int n, std::string str, char c) {
        std::cout << n << " hello, Bfun ! "<< str.c_str() <<" " << (int)c <<" " << std::this_thread::get_id() << std ::endl;
        return str;
    }
};

int main()
    try {
        std::threadpool executor{ 50 };
        A a;
        std::future<void> ff = executor.commit(fun1,0);
        std::future<int> fg = executor.commit(gfun{},0);
        std::future<int> gg = executor.commit(a.Afun, 9999); //IDE prompts an error, but it can be compiled and run
        std::future<std::string> gh = executor.commit(A::Bfun, 9998,"mult args", 123);
        std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh ! " << std::this_thread::get_id() << std::endl; return "hello,fh ret !"; });

        std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::microseconds(900));

        for (int i = 0; i < 50; i + + ) {
            executor.commit(fun1,i*100);
        }
        std::cout << " ======= commit all ========= " << std::this_thread::get_id()<< " idlsize="<<executor.idlCount() << std::endl;

        std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));

        ff.get(); //Calling .get() to obtain the return value will wait for the thread to finish executing and obtain the return value.
        std::cout << fg.get() << " " << fh.get().c_str()<< " " << std::this_thread::get_id() << std::endl;

        std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));

        std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id() << std::endl;
        executor.commit(fun1,55).get(); //Calling .get() to obtain the return value will wait for the thread to finish executing

        std::cout << "end... " << std::this_thread::get_id() << std::endl;


        std::threadpool pool(4);
        std::vector< std::future<int> > results;

        for (int i = 0; i < 8; + + i) {
            results.emplace_back(
                pool.commit([i] {
                    std::cout << "hello " << i << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    std::cout << "world " << i << std::endl;
                    return i*i;
                })
            );
        }
        std::cout << " ======= commit all2 ========= " << std::this_thread::get_id() << std::endl;

        for (auto & amp; & amp; result : results)
            std::cout << result.get() << ' ';
        std::cout << std::endl;
        return 0;
    }
catch (std::exception & amp; e) {
    std::cout << "some unhappy happened... " << std::this_thread::get_id() << e.what() << std::endl;
}

In order to avoid suspicion, let me first explain the copyright: the code was “written” by me, but the ideas came from the Internet, especially this thread pool implementation (basically copied this implementation, plus this classmate’s implementation and explanation, good things are worth copying! Then it was comprehensively changed to make it more concise).

Implementation principle

Continuing with the previous nonsense. “Manage a task queue and a thread queue, and then assign one task to a thread at a time, and the cycle continues.” Is there any problem with this idea? Thread pools generally reuse threads, so if a task is assigned to a certain thread and then reallocated after execution, it is basically not supported at the language level: threads in general languages execute a fixed task function. The thread ends when the execution is completed (at least in C++). So how to realize the allocation of tasks and threads?

Let each thread execute the scheduling function: loop to obtain a task and then execute it.

Isn’t the idea great? The uniqueness of the thread function is guaranteed, and threads are reused to execute tasks.

Even if you understand the idea, the code still needs to be explained in detail.

  1. There should be no objection to a thread pool or a task queue queue;
  2. Task queue is a typical producer-consumer model. This model requires at least two tools: a mutex + a condition variable, or a mutex + a semaphore. A mutex is actually a lock, which ensures the mutual exclusivity of adding and removing tasks (acquisition). A condition variable ensures the synchronization of task acquisition: an empty queue, the thread should wait (block);
  3. atomic itself is an atomic type, which is obvious from the name: their operations load()/store() are atomic operations, so there is no need to add a mutex.

c++ language details

Even if you understand the principles, it does not mean that you can write a program. Many “weird tricks” of C++11 are used above, which are briefly described below.

  1. using Task = function is a type alias, which simplifies the use of typedef. function can be considered as a function type, accepting any function whose prototype is void(), or a function object, or an anonymous function. void() means no parameters and no return value.
  2. pool.emplace_back([this]{…}) has the same function as pool.push_back([this]{…}), but the former performs better;
  3. pool.emplace_back([this]{…}) constructs a thread object, and the execution function is a lambda anonymous function;
  4. All objects are initialized using {} instead of () because the style is not consistent enough and prone to errors;
  5. Anonymous function: [this]{…} Not much to say. [] is the catcher, this is the this pointer of the variable outside the reference domain, an infinite loop is used internally, and the thread is blocked by cv_task.wait(lock,[this]{…});
  6. delctype(expr) is used to infer the type of expr, which is similar to auto. It is equivalent to a type placeholder and occupies the position of a type; auto f(A a, B b) -> decltype(a + b) is a Usage cannot be written as decltype(a + b) f(A a, B b), why? ! This is how c++ is stipulated!
  7. Isn’t the commit method a little weird? You can take as many parameters as you want, the first parameter is f, followed by the parameters of the function f! (Note: If the parameters are to be passed to struct/class, it is recommended to use pointer, and be careful about the scope of the variable) Variable parameter templates are a highlight of C++11, bright enough! As for why it is Arg… and arg… , because that’s how the regulations are used!
  8. Commit can only call the stdcall function when used directly, but there are two ways to call class members. One is to use bind: .commit(std::bind( & amp;Dog::sayHello, & amp;dog)); 1 The first method is to use mem_fn: .commit(std::mem_fn( & amp;Dog::sayHello), & amp;dog);
  9. make_shared is used to construct shared_ptr smart pointers. The general usage is shared_ptr p = make_shared(4) then *p == 4. The advantage of smart pointers is automatic delete!
  10. The bind function accepts function f and some parameters, and returns the anonymous function after currying. For example, bind(add, 4) can implement a function similar to add4!
  11. The forward() function is similar to the move() function. The latter rvalues the parameters, and the former… How to say? The general meaning is: do not change the reference type of the originally passed in type (lvalue or lvalue, rvalue or rvalue);
  12. packaged_task is the encapsulation class of the task function. You can get the future through get_future, and then you can get the return value of the function (future.get()) through the future; packaged_task itself can be called like a function ();
  13. queue is a queue class, front() gets the head element, pop() removes the head element; back() gets the tail element, push() adds the element to the tail;
  14. lock_guard is a stack encapsulation class of mutex. It lock() during construction and unlock() during destruction. It is the idea of C++ RAII;
  15. condition_variable cv; condition variable needs to be used with unique_lock; the advantage of unique_lock compared to lock_guard is that it can unlock() and lock() at any time. cv.wait() needs to hold the mutex before, wait itself will unlock() the mutex, and if the conditions are met, the mutex will be held again.
  16. When the thread pool is finally destructed, join() can wait for all tasks to be executed before ending, which is very safe!

Git

The code is saved in git, you can get the latest code here: GitHub – lzpong/threadpool: based on C++ 11, a mini threadpool, accept variable number of parameters A thread pool based on C++ 11, simple and can take any number of parameters

[copy right from url: http://blog.csdn.net/zdarks/article/details/46994607, https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h]