#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstring>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Custom message structure: a callback plus the buffer it is invoked with.
// The message does not own `data`; the caller must keep the buffer alive
// until every copy of the message has been processed.
template <typename CallbackType>
struct CustomMessage {
    CallbackType callback;
    void* data = nullptr;      // initialized so a default-constructed message is well-defined
    std::size_t length = 0;

    CustomMessage(CallbackType callback, void* data, std::size_t length)
        : callback(std::move(callback)), data(data), length(length) {}

    CustomMessage() = default;
};

// Bounded, blocking message queue shared by one publisher and any number of
// consumer threads. All member functions lock the internal mutex.
template <typename CallbackType>
class MessageQueue {
public:
    // Appends a copy of `message`; blocks while the queue holds
    // kMaxQueueSize elements.
    void push(const CustomMessage<CallbackType>& message) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return queue_.size() < kMaxQueueSize; });
        queue_.push(message);
        lock.unlock();
        notEmpty_.notify_one();  // wake one consumer: the queue is not empty
    }

    // Blocks until a message is available or setFinished() has been called.
    // Returns true and fills `message` when an element was dequeued; returns
    // false only when the queue is empty and finished.
    bool pop(CustomMessage<CallbackType>& message) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !queue_.empty() || finished_; });
        if (queue_.empty()) {
            return false;  // finished and fully drained
        }
        message = queue_.front();
        queue_.pop();
        lock.unlock();
        notFull_.notify_one();  // wake the publisher: there is room again
        return true;
    }

    // Marks the stream as complete and wakes every waiting consumer so it can
    // drain the remaining messages and then observe pop() == false.
    void setFinished() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        notEmpty_.notify_all();
    }

private:
    static constexpr std::size_t kMaxQueueSize = 5;  // maximum queue capacity

    std::queue<CustomMessage<CallbackType>> queue_;
    bool finished_ = false;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::condition_variable notFull_;
};

// Abstract observer.
template <typename CallbackType>
class Observer {
public:
    // Polymorphic base: a virtual destructor makes deletion through a base
    // pointer well-defined.
    virtual ~Observer() = default;

    virtual void update(const CustomMessage<CallbackType>& message) = 0;
};

// Concrete observer that pulls messages from a shared queue on its own thread.
template <typename CallbackType>
class ConcreteObserver : public Observer<CallbackType> {
public:
    ConcreteObserver(const std::string& name, MessageQueue<CallbackType>& messageQueue)
        : name(name), messageQueue(messageQueue) {}

    // Handles one message: logs its length and invokes its callback. The
    // sleeps before and after simulate slow processing.
    void update(const CustomMessage<CallbackType>& message) override {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        std::cout << name << " Received message, length: " << message.length << std::endl;
        // Process the message
        message.callback(message.data, message.length);
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    // Consumer loop: processes messages until the queue reports that it is
    // finished and empty.
    void start() {
        CustomMessage<CallbackType> message;
        while (messageQueue.pop(message)) {
            update(message);
        }
    }

private:
    std::string name;
    MessageQueue<CallbackType>& messageQueue;  // not owned; must outlive this observer
};

// Concrete publisher.
template <typename CallbackType>
class Publisher {
public:
    Publisher(MessageQueue<CallbackType>& messageQueue) : messageQueue(messageQueue) {}

    // Pushes every message in order (blocking whenever the queue is full) and
    // then marks the queue as finished.
    void publishMessages(const std::vector<CustomMessage<CallbackType>>& messages) {
        for (const CustomMessage<CallbackType>& message : messages) {
            messageQueue.push(message);
        }
        messageQueue.setFinished();
    }

private:
    MessageQueue<CallbackType>& messageQueue;  // not owned; must outlive this publisher
};

int main() {
    using Callback = std::function<void(void*, std::size_t)>;

    MessageQueue<Callback> messageQueue;
    Publisher<Callback> publisher(messageQueue);
    ConcreteObserver<Callback> observer1("Observer1", messageQueue);
    ConcreteObserver<Callback> observer2("Observer2", messageQueue);

    // Example callback function
    auto callback = [](void* data, std::size_t length) {
        const char* text = static_cast<const char*>(data);
        std::cout << "Callback function processes message data: "
                  << std::string(text, length) << std::endl;
    };

    char messageData[] = "custom message";
    CustomMessage<Callback> customMessage(callback, messageData, std::strlen(messageData));

    // Seven copies of the same message: more than the queue capacity, so the
    // publisher has to wait for the observers to make room.
    std::vector<CustomMessage<Callback>> messages(7, customMessage);

    // The publisher thread is joined rather than detached: it references
    // locals of main(), which must stay alive until it has returned.
    std::thread publisherThread([&publisher, &messages]() {
        publisher.publishMessages(messages);
    });
    std::thread observerThread1(&ConcreteObserver<Callback>::start, &observer1);
    std::thread observerThread2(&ConcreteObserver<Callback>::start, &observer2);

    observerThread1.join();
    observerThread2.join();
    publisherThread.join();
    return 0;
}
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstring>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Custom message structure: a callback plus the buffer it is invoked with.
// The message does not own `data`; the caller must keep the buffer alive
// until every copy of the message has been processed.
template <typename CallbackType>
struct CustomMessage {
    CallbackType callback;
    void* data = nullptr;      // initialized so a default-constructed message is well-defined
    std::size_t length = 0;

    CustomMessage(CallbackType callback, void* data, std::size_t length)
        : callback(std::move(callback)), data(data), length(length) {}

