Implementing an asynchronous thread pool with input parameters and return values in C++ with Boost

Preface

Thread pools have some advantages over simple multi-threading:

1. A thread pool automatically hands each task to an idle worker thread, which is better than running fixed batches of tasks on separate threads;

2. A thread pool only needs to be created once, whereas creating and destroying threads every time work has to be done wastes resources;

There are also some disadvantages:

1. Whenever a worker takes a task from the task queue, or a task is submitted to the queue, the queue's mutex has to be acquired, and that costs time. The overhead becomes more noticeable when there are many tasks and each one is short. For example, on a 16-core machine with a 14-thread pool, submitting 2,000 tasks (each taking about 1 ms) takes about 20 ms for the submission alone. The thread-pool approach is therefore better suited to workloads where each task takes a long time and the number of tasks is not especially large.

Asynchronous multithreading with input parameters and return values

Asynchronous multithreading with input parameters and return values can be implemented directly with boost::async, with the return value obtained through future.get(). However, it does not seem possible to build a thread pool directly on top of boost::async.

First, here is the asynchronous multithreaded version with input parameters and return values.

#include <boost/bind.hpp>
#include <boost/thread.hpp>  // boost::async, boost::unique_future, boost::wait_for_all
#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>
using namespace std;

// Each task receives a group of durations and sleeps for each one in turn.
// Returns (first duration in the group, sum); sum stays 1 unless the loop below is enabled.
std::pair<int, double> sleep_print(std::vector<int> seconds) {
    double sum = 1;
    for (auto sec : seconds) {
        std::cout << std::this_thread::get_id() << " :start sec :" << sec << endl;

        // for (int i = 1; i < 100; i++) {
        //     sum *= i;
        // }
        std::this_thread::sleep_for(std::chrono::seconds(sec));
    }
    return std::pair<int, double>(seconds[0], sum);
}
int main() {
    auto t0 = chrono::steady_clock::now();
    int thread_num = 4;
    std::vector<std::vector<int>> thread_init_poses;
    thread_init_poses.resize(thread_num);
    // Assign task groups round-robin: task i goes to group i % thread_num
    for (int i = 0; i < 9; i++) {
        int i_num = i % thread_num;
        thread_init_poses[i_num].push_back(i);
    }
    // Run each task group on its own thread
    std::vector<boost::unique_future<std::pair<int, double>>> futures;
    for (int i = 0; i < thread_num; i++) {
        futures.emplace_back(boost::async(boost::bind(&sleep_print, thread_init_poses[i])));
    }

    boost::wait_for_all(futures.begin(), futures.end());
    // Get the results through the futures
    for (auto& future : futures) {
        auto res = future.get();
        cout << res.first << ", " << res.second << endl;
    }

    auto t2 = chrono::steady_clock::now();
    // duration<double> converts to seconds regardless of the clock's tick period
    cout << "main spend time = " << chrono::duration<double>(t2 - t0).count() << endl;
    return 0;
}

output

main spend time = 12.0011

Asynchronous thread pool with input parameters and return values

I tried the threadpool library, but it seems the return value cannot be obtained directly: it has to be retrieved through a callback function, and there seemed to be no way to pass it back to main, which is quite troublesome. See Reference 1.

In the end I succeeded with boost::asio::io_service and boost::thread_group and wrapped them in a class, but I did not turn it into a template (I am both inexperienced and lazy…)

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>
using namespace std;
// Task type: a packaged_task whose result is std::pair<int, double>
typedef boost::packaged_task<std::pair<int, double>> task_t;
typedef boost::shared_ptr<task_t> ptask_t;

// Test job: sleeps for `seconds` and returns (seconds, sum); sum stays 1 unless the loop is enabled
std::pair<int, double> sleep_print(int seconds) {
    std::cout << std::this_thread::get_id() << " :start seconds :" << seconds << endl;
    double sum = 1;
    // for (int i = 1; i < 100; i++) {
    //     sum *= i;
    // }
    std::this_thread::sleep_for(std::chrono::seconds(seconds));
    return std::pair<int, double>(seconds, sum);
}

class ThreadPool {
private:
    boost::thread_group threads;
    boost::asio::io_service io_service;

public:
    // Keeps io_service::run() from returning while the job queue is empty
    boost::shared_ptr<boost::asio::io_service::work> work;
    ThreadPool(int num);
    ~ThreadPool();

    void push_job(int seconds, std::vector<boost::shared_future<std::pair<int, double>>>& futures);
};

ThreadPool::ThreadPool(int num) {
    cout << "ThreadPool ::ThreadPool" << endl;
    work.reset(new boost::asio::io_service::work(io_service));
    // Every worker thread runs the same io_service event loop
    for (int i = 0; i < num; ++i) {
        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
    }
}

ThreadPool::~ThreadPool() {
    work.reset();  // let run() return once queued jobs finish; otherwise join_all() blocks forever
    threads.join_all();
    cout << "ThreadPool ::~ThreadPool" << endl;
}

void ThreadPool::push_job(int seconds, std::vector<boost::shared_future<std::pair<int, double>>>& futures) {
    cout << "push_job :" << seconds << endl;
    // Wrap the call in a packaged_task so its result can be read through a future
    ptask_t task = boost::make_shared<task_t>(boost::bind(&sleep_print, seconds));
    boost::shared_future<std::pair<int, double>> future(task->get_future());
    futures.push_back(future);
    // io_service.reset();
    // Queue the task; whichever worker thread is idle will run it
    io_service.post(boost::bind(&task_t::operator(), task));
}

int main() {
    auto t0 = chrono::steady_clock::now();
    ThreadPool thread_pool(4);
    std::vector<boost::shared_future<std::pair<int, double>>> futures;

    // First batch of 9 jobs
    for (int i = 0; i < 9; i++) {
        thread_pool.push_job(i, futures);
    }

    for (boost::shared_future<std::pair<int, double>> future : futures) {
        auto data = future.get();
        cout << data.first << ", " << data.second << endl;
    }

    // Second batch: clear the old futures so the first batch is not printed again
    futures.clear();
    for (int i = 0; i < 9; i++) {
        thread_pool.push_job(i, futures);
    }

    for (boost::shared_future<std::pair<int, double>> future : futures) {
        auto data = future.get();
        cout << data.first << ", " << data.second << endl;
    }

    thread_pool.work.reset();
    auto t2 = chrono::steady_clock::now();
    cout << "main spend time = " << chrono::duration<double>(t2 - t0).count() << endl;
    return 0;
}

I wrote a simple test case; the final output is:

main spend time = 24.0013
ThreadPool ::~ThreadPool

A running time of 24 s means that each batch of 9 tasks is handed to the 4 threads in order as they become free, and the two batches run one after the other. The longest chain in a batch is tasks 0, 4 and 8, so the total is (0 + 4 + 8) * 2 = 24 s.

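The io_service::work object is declared so that io_service::run() does not return, and the worker threads do not exit, while the job queue is momentarily empty. Once the work object is destroyed, run() returns as soon as all queued jobs have finished.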

Summary

Comparing the two programs on the same workload, both take 12 s per batch (the thread pool example processes two batches, hence 24 s), so at first glance there is no difference. That is only because both happen to distribute the tasks in exactly the same way, so each thread ends up running the same sequence of sleep(seconds) calls. In practice, the cost of each task cannot be predicted in advance. With plain multithreading over fixed batches, some threads may finish their batch early and sit idle while everyone waits for the slowest batch; a thread pool instead lets the threads that finish first pick up the remaining tasks.

In addition, creating and destroying threads every time a function runs wastes resources, whereas a thread pool is created once at the start and then reused.

References

1. "c++11: thread pool, boost threadpool, thread_group example", CSDN blog

2. "C++ Thread Pool usage analysis", Ce39906's Blog

3. "boost::asio::io_service creates a simple instance of a thread pool", CSDN blog