Implementing a thread pool in C++11 (part 7): design and implementation of the cached mode

Calling size() on a standard container while another thread is modifying it is not thread-safe, so the thread pool records the number of threads in separate std::atomic_int counters, which can be read and updated safely from any thread.
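
As a rough illustration of that point, the sketch below has several threads increment one shared std::atomic_int without a lock. The helper name countWithAtomic is hypothetical and is not part of the thread pool; it only shows that no increment is lost.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the thread pool): starts `threads` threads,
// each of which increments a shared atomic counter `perThread` times.
int countWithAtomic(int threads, int perThread)
{
    std::atomic_int counter{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
    {
        workers.emplace_back([&counter, perThread]() {
            for (int i = 0; i < perThread; ++i)
            {
                counter++; // atomic read-modify-write, no lock needed
            }
        });
    }
    for (auto& w : workers)
    {
        w.join(); // all increments are finished once every thread is joined
    }
    return counter.load();
}
```

Calling countWithAtomic(4, 10000) returns 40000, because every increment is an indivisible operation.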

Modification of thread pool member variables

Add variables to record the current number of threads, the number of idle threads, and the upper limit of the number of threads:

int threadSizeThreshHold_; // Maximum number of threads (used in cached mode)
std::atomic_int curThreadSize_; // Record the total number of threads in the current thread pool (used in cached mode)
std::atomic_int idleThreadSize_; // record the number of idle threads (used in cached mode)

The container that stores the threads should also be changed to an unordered_map keyed by thread id, so that a Thread object can be looked up, and deleted, by its threadId:

std::unordered_map<int, std::unique_ptr<Thread>> threads_;
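
A minimal sketch of how such a map can be used, assuming a hypothetical Worker struct as a stand-in for the pool's Thread class (the names Worker, WorkerMap, addWorker and removeWorker are illustrative only):

```cpp
#include <cassert>
#include <memory>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the pool's Thread class: it only carries an id.
struct Worker
{
    explicit Worker(int id) : id_(id) {}
    int getId() const { return id_; }
    int id_;
};

using WorkerMap = std::unordered_map<int, std::unique_ptr<Worker>>;

// Insert a worker under its own id; unique_ptr is move-only, hence std::move.
void addWorker(WorkerMap& workers, int id)
{
    std::unique_ptr<Worker> ptr(new Worker(id)); // C++11-compatible construction
    int key = ptr->getId();                      // read the id before moving
    workers.emplace(key, std::move(ptr));
}

// Erase by id; removing the map entry also destroys the Worker it owns.
// Returns true if an entry was removed.
bool removeWorker(WorkerMap& workers, int id)
{
    return workers.erase(id) == 1;
}
```

Because the map owns each object through a unique_ptr, erasing the key is enough to release the object, which is what the worker thread relies on when it removes itself.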

Main thread submitTask function modification

The pool grows inside submitTask: if, when a task is submitted, the number of tasks exceeds the number of idle threads and the current total number of threads is below the thread threshold, a new thread is created:

Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
    // Acquire the lock
    std::unique_lock<std::mutex> lck(taskQueMtx_);
    // Wait up to 1s for free space in the task queue
    if (!notFull_.wait_for(lck, std::chrono::seconds(1),
        [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
    {
        std::cerr << "task queue is full, submit task fail" << std::endl;
        return Result(sp, false);
    }

    // If there is space, put the task into the task queue
    taskQue_.emplace(sp);
    taskSize_++;

    // The task queue is not empty: wake up the waiters on notEmpty_, i.e. notify the consumers
    notEmpty_.notify_all();

    // Cached mode suits urgent workloads made of small, fast tasks
    // Decide whether to create a new thread based on the number of tasks and the number of idle threads
    if (poolMode_ == PoolMode::MODE_CACHED
        && taskSize_ > idleThreadSize_
        && curThreadSize_ < threadSizeThreshHold_)
    {
        std::cout << ">>> create new thread..." << std::endl;

        // Create a new thread object (note: std::make_unique requires C++14)
        auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
        int threadId = ptr->getId();
        // The copy constructor and copy assignment of unique_ptr are deleted, so ptr must be moved
        threads_.emplace(threadId, std::move(ptr));

        // Start the thread
        threads_[threadId]->start();

        curThreadSize_++;
        idleThreadSize_++;
    }

    // Return the Result object of the task
    return Result(sp);
}
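
The expansion rule used in submitTask can be isolated as a small pure function, which makes the three conditions easy to check one by one. This is only a sketch: the enum Mode and the function shouldSpawnThread are hypothetical names, not part of the pool.

```cpp
#include <cassert>

// Hypothetical mirror of the pool's mode enum.
enum class Mode { Fixed, Cached };

// Returns true when the rule described above would create an extra thread:
// cached mode, more pending tasks than idle threads, and room below the cap.
bool shouldSpawnThread(Mode mode, int pendingTasks, int idleThreads,
                       int currentThreads, int maxThreads)
{
    return mode == Mode::Cached
        && pendingTasks > idleThreads
        && currentThreads < maxThreads;
}
```

For example, with 4 idle threads, 5 pending tasks and a cap of 10, a cached pool grows, while a fixed pool with the same numbers does not.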

Worker thread running function modification

In cached mode many extra threads may be created. A thread that has been idle for more than 60s should be recycled, as long as the total number of threads stays >= initThreadSize_, that is:

if current time – last thread execution time > 60s then recycle thread
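
The rule above can be sketched with std::chrono as follows. The function name shouldReclaim and the constant kMaxIdleSeconds are assumptions made for illustration, and the sketch uses std::chrono::steady_clock (a monotonic clock) rather than the high_resolution_clock used in the pool's code.

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Assumed value, taken from the 60s rule in the text.
const int kMaxIdleSeconds = 60;

// Returns true when a thread that last ran a task at `lastActive` should be
// reclaimed at time `now`, without dropping below the initial thread count.
bool shouldReclaim(Clock::time_point lastActive, Clock::time_point now,
                   int currentThreads, int initThreads)
{
    auto idle = std::chrono::duration_cast<std::chrono::seconds>(now - lastActive);
    return idle.count() >= kMaxIdleSeconds && currentThreads > initThreads;
}
```

A thread idle for 61s in a pool of 6 threads (4 initial) is reclaimed; the same thread in a pool that is already down to 4 threads is kept.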

The check runs once per second. A wakeup caused by the timeout has to be distinguished from a wakeup caused by a newly queued task, which is done with the lock plus a double check of the condition.

When the thread pool is shut down, every thread must finish the task it is currently running before it exits, so the while (taskQue_.size() == 0) loop is also the place where a thread checks whether the pool has been terminated.

// The threadid parameter is Thread::threadId_, a custom thread number (not std::thread::id)
// Every worker thread in the pool consumes tasks from the queue in this function
void ThreadPool::threadFunc(int threadid)
{
    auto lastTime = std::chrono::high_resolution_clock().now();

    // All tasks must be executed before the thread pool can reclaim the thread resources
    for (;;)
    {
        std::shared_ptr<Task> task;
        {
            // Acquire the lock
            std::unique_lock<std::mutex> lock(taskQueMtx_);
            std::cout << "tid:" << std::this_thread::get_id() << "trying to get task..." << std::endl;

            // No task to process
            while (taskQue_.size() == 0)
            {
                // The thread pool is shutting down: release this thread's resources
                if (!isPoolRunning_)
                {
                    threads_.erase(threadid);
                    std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
                    exitCond_.notify_all();
                    return;
                }

                if (poolMode_ == PoolMode::MODE_CACHED)
                {
                    // Wait at most 1s; on timeout, check whether this thread has been idle long enough to be deleted
                    if (std::cv_status::timeout ==
                        notEmpty_.wait_for(lock, std::chrono::seconds(1)))
                    {
                        auto now = std::chrono::high_resolution_clock().now();
                        auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);

                        if (dur.count() >= THREAD_MAX_IDLE_TIME
                            && curThreadSize_ > initThreadSize_)
                        {
                            // Recycle the current thread:
                            // update the counters that track the number of threads,
                            // then delete the thread object from the thread container
                            // threadid => Thread object => erase
                            threads_.erase(threadid); // Note: the key is threadid, not std::this_thread::get_id()
                            curThreadSize_--;
                            idleThreadSize_--; // This thread must be idle, because there is no task at all

                            std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
                            return;
                        }
                    }
                }
                else // fixed mode
                {
                    notEmpty_.wait(lock);
                }
            }

            idleThreadSize_--; // About to execute a task: idle threads - 1

            std::cout << "tid:" << std::this_thread::get_id() << "get task successfully..." << std::endl;
            // Take a task from the task queue
            task = taskQue_.front();
            taskQue_.pop();
            taskSize_--;

            // If there are still tasks left, notify the other threads to pick them up
            if (taskQue_.size() > 0)
            {
                notEmpty_.notify_all();
            }

            // A task has been taken out: notify the producer that it can submit again
            notFull_.notify_all();
        } // Only taskQue_ and the related shared state need the lock; the task itself runs without it

        // The current thread is responsible for executing this task
        if (task != nullptr)
        {
            //task->run();
            // Execute the task and pass its return value to Result through setVal
            task->exec();
        }
        // Task finished: idle threads + 1
        idleThreadSize_++;
        // Update the outer lastTime; redeclaring it with auto here would shadow it and the idle timer would never reset
        lastTime = std::chrono::high_resolution_clock().now();
    }
}
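
The "lock + double check" idea used in the worker loop can be shown in isolation. The sketch below is an assumption-laden miniature, not the pool's code: WakeReason and waitForTask are hypothetical names, the task queue is reduced to a single bool, and it waits with wait_until against a fixed deadline instead of the repeated wait_for calls used above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

enum class WakeReason { TaskArrived, TimedOut };

// Waits until `hasTask` becomes true or `timeout` elapses, and reports which
// of the two happened. `hasTask` must only be modified while holding `mtx`.
WakeReason waitForTask(std::mutex& mtx, std::condition_variable& cv,
                       const bool& hasTask, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(mtx);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!hasTask) // first check, under the lock
    {
        if (cv.wait_until(lock, deadline) == std::cv_status::timeout)
        {
            // Second check: a task may have been queued just as the wait expired.
            return hasTask ? WakeReason::TaskArrived : WakeReason::TimedOut;
        }
        // Woken before the deadline: loop and re-test hasTask, which also
        // filters out spurious wakeups.
    }
    return WakeReason::TaskArrived;
}
```

With no producer and a 20ms timeout the function reports TimedOut; if hasTask is already true it returns TaskArrived without waiting.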

Thread pool resource recycling

When the thread pool is destroyed, the destructor sets the running state to false.

It then notifies all consumers (the worker threads, Slave), which exit once they detect that the running state is false.

Note: submitTask is called by the main thread, i.e. the thread that assigns tasks (Master).

ThreadPool::~ThreadPool()
{
    isPoolRunning_ = false;
    // Wait for all threads in the thread pool to return; each is in one of two states: blocked or executing a task
    std::unique_lock<std::mutex> lock(taskQueMtx_);
    notEmpty_.notify_all(); // Wake the blocked threads so they can see that isPoolRunning_ is false
    // Wait until every thread object has been erased
    exitCond_.wait(lock, [&]()->bool { return threads_.size() == 0; });
}
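
The shutdown handshake can be reproduced in a reduced form. The sketch below is only an approximation under stated assumptions: MiniPool and startAndShutDown are hypothetical names, the workers never run tasks, and a plain counter replaces threads_.size(). One difference from the destructor above is that the running flag is changed while holding the lock, which the sketch needs because the flag here is a plain bool.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical miniature of the pool's shared state, reduced to what shutdown needs.
struct MiniPool
{
    std::mutex mtx;
    std::condition_variable notEmpty; // workers sleep here while idle
    std::condition_variable exitCond; // the destroying thread sleeps here
    bool running = true;              // guarded by mtx
    int liveThreads = 0;              // guarded by mtx
};

// Starts `n` detached workers, requests shutdown, and returns the number of
// workers still alive once the wait on exitCond completes.
int startAndShutDown(int n)
{
    auto pool = std::make_shared<MiniPool>(); // shared so detached threads keep it alive
    pool->liveThreads = n;
    for (int i = 0; i < n; ++i)
    {
        std::thread([pool]() {
            std::unique_lock<std::mutex> lock(pool->mtx);
            // Idle until shutdown is requested (the predicate handles spurious wakeups).
            pool->notEmpty.wait(lock, [&pool]() { return !pool->running; });
            --pool->liveThreads;         // deregister, like threads_.erase(threadid)
            pool->exitCond.notify_all(); // tell the destroying thread
        }).detach();
    }

    std::unique_lock<std::mutex> lock(pool->mtx);
    pool->running = false;       // request shutdown
    pool->notEmpty.notify_all(); // wake every idle worker
    pool->exitCond.wait(lock, [&pool]() { return pool->liveThreads == 0; });
    return pool->liveThreads;
}
```

The call returns only after every worker has deregistered itself, which is the same guarantee the destructor obtains by waiting for threads_.size() == 0.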

The default initial number of threads can be taken from std::thread::hardware_concurrency(), the library function that reports the number of concurrent threads the hardware supports (typically the number of CPU cores), that is,

void start(int initThreadSize = std::thread::hardware_concurrency());
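
One detail worth keeping in mind is that std::thread::hardware_concurrency() returns an unsigned int and may return 0 when the value cannot be determined. A possible guard is sketched below; the helper names and the fallback value of 4 are assumptions, not part of the pool.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: picks an initial thread count, falling back to
// `fallback` when the hint is 0 (returned when the value is unknown).
unsigned int chooseInitThreadSize(unsigned int hint, unsigned int fallback)
{
    return hint != 0 ? hint : fallback;
}

// Convenience wrapper that asks the library for the hint.
// The fallback of 4 is an arbitrary choice for this sketch.
unsigned int defaultInitThreadSize()
{
    return chooseInitThreadSize(std::thread::hardware_concurrency(), 4);
}
```

With such a guard, start() would never be asked to create zero threads by default.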

Test

Set the initial number of threads to 4, submit 6 tasks, set the maximum idle time to 6s (threads idle for longer are recycled), and use MODE_CACHED. The test code is as follows:

#include <iostream>
#include <chrono>
#include <cstdio>   // getchar
#include <memory>   // std::make_shared
#include <thread>   // std::this_thread
#include "threadpool.h"

using uLong = unsigned long long;

class MyTask : public Task
{
public:
    MyTask(uLong begin, uLong end)
        : begin_(begin)
        , end_(end)
    {}

    // Sum the integers in [begin_, end_]
    Any run()
    {
        std::cout << "tid:" << std::this_thread::get_id() << "begin!" << std::endl;
        //std::this_thread::sleep_for(std::chrono::seconds(5));
        uLong sum = 0;
        for (uLong i = begin_; i <= end_; i++)
        {
            sum += i;
        }
        std::cout << "tid:" << std::this_thread::get_id() << " end!" << std::endl;

        return sum;
    }
private:
    uLong begin_;
    uLong end_;
};

int main()
{
#if 1
    ThreadPool pool;
    // Set to scalable (cached) mode
    pool.setMode(PoolMode::MODE_CACHED);
    pool.start(4);

    Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000000));
    Result res2 = pool.submitTask(std::make_shared<MyTask>(1000000001, 2000000000));
    Result res3 = pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
    pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
    pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
    pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));

    uLong sum1 = res1.get().cast_<uLong>();
    uLong sum2 = res2.get().cast_<uLong>();
    uLong sum3 = res3.get().cast_<uLong>();

    // Master - Slave threading model
    std::cout << (sum1 + sum2 + sum3) << std::endl;

    getchar();
#endif
    return 0;
}
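
The printed total can be cross-checked without running the loops, using the closed form n(n+1)/2 for the sum 1 + 2 + ... + n. The helper rangeSum below is a hypothetical name introduced only for this check; it assumes 1 <= begin <= end and that the intermediate products fit in an unsigned long long, which holds for the ranges used in the test.

```cpp
#include <cassert>

using uLong = unsigned long long;

// Sum of the integers in [begin, end], computed with the closed form
// n(n+1)/2 instead of a loop. Assumes 1 <= begin <= end and no overflow.
uLong rangeSum(uLong begin, uLong end)
{
    uLong upTo = end * (end + 1) / 2;      // 1 + 2 + ... + end
    uLong below = (begin - 1) * begin / 2; // 1 + 2 + ... + (begin - 1)
    return upTo - below;
}
```

Adding the three sub-ranges used by res1, res2 and res3 gives 3000000000 * 3000000001 / 2 = 4500000001500000000, which is the value sum1 + sum2 + sum3 should print.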

The experimental results are as follows:

The output shows that 2 additional threads are created dynamically (4 initial threads, 6 tasks), that the extra threads are recycled once the idle timeout expires, and that all threads are recycled normally when the pool exits.