Using a C++ thread pool to simulate an asynchronous event processing mechanism

Many C++ frameworks contain asynchronous event processing mechanisms, which often leaves us confused when reading their source code. The patterns behind them may be mature techniques we are simply not familiar with: WebRTC, for example, relies on them, and they resemble Qt's signal/slot mechanism, per-thread event processing, or system-level asynchronous IO. Without understanding these patterns, the code is hard to follow. This blog post tries to implement an asynchronous event handling mechanism with a C++ thread pool.

Basic implementation of asynchronous event processing mechanism

C++ can implement asynchronous operations with std::future and std::promise. However, to build an asynchronous event binding framework we need a more elaborate design. Below is a simple example of how to implement an asynchronous event handler.
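Before building the framework, here is a minimal, self-contained sketch of that baseline: a worker thread fulfills a std::promise while the caller blocks on the matching std::future. This is illustration only and not part of the framework below.

#include <future>
#include <iostream>
#include <thread>

int main() {
    std::promise<int> promise;                       // producer side
    std::future<int> future = promise.get_future();  // consumer side

    // A worker thread computes a value and fulfills the promise.
    std::thread worker([&promise] { promise.set_value(42); });

    // get() blocks until the worker has called set_value().
    std::cout << "result = " << future.get() << std::endl;

    worker.join();
    return 0;
}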

First, define an event handler class that will receive and process events:

#pragma once

class EventHandler {
public:
    virtual ~EventHandler() = default;
    virtual void handleEvent(int eventID) = 0;
};

Then we need to create an event dispatcher that will call the event handler asynchronously:

/*

Event registration, distribution

*/

#pragma once

#include "EventHandler.hpp"
#include <map>
#include <thread>
#include <future>
#include <functional>
#include <memory>

class EventDispatcher {
public:
    // Register an event handler
    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        handlers[eventID] = handler;
    }

    // Asynchronous event dispatch function
    void postEvent(int eventID) {
        auto it = handlers.find(eventID);
        if (it != handlers.end()) {
            std::thread eventThread(&EventDispatcher::dispatchEvent, this, it->second, eventID);
            eventThread.detach();
        }
    }

private:
    // Event dispatch function
    void dispatchEvent(std::shared_ptr<EventHandler> handler, int eventID) {
        handler->handleEvent(eventID);
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> handlers; // key: event ID, value: event handler
};

In this example, the postEvent method of the EventDispatcher class receives an event ID and calls the corresponding event handler in a new thread. This enables asynchronous processing of events.

Then you can create one or more concrete handler classes, for example the following PrintEventHandler, which implements the EventHandler interface:

/*

specific event handler

*/

#include "EventHandler.hpp"
#include <iostream>

using namespace std;

class PrintEventHandler : public EventHandler {<!-- -->
public:
    void handleEvent(int eventID) override {<!-- -->
        std::cout << "Handling event " << eventID << std::endl;
    }
};

Then register the handler in the main function:

/*

C++ asynchronous event framework demo01

*/


#include <iostream>
#include <memory>
#include <thread>
#include <chrono>
#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"

int main() {
    EventDispatcher dispatcher;
    
    std::shared_ptr<EventHandler> printHandler = std::make_shared<PrintEventHandler>();
    dispatcher.registerHandler(1, printHandler);

    dispatcher.postEvent(1);

    // Sleep main thread to let the event thread finish.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

Running the program produces:

Handling event 1

The project is built with the following CMake script; interested readers can assemble the files and try it themselves:

CMake script

#[[

    Compilation method
    cmake -S . -B build
    cd build
    make
    ./demo01

]]

cmake_minimum_required(VERSION 3.20)

project(demo01)

set(INCLUDE_PATH1 "./")

# Add header file directory
include_directories(
    ${INCLUDE_PATH1}
)

# Collect all source files in the current directory
aux_source_directory("./" SRC)

add_executable(demo01 ${SRC})

This implementation is very basic and ignores thread safety, exception handling, and so on. Real projects call for a more elaborate design using concurrency techniques such as thread pools, task queues, and mutex locks.

Add thread pool and task queue

If you want a more complete design, including a thread pool, a task queue, and mutex locks, consider the following approach. The example below builds the thread pool and task queue from std::thread, std::packaged_task, std::mutex, and std::condition_variable.

First, we need a thread-safe task queue:

#pragma once

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue {
public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete;

    void push(T value) {
        std::lock_guard<std::mutex> lock(mMutex);
        mQueue.push(std::move(value));
        mCondition.notify_one();
    }

    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mQueue.empty()) {
            return false;
        }

        value = std::move(mQueue.front());
        mQueue.pop();
        return true;
    }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mMutex);
        mCondition.wait(lock, [this]() { return !mQueue.empty(); });
        value = std::move(mQueue.front());
        mQueue.pop();
    }

private:
    std::queue<T> mQueue;
    std::mutex mMutex;
    std::condition_variable mCondition;
};
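As a quick sanity check, a hypothetical standalone snippet (not part of the framework) could exercise the queue with one producer and one consumer:

#include "ThreadSafeQueue.hpp"
#include <iostream>
#include <thread>

int main() {
    ThreadSafeQueue<int> queue;

    // Producer pushes a few values.
    std::thread producer([&queue] {
        for (int i = 0; i < 3; ++i) {
            queue.push(i);
        }
    });

    // Consumer blocks in wait_and_pop() until each value arrives.
    for (int i = 0; i < 3; ++i) {
        int value = 0;
        queue.wait_and_pop(value);
        std::cout << "popped " << value << std::endl;
    }

    producer.join();
    return 0;
}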

Then, we need a thread pool to handle these tasks:

#pragma once

#include "ThreadSafeQueue.hpp"
#include <vector>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) {
        start(numThreads);
    }

    ~ThreadPool() {
        stop();
    }

    // Wrap any void() callable in a packaged_task and push it onto the queue
    template<typename T>
    void enqueue(T task) {
        mTasks.push(std::make_shared<std::packaged_task<void()>>(std::move(task)));
    }

private:
    std::vector<std::thread> mThreads;
    ThreadSafeQueue<std::shared_ptr<std::packaged_task<void()>>> mTasks;
    std::atomic<bool> mContinue { true };

    void start(size_t numThreads) {
        for (auto i = 0u; i < numThreads; ++i) {
            // Each worker loops, popping and running tasks until stop() clears mContinue
            mThreads.emplace_back([this] {
                while (mContinue) {
                    std::shared_ptr<std::packaged_task<void()>> task;
                    if (mTasks.try_pop(task)) {
                        (*task)();
                    } else {
                        std::this_thread::yield();
                    }
                }
            });
        }
    }

    void stop() noexcept {
        mContinue = false;
        for (auto& thread : mThreads) {
            thread.join();
        }
    }
};
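Before wiring the pool into the dispatcher, a hypothetical standalone sketch of its use (assuming the headers above) looks like this:

#include "ThreadPool.hpp"
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    ThreadPool pool(2); // two worker threads

    // Enqueue a few fire-and-forget tasks; each runs on one of the workers.
    for (int i = 0; i < 4; ++i) {
        pool.enqueue([i] { std::cout << "task " << i << " done" << std::endl; });
    }

    // Give the workers a moment to drain the queue before the pool is destroyed.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return 0;
}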

We can then use a thread pool in EventDispatcher to handle events asynchronously:

#pragma once
#include "ThreadPool.hpp"
#include <mutex>
#include <map>
#include <memory>
#include "EventHandler.hpp"

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            mThreadPool.enqueue([handler = it->second, eventID]() {
                handler->handleEvent(eventID);
            });
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

This implementation is now thread-safe and uses a thread pool to handle events. Exception handling can be done inside EventHandler's handleEvent method, as sketched below.
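For example, a minimal sketch of handler-side exception handling might look like the following; SafePrintEventHandler and doWork are names introduced here purely for illustration:

#include "EventHandler.hpp"
#include <iostream>
#include <stdexcept>

// Hypothetical handler that catches its own exceptions so they never
// escape into the thread pool's worker threads.
class SafePrintEventHandler : public EventHandler {
public:
    void handleEvent(int eventID) override {
        try {
            doWork(eventID);
        } catch (const std::exception& ex) {
            std::cerr << "Event " << eventID << " failed: " << ex.what() << std::endl;
        }
    }

private:
    void doWork(int eventID) {
        if (eventID < 0) {
            throw std::runtime_error("negative event ID");
        }
        std::cout << "Handling event " << eventID << std::endl;
    }
};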

Add exception handling

How do we handle exceptions in EventDispatcher itself?
We can wrap the execution of each task in a try/catch block to capture and handle any exception the handler throws. Here's an example:

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            mThreadPool.enqueue([handler = it->second, eventID]() {
                try {
                    handler->handleEvent(eventID);
                } catch (const std::exception& ex) {
                    std::cerr << "Exception occurred while handling event " << eventID
                              << ": " << ex.what() << '\n';
                } catch (...) {
                    std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';
                }
            });
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

In this example, if the handleEvent method throws an exception, we print an error message to the console. How you handle such exceptions depends on the specific needs of your application.

Please note that this method only handles exceptions thrown in the handleEvent method. If an exception is thrown elsewhere (such as in the ThreadPool constructor or the enqueue method), you may need additional code to handle these situations.
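One simple option for those cases, sketched below under the assumption that the headers above are used unchanged, is a try/catch at the call site; std::thread construction can throw std::system_error, and enqueue can throw, for example, std::bad_alloc:

#include "EventDispatcher.hpp"
#include <iostream>

int main() {
    try {
        EventDispatcher dispatcher(4); // thread creation may throw std::system_error
        dispatcher.postEvent(1);       // enqueue may throw, e.g. std::bad_alloc
    } catch (const std::exception& ex) {
        std::cerr << "Dispatcher error: " << ex.what() << std::endl;
        return 1;
    }
    return 0;
}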

Register multiple events

In our event dispatcher design, registering multiple events is very simple. You just need to create different event handlers and register them with the event dispatcher using different event IDs. Here is an example:

class PrintEventHandler : public EventHandler {
public:
    PrintEventHandler(int id) : id(id) {}
    void handleEvent(int eventID) override {
        std::cout << "Handler " << id << " handling event " << eventID << std::endl;
    }
private:
    int id;
};

int main() {
    EventDispatcher dispatcher(4); // Create an event dispatcher with 4 threads

    std::shared_ptr<EventHandler> handler1 = std::make_shared<PrintEventHandler>(1);
    dispatcher.registerHandler(1, handler1);

    std::shared_ptr<EventHandler> handler2 = std::make_shared<PrintEventHandler>(2);
    dispatcher.registerHandler(2, handler2);

    dispatcher.postEvent(1);
    dispatcher.postEvent(2);

    // Sleep main thread to let the event threads finish
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

In this example, we create two PrintEventHandler objects, each with a unique ID. We then register these two handlers with the EventDispatcher under different event IDs. When we post an event, the corresponding handler is called.

This is how to register multiple events in our asynchronous event binding framework.

How to cancel registration

To unregister an event handler from the event dispatcher, you can add a new method to the EventDispatcher class, such as unregisterHandler, that removes the specified event ID from the handler map. Here is an example of how to implement this:

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void unregisterHandler(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers.erase(eventID);
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            mThreadPool.enqueue([handler = it->second, eventID]() {
                try {
                    handler->handleEvent(eventID);
                } catch (const std::exception& ex) {
                    std::cerr << "Exception occurred while handling event " << eventID
                              << ": " << ex.what() << '\n';
                } catch (...) {
                    std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';
                }
            });
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

This unregisterHandler function removes the event handler associated with the given event ID; a quick usage sketch follows below. Note that it does not stop any threads that are already handling the event. If you want to cancel ongoing event processing, you may need a more complex design, such as using std::future and std::promise to control thread execution.
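A hypothetical usage sketch, assuming the original no-argument PrintEventHandler and the thread-pool-based dispatcher above:

#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"
#include <chrono>
#include <memory>
#include <thread>

int main() {
    EventDispatcher dispatcher(2);

    dispatcher.registerHandler(1, std::make_shared<PrintEventHandler>());

    dispatcher.postEvent(1);          // handled: a handler is registered for event 1
    dispatcher.unregisterHandler(1);
    dispatcher.postEvent(1);          // ignored: no handler for event 1 anymore

    // Give the pool time to run the first task before shutting down.
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}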

How to stop an ongoing event

To stop ongoing event processing, we need a more elaborate design, which may include using std::future and std::promise to control the execution of the thread. In this design, whenever an event is posted we create a std::promise, hand its std::future to the task, and keep the promise around so that we can signal a stop later if needed.

Be aware, however, that C++ deliberately provides no simple and safe way to force-stop a running thread, as doing so can lead to resource leaks or other undefined behavior. Therefore, it is more common to have the event handler periodically check a "stop flag" and stop gracefully once that flag is set. Here is a simple example demonstrating this design:

class StoppableEvent {
public:
    StoppableEvent(std::future<void> future, std::function<void()> func)
        : mFuture(std::move(future)), mFunc(std::move(func)) {}

    void operator()() {
        while(mFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::timeout) {
            mFunc();
        }
    }

private:
    std::future<void> mFuture;
    std::function<void()> mFunc;
};

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            std::promise<void> stopSignal;
            auto stopFuture = stopSignal.get_future();
            mStopSignals[eventID] = std::move(stopSignal);
            mThreadPool.enqueue(StoppableEvent(std::move(stopFuture), [handler = it->second, eventID]() {
                handler->handleEvent(eventID);
            }));
        }
    }

    void stopEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mStopSignals.find(eventID);
        if (it != mStopSignals.end()) {
            it->second.set_value();
            mStopSignals.erase(it);
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    std::map<int, std::promise<void>> mStopSignals;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

In this example, we define a StoppableEvent class that combines a std::future with a function. When operator() is called, it repeatedly runs the function, checking the future between runs; as soon as the future's status is no longer timeout (that is, once the stop signal has been set), it stops executing the function.

Then, when we post an event in EventDispatcher, we create a new std::promise with its corresponding std::future, and pass this future together with a lambda that calls the handler's handleEvent method to StoppableEvent. We also store the promise in a map so that we can later signal a stop by calling set_value.

Finally, we add a stopEvent method that looks up the promise associated with a given event ID and emits the stop signal by calling set_value. It then removes the promise from the map, since it is no longer needed.
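A hypothetical usage sketch (again assuming the no-argument PrintEventHandler from the first demo) might drive it like this:

#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"
#include <chrono>
#include <memory>
#include <thread>

int main() {
    EventDispatcher dispatcher(2);
    dispatcher.registerHandler(1, std::make_shared<PrintEventHandler>());

    // The handler is invoked roughly every 100 ms until the stop signal arrives.
    dispatcher.postEvent(1);
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // Signal the stop; StoppableEvent sees the future become ready and returns.
    dispatcher.stopEvent(1);
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return 0;
}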

This is a basic example; you may need to modify and extend it according to your specific needs. Note that this design assumes the event handler's handleEvent method is called repeatedly and that processing stops between two calls, rather than interrupting a call in progress. If your event handler executes a single long-running task, this design may not be suitable; in that case the handler itself needs to check a stop flag, as sketched below.
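For that single long-running task case, one minimal sketch (StoppableLongTaskHandler is a name introduced here for illustration, not part of the framework above) is to give the handler its own cooperative stop flag and check it between units of work:

#include "EventHandler.hpp"
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Hypothetical handler for one long-running task that can be stopped
// cooperatively via an atomic flag checked between units of work.
class StoppableLongTaskHandler : public EventHandler {
public:
    void handleEvent(int eventID) override {
        for (int step = 0; step < 1000 && !mStop; ++step) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate one unit of work
        }
        std::cout << "Event " << eventID << (mStop ? " stopped" : " finished") << std::endl;
    }

    void requestStop() { mStop = true; } // called from another thread

private:
    std::atomic<bool> mStop { false };
};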

The above is a simple asynchronous event processing demo. In a real project it needs to be adjusted and improved according to the specific business requirements.