There are asynchronous event processing mechanisms in many C++ frameworks, which makes us often confused and difficult to understand when looking at the source code. The programming routines contained in it may be some mature technologies that we are not familiar with, such as WebRTC. It is similar to Qt’s signal and slot mechanism, thread event processing, or using system asynchronous IO, etc. If you don’t understand these routines, it will be difficult to understand the code. This blog will try to use C++ thread pool to implement asynchronous Event handling mechanism.
Basic implementation of asynchronous event processing mechanism
C++ can use std::future and std::promise to implement asynchronous operations. However, in order to implement an asynchronous event binding framework, we need a more complex design. Below is a simple example of how to implement an asynchronous event handler.
First, define an event handler class that will receive and process events:
class EventHandler {<!-- --> public: virtual ~EventHandler() = default; virtual void handleEvent(int eventID) = 0; };
Then we need to create an event dispatcher that will call the event handler asynchronously:
/* Event registration, distribution */ #pragma once #include "EventHandler.hpp" #include <map> #include <thread> #include <future> #include <functional> #include <memory> class EventDispatcher {<!-- --> public: //Register event handler void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {<!-- --> handlers[eventID] = handler; } //Asynchronous event dispatch function void postEvent(int eventID) {<!-- --> auto it = handlers.find(eventID); if (it != handlers.end()) {<!-- --> std::thread eventThread( & amp;EventDispatcher::dispatchEvent, this, it->second, eventID); eventThread.detach(); } } private: //Event dispatch function void dispatchEvent(std::shared_ptr<EventHandler> handler, int eventID) {<!-- --> handler->handleEvent(eventID); } private: std::map<int, std::shared_ptr<EventHandler>> handlers; //Storage event, int event id, std::shared_ptr<EventHandler> event handler };
In this example, the postEvent method of the EventDispatcher class receives an event ID and calls the corresponding event handler in a new thread. This enables asynchronous processing of events.
Then, you can create one or more processor classes, such as the following print event handler PrintEventHandler, which implements the EventHandler interface,
/* specific event handler */ #include "EventHandler.hpp" #include <iostream> using namespace std; class PrintEventHandler : public EventHandler {<!-- --> public: void handleEvent(int eventID) override {<!-- --> std::cout << "Handling event " << eventID << std::endl; } };
Then register in the main function:
/* C++ asynchronous event framework demo01 */ #include <iostream> #include <memory> #include <thread> #include <chrono> #include "EventDispatcher.hpp" #include "PrintEventHandler.hpp" int main() {<!-- --> EventDispatcher dispatcher; std::shared_ptr<EventHandler> printHandler = std::make_shared<PrintEventHandler>(); dispatcher.registerHandler(1, printHandler); dispatcher.postEvent(1); // Sleep main thread to let the event thread finish. std::this_thread::sleep_for(std::chrono::seconds(1)); return 0; }
operation result:
Handling event 1
The code is organized as follows. Those who are interested can write and implement it by themselves:
cmake script
#[[ Compilation method cmake -S . -B build cd build make ./demo01 ]] cmake_minimum_required(VERSION 3.20) project(demo01) set(INCLUDE_PATH1 "./") #Add header file directory include_directories( ${<!-- -->INCLUDE_PATH1} ) #Add subdirectory src aux_source_directory("./" SRC) add_executable(demo01 ${<!-- -->SRC})
This implementation is very basic and does not take into account thread safety issues, exception handling, etc. In actual projects, you need more complex designs and use more advanced concurrent programming techniques, such as thread pools, task queues, mutex locks, etc.
Add thread pool and task queue
If you want a more complex design, including thread pools, task queues, mutex locks, etc., you can consider using the following design. The following example uses C++17’s std::async and std::future to implement thread pools and task queues.
First, we need a thread-safe task queue:
#pragma once #include <queue> #include <mutex> #include <condition_variable> template <typename T> class ThreadSafeQueue {<!-- --> public: ThreadSafeQueue() = default; ThreadSafeQueue(const ThreadSafeQueue<T> & amp;) = delete; ThreadSafeQueue & amp; operator=(const ThreadSafeQueue<T> & amp;) = delete; void push(T value) {<!-- --> std::lock_guard<std::mutex> lock(mMutex); mQueue.push(std::move(value)); mCondition.notify_one(); } bool try_pop(T & amp; value) {<!-- --> std::lock_guard<std::mutex> lock(mMutex); if (mQueue.empty()) {<!-- --> return false; } value = std::move(mQueue.front()); mQueue.pop(); return true; } void wait_and_pop(T & amp; value) {<!-- --> std::unique_lock<std::mutex> lock(mMutex); mCondition.wait(lock, [this](){<!-- --> return !mQueue.empty(); }); value = std::move(mQueue.front()); mQueue.pop(); } private: std::queue<T> mQueue; std::mutex mMutex; std::condition_variable mCondition; };
Then, we need a thread pool to handle these tasks:
#pragma once #include "ThreadSafeQueue.hpp" #include <vector> #include <future> class ThreadPool {<!-- --> public: ThreadPool(size_t numThreads) {<!-- --> start(numThreads); } ~ThreadPool() {<!-- --> stop(); } template<typename T> void enqueue(T task) {<!-- --> mTasks.push(std::make_shared<std::packaged_task<void()>>(std::move(task))); } private: std::vector<std::thread> mThreads; ThreadSafeQueue<std::shared_ptr<std::packaged_task<void()>>> mTasks; std::atomic<bool> mContinue {<!-- --> true }; void start(size_t numThreads) {<!-- --> for (auto i = 0u; i < numThreads; + + i) {<!-- --> mThreads.emplace_back([=] {<!-- --> while (mContinue) {<!-- --> std::shared_ptr<std::packaged_task<void()>> task; if (mTasks.try_pop(task)) {<!-- --> (*task)(); } else {<!-- --> std::this_thread::yield(); } } }); } } void stop() noexcept {<!-- --> mContinue = false; for (auto & amp;thread : mThreads) {<!-- --> thread.join(); } } };
We can then use a thread pool in EventDispatcher to handle events asynchronously:
#pragma once #include "ThreadPool.hpp" #include <mutex> #include <map> #include <memory> #include "EventHandler.hpp" class EventDispatcher {<!-- --> public: EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {<!-- -->} void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {<!-- --> std::lock_guard<std::mutex> lock(mMutex); mHandlers[eventID] = handler; } void postEvent(int eventID) {<!-- --> std::lock_guard<std::mutex> lock(mMutex); auto it = mHandlers.find(eventID); if (it != mHandlers.end()) {<!-- --> mThreadPool.enqueue([handler = it->second, eventID]() {<!-- --> handler->handleEvent(eventID); }); } } private: std::map<int, std::shared_ptr<EventHandler>> mHandlers; ThreadPool mThreadPool; std::mutex mMutex; };
This implementation is now thread-safe and uses a thread pool to handle events. For exception handling, you can do it inside the handleEvent method of EventHandler.
Add exception handling
How to handle exceptions in EventDispatcher?
To handle exceptions in EventDispatcher
, we can add a try/catch
block during the execution of the task to capture and handle possible exceptions. Here’s an example:
class EventDispatcher { public: EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {} void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) { std::lock_guard<std::mutex> lock(mMutex); mHandlers[eventID] = handler; } void postEvent(int eventID) { std::lock_guard<std::mutex> lock(mMutex); auto it = mHandlers.find(eventID); if (it != mHandlers.end()) { mThreadPool.enqueue([handler = it->second, eventID]() { try { handler->handleEvent(eventID); } catch (const std::exception & amp;ex) { std::cerr << "Exception occurred while handling event " << eventID << ": " << ex.what() << '\\ '; } catch (...) { std::cerr << "Unknown exception occurred while handling event " << eventID << '\\ '; } }); } } private: std::map<int, std::shared_ptr<EventHandler>> mHandlers; ThreadPool mThreadPool; std::mutex mMutex; };
In this example, if the handleEvent
method throws an exception, then we will print out an error message on the console. How you handle these exceptions depends on the specific needs of your application.
Please note that this method only handles exceptions thrown in the handleEvent
method. If an exception is thrown elsewhere (such as in the ThreadPool
constructor or the enqueue
method), you may need additional code to handle these situations.
Register multiple events
In our event dispatcher design, registering multiple events is very simple. You just need to create different event handlers and register them with the event dispatcher using different event IDs. Here is an example:
class PrintEventHandler : public EventHandler { public: PrintEventHandler(int id) : id(id) {} void handleEvent(int eventID) override { std::cout << "Handler " << id << " handling event " << eventID << std::endl; } private: int id; }; int main() { EventDispatcher dispatcher(4); // Create an event dispatcher with 4 threads std::shared_ptr<EventHandler> handler1 = std::make_shared<PrintEventHandler>(1); dispatcher.registerHandler(1, handler1); std::shared_ptr<EventHandler> handler2 = std::make_shared<PrintEventHandler>(2); dispatcher.registerHandler(2, handler2); dispatcher.postEvent(1); dispatcher.postEvent(2); // Sleep main thread to let the event threads finish std::this_thread::sleep_for(std::chrono::seconds(1)); return 0; }
In this example, we create two PrintEventHandler
objects, each with a unique ID. We then register these two handlers into EventDispatcher
using different event IDs. When we publish an event, the corresponding handler will be called.
This is how to register multiple events in our asynchronous event binding framework.
How to cancel registration
To unregister an event handler from an event dispatcher, you can add a new method to the EventDispatcher
class, such as unregisterHandler
, for mapping from the handler Delete the specified event ID. Here is an example of how to implement this function:
class EventDispatcher { public: EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {} void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) { std::lock_guard<std::mutex> lock(mMutex); mHandlers[eventID] = handler; } void unregisterHandler(int eventID) { std::lock_guard<std::mutex> lock(mMutex); mHandlers.erase(eventID); } void postEvent(int eventID) { std::lock_guard<std::mutex> lock(mMutex); auto it = mHandlers.find(eventID); if (it != mHandlers.end()) { mThreadPool.enqueue([handler = it->second, eventID]() { try { handler->handleEvent(eventID); } catch (const std::exception & amp;ex) { std::cerr << "Exception occurred while handling event " << eventID << ": " << ex.what() << '\\ '; } catch (...) { std::cerr << "Unknown exception occurred while handling event " << eventID << '\\ '; } }); } } private: std::map<int, std::shared_ptr<EventHandler>> mHandlers; ThreadPool mThreadPool; std::mutex mMutex; };
This unregisterHandler
function will remove the event handler associated with the given event ID. Note that this function does not stop any threads that are already handling the event. If you want to cancel ongoing event processing, you may need a more complex design, such as using std::future
and std::promise
to control thread execution .
How to stop an ongoing event
To stop ongoing event processing, we need a more complex design, which may include using std::future
and std::promise
to control the execution of the thread. In this design, whenever an event is posted, we will create a std::promise
and store the corresponding std::future
somewhere , so that we can stop event processing later if needed.
Be aware, however, that by design of C++, there is no simple and safe way to force stop a running thread, as this may lead to resource leaks or other undefined behavior. Therefore, it is more common to have the event handler periodically check a “stop flag” and then gracefully stop execution when that flag is checked. Here is a simple example demonstrating how to implement this design:
class StoppableEvent { public: StoppableEvent(std::future<void> future, std::function<void()> func) : mFuture(std::move(future)), mFunc(std::move(func)) {} void operator()() { while(mFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::timeout) { mFunc(); } } private: std::future<void> mFuture; std::function<void()> mFunc; }; class EventDispatcher { public: EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {} void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) { std::lock_guard<std::mutex> lock(mMutex); mHandlers[eventID] = handler; } void postEvent(int eventID) { std::lock_guard<std::mutex> lock(mMutex); auto it = mHandlers.find(eventID); if (it != mHandlers.end()) { std::promise<void> stopSignal; auto stopFuture = stopSignal.get_future(); mStopSignals[eventID] = std::move(stopSignal); mThreadPool.enqueue(StoppableEvent(std::move(stopFuture), [handler = it->second, eventID]() { handler->handleEvent(eventID); })); } } void stopEvent(int eventID) { std::lock_guard<std::mutex> lock(mMutex); auto it = mStopSignals.find(eventID); if (it != mStopSignals.end()) { it->second.set_value(); mStopSignals.erase(it); } } private: std::map<int, std::shared_ptr<EventHandler>> mHandlers; std::map<int, std::promise<void>> mStopSignals; ThreadPool mThreadPool; std::mutex mMutex; };
In this example, we define a StoppableEvent
class that combines a std::future
and a function. When operator()
is called, it will periodically check future
. If the status of future
is not timeout
, then Stop executing the function.
Then, when we publish an event in EventDispatcher
, we will create a new std::promise
and corresponding std::future
code>, and pass this future
and the event handler’s handleEvent
method to StoppableEvent
. We also store the promise
in a map so that we can later signal a stop by calling set_value
.
Finally, we added a stopEvent
method that will look for the promise
associated with a given event ID and emit it by calling set_value
Stop signal. It will then remove this promise
from the map since we no longer need it.
This is a basic example, you may need to modify and extend it according to your specific needs. Note that this design assumes that the event handler’s handleEvent
method will be called multiple times, and each call may be interrupted. If your event handler only executes a long-running task once, this design may not be suitable.
The above is a simple asynchronous event processing demo. During project development, it needs to be adjusted and improved according to specific business needs.