    CustomMessage() = default;
};

// Bounded, blocking message queue. All member functions lock the internal
// mutex.
template <typename CallbackType>
class MessageQueue {
public:
    // Appends a copy of `message`; blocks while the queue holds
    // kMaxQueueSize elements.
    void push(const CustomMessage<CallbackType>& message) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return queue_.size() < kMaxQueueSize; });
        queue_.push(message);
        lock.unlock();
        notEmpty_.notify_one();
    }

    // Blocks until a message is available or setFinished() has been called.
    // Returns true and fills `message` when an element was dequeued; returns
    // false only when the queue is empty and finished.
    bool pop(CustomMessage<CallbackType>& message) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !queue_.empty() || finished_; });
        if (queue_.empty()) {
            return false;  // finished and fully drained
        }
        message = queue_.front();
        queue_.pop();
        lock.unlock();
        notFull_.notify_one();
        return true;
    }

    // Marks the stream as complete and wakes every waiting consumer.
    void setFinished() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        notEmpty_.notify_all();
    }

private:
    static constexpr std::size_t kMaxQueueSize = 5;

    std::queue<CustomMessage<CallbackType>> queue_;
    bool finished_ = false;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::condition_variable notFull_;
};

// Abstract observer.
template <typename CallbackType>
class Observer {
public:
    // Polymorphic base: a virtual destructor makes deletion through a base
    // pointer well-defined.
    virtual ~Observer() = default;

    virtual void update(const CustomMessage<CallbackType>& message) = 0;
};

// Concrete observer: logs the message length and invokes the callback.
template <typename CallbackType>
class ConcreteObserver : public Observer<CallbackType> {
public:
    ConcreteObserver(const std::string& name) : name(name) {}

    void update(const CustomMessage<CallbackType>& message) override {
        std::cout << name << " Received message, length: " << message.length << std::endl;
        message.callback(message.data, message.length);
    }

private:
    std::string name;
};

// Concrete publisher.
template <typename CallbackType>
class Publisher {
public:
    Publisher(MessageQueue<CallbackType>& messageQueue) : messageQueue(messageQueue) {}

    // Pushes every message in order (blocking whenever the queue is full) and
    // then marks the queue as finished.
    void publishMessages(const std::vector<CustomMessage<CallbackType>>& messages) {
        for (const CustomMessage<CallbackType>& message : messages) {
            messageQueue.push(message);
        }
        messageQueue.setFinished();
    }

private:
    MessageQueue<CallbackType>& messageQueue;  // not owned; must outlive this publisher
};

// Concrete subject: owns the message queue and dispatches queued messages to
// registered observers. Observers are held as non-owning pointers and must
// outlive any running observer thread.
template <typename CallbackType>
class ConcreteSubject {
public:
    void addObserver(Observer<CallbackType>* observer) {
        observers.push_back(observer);
    }

    // Removes the first registration of `observer`, if any.
    void removeObserver(Observer<CallbackType>* observer) {
        auto it = std::find(observers.begin(), observers.end(), observer);
        if (it != observers.end()) {
            observers.erase(it);
        }
    }

    // Broadcast mode: drains the queue on the calling thread and delivers
    // each message to every registered observer. Returns once the queue is
    // finished and empty.
    void notifyObservers() {
        CustomMessage<CallbackType> message;
        while (messageQueue.pop(message)) {
            for (Observer<CallbackType>* observer : observers) {
                observer->update(message);
            }
        }
    }

    // Spawns one thread per registered observer, each running runObserver().
    // The threads compete for messages, so every message is handled by
    // exactly one observer (unlike notifyObservers(), which broadcasts).
    // joinObservers() must be called before this subject is destroyed.
    void startObservers() {
        observerThreads.reserve(observerThreads.size() + observers.size());
        for (Observer<CallbackType>* observer : observers) {
            observerThreads.emplace_back(&ConcreteSubject::runObserver, this, observer);
        }
    }

    // Waits for every observer thread to finish. Safe to call more than once:
    // joined threads are dropped from the list.
    void joinObservers() {
        for (std::thread& thread : observerThreads) {
            if (thread.joinable()) {
                thread.join();
            }
        }
        observerThreads.clear();
    }

    // Observer thread body: waits for messages and hands each one to
    // `observer` until the queue is finished and empty.
    void runObserver(Observer<CallbackType>* observer) {
        CustomMessage<CallbackType> message;
        while (messageQueue.pop(message)) {
            observer->update(message);
        }
    }

    MessageQueue<CallbackType>& getMessageQueue() {
        return messageQueue;
    }

private:
    std::vector<Observer<CallbackType>*> observers;
    MessageQueue<CallbackType> messageQueue;
    std::vector<std::thread> observerThreads;
};

int main() {
    using Callback = std::function<void(void*, std::size_t)>;

    ConcreteSubject<Callback> subject;
    ConcreteObserver<Callback> observer1("Observer 1");
    ConcreteObserver<Callback> observer2("Observer 2");
    subject.addObserver(&observer1);
    subject.addObserver(&observer2);

    char messageData[] = "custom message";

    // Publisher thread. It is joined rather than detached because it
    // references `subject` and `messageData`, which live in main().
    std::thread publisherThread([&subject, &messageData]() {
        auto callback = [](void* data, std::size_t length) {
            const char* text = static_cast<const char*>(data);
            std::cout << "Callback function processes message data: "
                      << std::string(text, length) << std::endl;
        };
        CustomMessage<Callback> customMessage(callback, messageData, std::strlen(messageData));

        // Six copies: one more than the queue capacity, so the final push
        // waits until an observer has dequeued a message.
        std::vector<CustomMessage<Callback>> messages(6, customMessage);

        Publisher<Callback> publisher(subject.getMessageQueue());
        publisher.publishMessages(messages);
    });

    // Observers must be running before the publisher is joined; otherwise the
    // publisher would block forever on the full queue.
    subject.startObservers();
    subject.joinObservers();
    publisherThread.join();
    return 0;
}