C++11 thread pool (part 9) – rewriting the thread pool with packaged_task and future

packaged_task and future mechanism

std::packaged_task is similar to the function-object wrapper std::function, but it additionally exposes the callable's return value through a std::future.
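
A minimal sketch of the difference, using a toy add function (names are illustrative): both wrap a callable, but only packaged_task hands the return value back through a future.

#include <functional>
#include <future>
#include <iostream>

int add(int a, int b) { return a + b; }

int main()
{
    std::function<int(int, int)> f = add;     // plain callable wrapper
    std::cout << f(1, 2) << std::endl;        // the result comes back directly

    std::packaged_task<int(int, int)> t(add); // callable wrapper + shared state
    std::future<int> fut = t.get_future();    // channel for the result
    t(1, 2);                                  // run it (here, synchronously)
    std::cout << fut.get() << std::endl;      // fetch the result later
    return 0;
}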

Background: to make task submission more convenient, we want the thread pool to accept tasks in the following form:

int sum1(int a, int b)
{
    return a + b;
}

int sum2(int a, int b, int c)
{
    return a + b + c;
}

int main()
{
    ThreadPool pool;
    pool.submitTask(sum1, 10, 20);
    pool.submitTask(sum2, 10, 20, 30);

    return 0;
}

Use case

#include <iostream>
#include <functional>
#include <thread>
#include <future>
#include <chrono>

using namespace std;

int sum1(int a, int b)
{
    return a + b;
}

int sum2(int a, int b, int c)
{
    return a + b + c;
}

int main()
{
    // Wrap sum1 into a task object
    packaged_task<int(int, int)> task(sum1);
    future<int> res = task.get_future(); // future plays the role of the Result class from the previous version
    // task(10, 20); // could also run the task directly on this thread
    // packaged_task has deleted copy construction and copy assignment, so it must be moved
    thread t(std::move(task), 10, 20); // execute the task on a separate thread
    t.detach(); // detach so the std::thread is not destroyed while still joinable
    cout << res.get() << endl; // blocks until the task has finished

    return 0;
}

Rewrite thread pool

Changes to submitTask

submitTask uses a variadic template with a trailing return type; the key part is how each task is wrapped.

// The return type of the callable is deduced via decltype(func(args...)) and wrapped in std::future
template<typename Func, typename... Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
{
    // Pack the task and put it in the task queue
    using RType = decltype(func(args...));
    auto task = std::make_shared<std::packaged_task<RType()>>(
        std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    std::future<RType> result = task->get_future();

    // acquire the lock
    std::unique_lock<std::mutex> lock(taskQueMtx_);
    // The submitter blocks for at most 1s; after that the submission is treated as failed and returns
    if (!notFull_.wait_for(lock, std::chrono::seconds(1),
        [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
    {
        // notFull_ waited for 1s and the condition is still not satisfied
        std::cerr << "task queue is full, submit task fail." << std::endl;
        auto task = std::make_shared<std::packaged_task<RType()>>(
            []()->RType { return RType(); });
        // The placeholder task must be executed, otherwise calling future::get() outside would never yield a result
        // (Visual Studio can detect an unexecuted packaged_task and reports an error)
        (*task)();
        return task->get_future();
    }

    // If there is space, put the task into the task queue
    // taskQue_.emplace(sp);
    // using Task = std::function<void()>;
    taskQue_.emplace([task]() { (*task)(); });
    taskSize_++;

    // The task queue is now certainly not empty, so notify on notEmpty_ to get a thread running the task quickly
    notEmpty_.notify_all();

    // Cached mode targets small, fast tasks: decide whether to create new threads
    // based on the number of pending tasks and the number of idle threads
    if (poolMode_ == PoolMode::MODE_CACHED
        && taskSize_ > idleThreadSize_
        && curThreadSize_ < threadSizeThreshHold_)
    {
        std::cout << ">>> create new thread..." << std::endl;

        // create a new thread object
        auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
        int threadId = ptr->getId();
        threads_.emplace(threadId, std::move(ptr));
        // start the thread
        threads_[threadId]->start();
        // Update the thread-count bookkeeping
        curThreadSize_++;
        idleThreadSize_++;
    }

    // Return the future associated with the task
    return result;
}
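
To see just the wrapping trick in isolation, here is a minimal sketch outside the pool (the names add and queueEntry are illustrative): an arbitrary callable plus its arguments is reduced to a std::function<void()> queue entry while the caller keeps a typed future.

#include <functional>
#include <future>
#include <iostream>
#include <memory>

int add(int a, int b) { return a + b; }

int main()
{
    // Deduce the return type the same way submitTask does
    using RType = decltype(add(3, 4)); // int

    // Bind the callable and its arguments into a zero-argument packaged_task,
    // held by shared_ptr because packaged_task itself is move-only
    auto task = std::make_shared<std::packaged_task<RType()>>(std::bind(add, 3, 4));
    std::future<RType> result = task->get_future();

    // The queue only stores type-erased std::function<void()> entries;
    // the lambda captures the shared_ptr and keeps the packaged_task alive
    std::function<void()> queueEntry = [task]() { (*task)(); };

    queueEntry();                           // what a worker thread would do
    std::cout << result.get() << std::endl; // prints 7
    return 0;
}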

Complete code

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <atomic>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <unordered_map>
#include <functional>
#include <future>

const int TASK_MAX_THRESHHOLD = INT32_MAX;
const int THREAD_MAX_THRESHHOLD = 1024;
const int THREAD_MAX_IDLE_TIME = 60; // maximum idle time of a thread (seconds); beyond this it is recycled

enum class PoolMode // C++11
{
    MODE_FIXED,  // fixed number of threads
    MODE_CACHED, // variable number of threads
};

class Thread
{
public:
    using ThreadFunc = std::function<void(int)>;

    Thread(ThreadFunc func)
        : func_(func)
        , threadId_(generateId_++)
    {
    }
    ~Thread() = default;

    // start the thread
    void start()
    {
        // create a thread to execute the thread function
        std::thread t(func_, threadId_);
        t.detach(); // detach: thread t controls its own life cycle
    }

    int getId() const
    {
        return threadId_;
    }
private:
    ThreadFunc func_;
    static int generateId_;
    int threadId_; // id of the Thread object, not the thread id given by the system
};
int Thread::generateId_ = 0;

class ThreadPool
{
public:
    ThreadPool()
        : initThreadSize_(4)
        , taskSize_(0)
        , idleThreadSize_(0)
        , curThreadSize_(0)
        , taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD)
        , threadSizeThreshHold_(THREAD_MAX_THRESHHOLD)
        , poolMode_(PoolMode::MODE_FIXED)
        , isPoolRunning_(false)
    {
    }
    ~ThreadPool()
    {
        isPoolRunning_ = false;
        // Wait for all threads in the pool to return; each is either blocked or executing a task
        std::unique_lock<std::mutex> lock(taskQueMtx_);
        notEmpty_.notify_all(); // wake all consumers so they see the running flag is false and exit
        // wait until every thread has been destroyed
        exitCond_.wait(lock, [&]()->bool { return threads_.size() == 0; });
    }

    // Set the thread pool working mode
    void setMode(PoolMode mode)
    {
        if (checkRunningState())
            return;
        poolMode_ = mode;
    }

    // Set the upper threshold of the task queue
    void setTaskQueMaxThreadhold(int threshhold)
    {
        if (checkRunningState())
            return;
        taskQueMaxThreshHold_ = threshhold;
    }

    // Set the upper threshold of threads in cached mode
    void setThreadSizeThreadhold(int threshhold)
    {
        if (checkRunningState())
            return;
        if (poolMode_ == PoolMode::MODE_CACHED)
        {
            threadSizeThreshHold_ = threshhold;
        }
    }

    // Submit a task to the thread pool
    template<typename Func, typename... Args>
    auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
    {
        // Pack the task and put it in the task queue
        using RType = decltype(func(args...));
        auto task = std::make_shared<std::packaged_task<RType()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        std::future<RType> result = task->get_future();

        // acquire the lock
        std::unique_lock<std::mutex> lock(taskQueMtx_);
        // The submitter blocks for at most 1s; after that the submission is treated as failed and returns
        if (!notFull_.wait_for(lock, std::chrono::seconds(1),
            [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
        {
            // notFull_ waited for 1s and the condition is still not satisfied
            std::cerr << "task queue is full, submit task fail." << std::endl;
            auto task = std::make_shared<std::packaged_task<RType()>>(
                []()->RType { return RType(); });
            (*task)();
            return task->get_future();
        }

        // If there is space, put the task into the task queue
        // taskQue_.emplace(sp);
        // using Task = std::function<void()>;
        taskQue_.emplace([task]() { (*task)(); });
        taskSize_++;

        // The task queue is now certainly not empty, so notify on notEmpty_ to get a thread running the task quickly
        notEmpty_.notify_all();

        // Cached mode targets small, fast tasks: decide whether to create new threads
        // based on the number of pending tasks and the number of idle threads
        if (poolMode_ == PoolMode::MODE_CACHED
            && taskSize_ > idleThreadSize_
            && curThreadSize_ < threadSizeThreshHold_)
        {
            std::cout << ">>> create new thread..." << std::endl;

            // create a new thread object
            auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
            int threadId = ptr->getId();
            threads_.emplace(threadId, std::move(ptr));
            // start the thread
            threads_[threadId]->start();
            // Update the thread-count bookkeeping
            curThreadSize_++;
            idleThreadSize_++;
        }

        // Return the future associated with the task
        return result;
    }

    // Start the thread pool
    void start(int initThreadSize = std::thread::hardware_concurrency())
    {
        // Set the thread pool running status
        isPoolRunning_ = true;

        // record the number of initial threads
        initThreadSize_ = initThreadSize;
        curThreadSize_ = initThreadSize;

        // create the thread objects and hand each one the thread function
        for (size_t i = 0; i < initThreadSize_; i++)
        {
            auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
            int threadId = ptr->getId();
            // unique_ptr has deleted copy construction and assignment, so ptr must be moved
            threads_.emplace(threadId, std::move(ptr));
        }

        // start all threads
        for (size_t i = 0; i < initThreadSize_; i++)
        {
            threads_[i]->start(); // executes the thread function
            idleThreadSize_++;    // number of idle threads + 1 (used in cached mode)
        }
    }

    // Prohibit copy construction and assignment
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

private:
    // thread function
    void threadFunc(int threadid)
    {
        auto lastTime = std::chrono::high_resolution_clock::now();

        // All tasks must be executed before the thread pool can reclaim all thread resources
        for (;;)
        {
            Task task;
            {
                // acquire the lock first
                std::unique_lock<std::mutex> lock(taskQueMtx_);

                std::cout << "tid:" << std::this_thread::get_id()
                    << " trying to get a task..." << std::endl;

                // In cached mode many threads may have been created; threads (beyond initThreadSize_)
                // idle for more than 60s should be recycled:
                // current time - time the thread last executed a task > 60s

                // wait_for returns every second; distinguish a timeout return from a return because tasks arrived
                // lock + double-checked condition
                while (taskQue_.size() == 0)
                {
                    // The thread pool is shutting down, reclaim thread resources
                    if (!isPoolRunning_)
                    {
                        threads_.erase(threadid); // not std::this_thread::get_id()
                        std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
                            << std::endl;
                        exitCond_.notify_all();
                        return; // thread function ends, thread ends
                    }

                    if (poolMode_ == PoolMode::MODE_CACHED)
                    {
                        // the condition variable wait returned because of a timeout
                        if (std::cv_status::timeout ==
                            notEmpty_.wait_for(lock, std::chrono::seconds(1)))
                        {
                            auto now = std::chrono::high_resolution_clock::now();
                            auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
                            if (dur.count() >= THREAD_MAX_IDLE_TIME
                                && curThreadSize_ > initThreadSize_)
                            {
                                // Recycle the current thread:
                                // update the thread-count bookkeeping and remove the thread object from the container.
                                // There is no direct mapping from threadFunc to its Thread object,
                                // so go threadid => thread object => erase
                                threads_.erase(threadid); // not std::this_thread::get_id()
                                curThreadSize_--;
                                idleThreadSize_--;

                                std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
                                    << std::endl;
                                return;
                            }
                        }
                    }
                    else
                    {
                        // wait on the notEmpty condition
                        notEmpty_.wait(lock);
                    }
                }

                idleThreadSize_--;

                std::cout << "tid:" << std::this_thread::get_id()
                    << " got a task successfully..." << std::endl;

                // Fetch a task from the task queue
                task = taskQue_.front();
                taskQue_.pop();
                taskSize_--;

                // If there are remaining tasks, keep notifying other threads to execute them
                if (taskQue_.size() > 0)
                {
                    notEmpty_.notify_all();
                }

                // A task was taken out, so notify producers that they can continue submitting
                notFull_.notify_all();
            } // release the lock here

            // The current thread executes this task
            if (task != nullptr)
            {
                task(); // execute the function<void()>
            }

            idleThreadSize_++;
            lastTime = std::chrono::high_resolution_clock::now(); // update the time when the thread finished a task
        }
    }

    // Check the running status of the pool
    bool checkRunningState() const
    {
        return isPoolRunning_;
    }

private:
    // std::vector<std::unique_ptr<Thread>> threads_; // thread list
    // Map thread id to thread object pointer for easy lookup
    std::unordered_map<int, std::unique_ptr<Thread>> threads_; // thread list
    size_t initThreadSize_;          // number of initial threads
    int threadSizeThreshHold_;       // maximum number of threads (used in cached mode)
    std::atomic_int curThreadSize_;  // total number of threads currently in the pool (used in cached mode)
    std::atomic_int idleThreadSize_; // number of idle threads (used in cached mode)

    // task queue
    using Task = std::function<void()>;
    std::queue<Task> taskQue_;       // task queue
    std::atomic_int taskSize_;       // number of tasks
    int taskQueMaxThreshHold_;       // upper threshold of the task queue size

    std::mutex taskQueMtx_;            // ensures thread safety of the task queue
    std::condition_variable notFull_;  // the task queue is not full
    std::condition_variable notEmpty_; // the task queue is not empty
    std::condition_variable exitCond_; // wait until all thread resources are reclaimed

    PoolMode poolMode_;              // thread pool mode
    std::atomic_bool isPoolRunning_; // thread pool running status
};
#endif
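
A minimal sketch of a test driver, assuming the header above is saved as threadpool.h (the filename is an assumption); it only needs to include the header and link the thread library.

// main.cpp - minimal driver; "threadpool.h" is an assumed filename for the header above
#include <iostream>
#include <future>
#include "threadpool.h"

int add(int a, int b) { return a + b; }

int main()
{
    ThreadPool pool;
    pool.start(4);
    std::future<int> r = pool.submitTask(add, 1, 2);
    std::cout << r.get() << std::endl; // prints 3
    return 0;
}

// Build (g++/clang++): g++ -std=c++14 main.cpp -pthread
// (std::make_unique in the header requires C++14; with a strict C++11 compiler, supply your own.)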

Test 1

int sum1(int a, int b)
{
    return a + b;
}

int sum2(int a, int b, int c)
{
    return a + b + c;
}

int main()
{
    ThreadPool pool;
    pool.start(4);
    future<int> r1 = pool.submitTask(sum1, 10, 20);
    future<int> r2 = pool.submitTask(sum2, 10, 20, 30);
    future<int> r3 = pool.submitTask([](int b, int e)->int {
        int sum = 0;
        for (int i = b; i <= e; i++) sum += i;
        return sum;
        }, 1, 100);
    //future<int> r1 = pool.submitTask(sum1, 10, 20);
    cout << r1.get() << endl;
    cout << r2.get() << endl;
    cout << r3.get() << endl;

    return 0;
}

Experimental results

All three results come back as expected: r1 yields 30, r2 yields 60, and r3 yields 5050, interleaved with the pool's diagnostic output about threads acquiring tasks.

Test 2

int main()
{
    ThreadPool pool;
    pool.setMode(PoolMode::MODE_CACHED);
    pool.start(2);
    future<int> r1 = pool.submitTask(sum1, 10, 20);
    future<int> r2 = pool.submitTask(sum2, 10, 20, 30);
    future<int> r3 = pool.submitTask([](int b, int e)->int {
        int sum = 0;
        for (int i = b; i <= e; i++) sum += i;
        return sum;
        }, 1, 100);
    //future<int> r1 = pool.submitTask(sum1, 10, 20);
    cout << r1.get() << endl;
    cout << r2.get() << endl;
    cout << r3.get() << endl;

    return 0;
}

Test 3

Set TASK_MAX_THRESHHOLD to 2, submit five tasks (the first two, sum1 and sum2, each sleep for 2s), and start the pool with only two threads.

int sum1(int a, int b)
{
    this_thread::sleep_for(chrono::seconds(2));
    return a + b;
}

int sum2(int a, int b, int c)
{
    this_thread::sleep_for(chrono::seconds(2));
    return a + b + c;
}

int main()
{
    ThreadPool pool;
    //pool.setMode(PoolMode::MODE_CACHED);
    pool.start(2);
    future<int> r1 = pool.submitTask(sum1, 10, 20);
    future<int> r2 = pool.submitTask(sum2, 10, 20, 30);
    future<int> r3 = pool.submitTask([](int b, int e)->int {
        int sum = 0;
        for (int i = b; i <= e; i++) sum += i;
        return sum;
        }, 1, 100);

    future<int> r4 = pool.submitTask([](int b, int e)->int {
        int sum = 0;
        for (int i = b; i <= e; i++) sum += i;
        return sum;
        }, 1, 100);

    future<int> r5 = pool.submitTask([](int b, int e)->int {
        int sum = 0;
        for (int i = b; i <= e; i++) sum += i;
        return sum;
        }, 1, 100);
    //future<int> r1 = pool.submitTask(sum1, 10, 20);
    cout << r1.get() << endl;
    cout << r2.get() << endl;
    cout << r3.get() << endl;
    cout << r4.get() << endl;
    cout << r5.get() << endl;
    return 0;
}

r1 and r2 are taken from the queue immediately by the two worker threads, and r3 and r4 then fill the task queue up to its threshold of 2. Because r1 and r2 contain sleeps, both threads are still busy, so the queue stays full and the main thread cannot enqueue r5: the submitTask call for r5 waits on notFull_ for 1s, prints "task queue is full, submit task fail.", executes the placeholder task, and returns a future whose get() yields the default value RType(), i.e. 0.
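
One consequence of this design is that a rejected submission is indistinguishable from a task that legitimately returns RType(). A possible variant (not part of the code above, just a sketch) is to let the placeholder task throw, so the failure travels through the future itself and future::get() rethrows it:

// Hypothetical variant of the rejection branch in submitTask (requires <stdexcept>):
// an exception thrown inside the packaged_task is stored in its shared state,
// so the caller's future::get() rethrows it instead of silently returning RType().
auto task = std::make_shared<std::packaged_task<RType()>>(
    []()->RType { throw std::runtime_error("task queue is full, submit task fail."); });
(*task)();                  // runs the lambda; the exception is captured, not propagated here
return task->get_future();  // future::get() will rethrow the std::runtime_error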