Calling vector::size() to obtain the number of elements in a container is not thread-safe under concurrent modification, so the pool instead tracks these counts with atomic_int variables, whose increments and decrements are safe across threads.
Modification of thread pool member variables
Add variables to record the current number of threads, the number of idle threads, and the upper limit of the number of threads:
```cpp
int threadSizeThreshHold_;       // maximum number of threads (used in cached mode)
std::atomic_int curThreadSize_;  // total number of threads currently in the pool (used in cached mode)
std::atomic_int idleThreadSize_; // number of idle threads (used in cached mode)
```
The container storing the threads should also be changed to an unordered_map, so that a threadId can be used to look up the corresponding Thread object; this lets us delete exactly the Thread object whose id we hold:

```cpp
std::unordered_map<int, std::unique_ptr<Thread>> threads_;
```
Main thread submitTask function modification
The pool grows inside submitTask: when it detects that the number of idle threads is less than the number of queued tasks, and the current total number of threads is still below the threshold, a new thread is created:
```cpp
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
    // Acquire the lock
    std::unique_lock<std::mutex> lck(taskQueMtx_);

    // Wait up to 1s for space in the task queue
    if (!notFull_.wait_for(lck, std::chrono::seconds(1),
        [&]() -> bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
    {
        std::cerr << "task queue is full, submit task fail" << std::endl;
        return Result(sp, false); // submission failed
    }

    // There is space: put the task into the task queue
    taskQue_.emplace(sp);
    taskSize_++;

    // The task queue is no longer empty: wake up a consumer waiting on notEmpty_
    notEmpty_.notify_all();

    // Cached mode handles urgent, small-and-fast tasks, so decide whether to
    // create a new thread based on the task count and the number of idle threads
    if (poolMode_ == PoolMode::MODE_CACHED
        && taskSize_ > idleThreadSize_
        && curThreadSize_ < threadSizeThreshHold_)
    {
        std::cout << ">>> create new thread..." << std::endl;

        // Create a new thread object
        auto ptr = std::make_unique<Thread>(
            std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
        int threadId = ptr->getId();

        // unique_ptr's copy construction and assignment are deleted,
        // so ptr must be converted to an rvalue
        threads_.emplace(threadId, std::move(ptr));

        // Start the thread
        threads_[threadId]->start();
        curThreadSize_++;
        idleThreadSize_++;
    }

    // Return the Result object of the task
    return Result(sp);
}
```
Worker thread running function modification
In cached mode many threads may be created, but a thread whose idle time exceeds 60s should be recycled (while keeping the thread count >= initThreadSize_), that is:

if current time − time the thread last executed a task > 60s, then recycle the thread

The check runs once per second, and the return value of wait_for distinguishes a timeout from a task notification; combining the lock with a double check keeps this race-free. In addition, when the thread pool is shut down, every thread must finish the task at hand before exiting, so the while (taskQue_.size() == 0) loop is where each thread checks whether the pool has terminated.
```cpp
// The threadid parameter is Thread::threadId_, a custom thread number
// (not the OS thread id). All worker threads in the pool consume tasks.
void ThreadPool::threadFunc(int threadid)
{
    auto lastTime = std::chrono::high_resolution_clock().now();

    // All tasks must finish before the pool can reclaim thread resources
    for (;;)
    {
        std::shared_ptr<Task> task;
        {
            // Acquire the lock
            std::unique_lock<std::mutex> lock(taskQueMtx_);

            std::cout << "tid:" << std::this_thread::get_id()
                      << " trying to get task..." << std::endl;

            // No task available
            while (taskQue_.size() == 0)
            {
                // The pool is shutting down: reclaim this thread's resources
                if (!isPoolRunning_)
                {
                    threads_.erase(threadid);
                    std::cout << "threadid:" << std::this_thread::get_id()
                              << " exit!" << std::endl;
                    exitCond_.notify_all();
                    return;
                }

                if (poolMode_ == PoolMode::MODE_CACHED)
                {
                    // Wake up every 1s to check whether this thread's idle
                    // time meets the recycling condition
                    if (std::cv_status::timeout ==
                        notEmpty_.wait_for(lock, std::chrono::seconds(1)))
                    {
                        auto now = std::chrono::high_resolution_clock().now();
                        auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
                        if (dur.count() >= THREAD_MAX_IDLE_TIME
                            && curThreadSize_ > initThreadSize_)
                        {
                            // Recycle the current thread:
                            // update the thread-count bookkeeping and erase the
                            // Thread object from the container, keyed by
                            // threadid (not std::this_thread::get_id())
                            threads_.erase(threadid);
                            curThreadSize_--;
                            idleThreadSize_--; // this thread is certainly idle: there is no task

                            std::cout << "threadid:" << std::this_thread::get_id()
                                      << " exit!" << std::endl;
                            return;
                        }
                    }
                }
                else // fixed mode
                {
                    notEmpty_.wait(lock);
                }
            }

            idleThreadSize_--; // about to execute a task: one fewer idle thread

            std::cout << "tid:" << std::this_thread::get_id()
                      << " get task successfully..." << std::endl;

            // Take a task from the task queue
            task = taskQue_.front();
            taskQue_.pop();
            taskSize_--;

            // If tasks remain, notify other threads to keep consuming
            if (taskQue_.size() > 0)
            {
                notEmpty_.notify_all();
            }

            // A slot was freed: notify producers that they can submit again
            notFull_.notify_all();
        } // release the lock: only taskQue_ access needs it, not task execution

        // The current thread executes the task outside the lock
        if (task != nullptr)
        {
            // exec() runs the task and hands its return value to Result via setVal
            task->exec();
        }

        idleThreadSize_++; // task finished: this thread is idle again
        lastTime = std::chrono::high_resolution_clock().now(); // refresh the idle timestamp
    }
}
```
Thread pool resource recycling
The destructor sets the running flag to false.
It then notifies all consumers (the worker/Slave threads), which exit once they observe that the running flag is false.
Note: submitTask is called by the main thread, which acts as the Master that assigns tasks.
```cpp
ThreadPool::~ThreadPool()
{
    isPoolRunning_ = false;

    // Wait for every thread in the pool to return; each thread is in one of
    // two states: blocked waiting for a task, or executing a task
    std::unique_lock<std::mutex> lock(taskQueMtx_);
    notEmpty_.notify_all();

    // Wait until every Thread object has been erased from the container
    exitCond_.wait(lock, [&]() -> bool { return threads_.size() == 0; });
}
```
The initial number of threads can default to the hardware concurrency (the number of CPU cores) reported by the standard library:

```cpp
void start(int initThreadSize = std::thread::hardware_concurrency());
```
Test
Set the initial number of threads to 4, submit 6 tasks, set the maximum idle time to 6s (a thread idle longer than that is recycled), and use MODE_CACHED. The test code is as follows:
```cpp
#include <iostream>
#include <chrono>
#include "threadpool.h"

using uLong = unsigned long long;

class MyTask : public Task
{
public:
    MyTask(uLong begin, uLong end)
        : begin_(begin)
        , end_(end)
    {}

    Any run()
    {
        std::cout << "tid:" << std::this_thread::get_id() << " begin!" << std::endl;
        //std::this_thread::sleep_for(std::chrono::seconds(5));
        uLong sum = 0;
        for (uLong i = begin_; i <= end_; i++)
        {
            sum += i;
        }
        std::cout << "tid:" << std::this_thread::get_id() << " end!" << std::endl;
        return sum;
    }

private:
    uLong begin_;
    uLong end_;
};

int main()
{
#if 1
    ThreadPool pool;
    // Set the scalable (cached) mode
    pool.setMode(PoolMode::MODE_CACHED);
    pool.start(4);

    Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000000));
    Result res2 = pool.submitTask(std::make_shared<MyTask>(1000000001, 2000000000));
    Result res3 = pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
    pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
    pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
    pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));

    uLong sum1 = res1.get().cast_<uLong>();
    uLong sum2 = res2.get().cast_<uLong>();
    uLong sum3 = res3.get().cast_<uLong>();

    // Master-Slave threading model: the main thread submits, workers compute
    std::cout << (sum1 + sum2 + sum3) << std::endl;

    getchar();
#endif
    return 0;
}
```
The experimental results are as follows:
As expected, 2 additional threads are created dynamically, idle threads are recycled once the timeout expires, and all threads are reclaimed normally on exit.