Publish and subscribe model implemented based on fixed-length blocking message queue [c++]

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <cstring>
#include <chrono>

// Custom message structure
template <typename CallbackType>
struct CustomMessage {
    CallbackType callback;
    void* data;
    size_t length;

    CustomMessage(CallbackType callback, void* data, size_t length)
        : callback(callback), data(data), length(length) {}

    CustomMessage() = default; // Add default constructor
};

// message queue
template <typename CallbackType>
class MessageQueue {
public:
    void push(const CustomMessage<CallbackType> & amp; message) {
        std::unique_lock<std::mutex> lock(mutex);
        while (queue.size() >= maxQueueSize) {
            //Queue is full, waiting
            notFull.wait(lock);
        }
        queue.push(message);
        lock.unlock();
        notEmpty.notify_one(); // Notify observers that the queue is not empty
    }

    bool pop(CustomMessage<CallbackType> & amp; message) {
        std::unique_lock<std::mutex> lock(mutex);
        while (queue.empty() & amp; & amp; !finished) {
            // The queue is empty and unfinished, waiting
            notEmpty.wait(lock);
        }
        if (!queue.empty()) {
            message = queue.front();
            queue.pop();
            lock.unlock();
            notFull.notify_one(); // Notify the publisher that the queue is not full
            return true;
        }
        return false;
    }

    void setFinished() {
        std::unique_lock<std::mutex> lock(mutex);
        finished = true;
        lock.unlock();
        notEmpty.notify_all(); // Wake up waiting observer threads
    }

private:
    std::queue<CustomMessage<CallbackType>> queue;
    const size_t maxQueueSize = 5; // Maximum queue capacity
    bool finished = false;
    std::mutex mutex;
    std::condition_variable notEmpty;
    std::condition_variable notFull;
};

// abstract observer
template <typename CallbackType>
class Observer {
public:
    virtual void update(const CustomMessage<CallbackType> & amp; message) = 0;
};

// specific observer
template <typename CallbackType>
class ConcreteObserver : public Observer<CallbackType> {
public:
    ConcreteObserver(const std::string & amp; name, MessageQueue<CallbackType> & amp; messageQueue)
        : name(name), messageQueue(messageQueue) {}

    void update(const CustomMessage<CallbackType> & amp; message) override {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        std::cout << name << " Received message, length: " << message.length << std::endl;
        // Process the message
        message.callback(message.data, message.length);
        std::this_thread::sleep_for(std::chrono::seconds(3));

    }

    void start() {
        while (true) {
            CustomMessage<CallbackType> message;
            if (messageQueue.pop(message)) {
                update(message);
            } else {
                break; //queue completed
            }
        }
    }

private:
    std::string name;
    MessageQueue<CallbackType> & amp; messageQueue;
};

//Specific publisher
template <typename CallbackType>
class Publisher {
public:
    Publisher(MessageQueue<CallbackType> & amp; messageQueue) : messageQueue(messageQueue) {}

    void publishMessages(const std::vector<CustomMessage<CallbackType>> & amp; messages) {
        for (const CustomMessage<CallbackType> & amp; message : messages) {
            messageQueue.push(message);
        }
        messageQueue.setFinished();
    }

private:
    MessageQueue<CallbackType> & amp; messageQueue;
};

int main() {
    MessageQueue<std::function<void(void*, size_t)>> messageQueue;
    Publisher<std::function<void(void*, size_t)>> publisher(messageQueue);

    ConcreteObserver<std::function<void(void*, size_t)>> observer1("Observer1", messageQueue);
    ConcreteObserver<std::function<void(void*, size_t)>> observer2("Observer2", messageQueue);

    //Example callback function
    auto callback = [](void* data, size_t length) {
        char* messageData = static_cast<char*>(data);
        std::cout << "Callback function processes message data: " << std::string(messageData, length) << std::endl;
    };
    std::vector<CustomMessage<std::function<void(void*, size_t)>>> messages;
    char messageData[] = "custom message";
    CustomMessage<std::function<void(void*, size_t)>> customMessage(callback, messageData, strlen(messageData));
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);

    std::thread([ & amp;]() {
        publisher.publishMessages(messages);
    }).detach();

    std::thread observerThread1( & amp;ConcreteObserver<std::function<void(void*, size_t)>>::start, & amp;observer1);
    std::thread observerThread2( & amp;ConcreteObserver<std::function<void(void*, size_t)>>::start, & amp;observer2);

    observerThread1.join();
    observerThread2.join();

    return 0;
}
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <cstring>
#include <chrono>
#include <algorithm>

// Custom message structure
template <typename CallbackType>
struct CustomMessage {
    CallbackType callback;
    void* data;
    size_t length;

    CustomMessage(CallbackType callback, void* data, size_t length)
        : callback(callback), data(data), length(length) {}

    CustomMessage() = default;
};

// message queue
template <typename CallbackType>
class MessageQueue {
public:
    void push(const CustomMessage<CallbackType> & amp; message) {
        std::unique_lock<std::mutex> lock(mutex);
        while (queue.size() >= maxQueueSize) {
            notFull.wait(lock);
        }
        queue.push(message);
        lock.unlock();
        notEmpty.notify_one();
    }

    bool pop(CustomMessage<CallbackType> & amp; message) {
        std::unique_lock<std::mutex> lock(mutex);
        while (queue.empty() & amp; & amp; !finished) {
            notEmpty.wait(lock);
        }
        if (!queue.empty()) {
            message = queue.front();
            queue.pop();
            lock.unlock();
            notFull.notify_one();
            return true;
        }
        return false;
    }

    void setFinished() {
        std::unique_lock<std::mutex> lock(mutex);
        finished = true;
        lock.unlock();
        notEmpty.notify_all();
    }

private:
    std::queue<CustomMessage<CallbackType>> queue;
    const size_t maxQueueSize = 5;
    bool finished = false;
    std::mutex mutex;
    std::condition_variable notEmpty;
    std::condition_variable notFull;
};

// abstract observer
template <typename CallbackType>
class Observer {
public:
    virtual void update(const CustomMessage<CallbackType> & amp; message) = 0;
};

// specific observer
template <typename CallbackType>
class ConcreteObserver : public Observer<CallbackType> {
public:
    ConcreteObserver(const std::string & name)
        : name(name) {}

    void update(const CustomMessage<CallbackType> & amp; message) override {
        std::cout << name << " Received message, length: " << message.length << std::endl;
        message.callback(message.data, message.length);
    }

private:
    std::string name;
};

//Specific publisher
template <typename CallbackType>
class Publisher {
public:
    Publisher(MessageQueue<CallbackType> & amp; messageQueue) : messageQueue(messageQueue) {}

    void publishMessages(const std::vector<CustomMessage<CallbackType>> & amp; messages) {
        for (const CustomMessage<CallbackType> & amp; message : messages) {
            messageQueue.push(message);
        }
        messageQueue.setFinished();
    }

private:
    MessageQueue<CallbackType> & amp; messageQueue;
};

// specific topic
template <typename CallbackType>
class ConcreteSubject {
public:
    void addObserver(Observer<CallbackType>* observer) {
        observers.push_back(observer);
    }

    void removeObserver(Observer<CallbackType>* observer) {
        auto it = std::find(observers.begin(), observers.end(), observer);
        if (it != observers.end()) {
            observers.erase(it);
        }
    }

    void notifyObservers() {
        while (true) {
            CustomMessage<CallbackType> message;
            if (messageQueue.pop(message)) {
                for (Observer<CallbackType>* observer : observers) {
                    observer->update(message);
                }
            } else {
                break;
            }
        }
    }

    void startObservers() {
        for (Observer<CallbackType>* observer : observers) {
            observerThreads.push_back(std::thread( & amp;ConcreteSubject::runObserver, this, observer));
        }
    }

    void joinObservers() {
        for (std::thread & amp; thread : observerThreads) {
            thread.join();
        }
    }

    void runObserver(Observer<CallbackType>* observer) {
        // Observer thread waits for messages
        CustomMessage<CallbackType> message;
        while (messageQueue.pop(message)) {
            observer->update(message);
        }
    }

    MessageQueue<CallbackType> & amp; getMessageQueue() { return messageQueue; }

private:
    std::vector<Observer<CallbackType>*> observers;
    MessageQueue<CallbackType> messageQueue;
    std::vector<std::thread> observerThreads;
};

int main() {
    ConcreteSubject<std::function<void(void*, size_t)>> subject;

    ConcreteObserver<std::function<void(void*, size_t)>> observer1("Observer 1");
    ConcreteObserver<std::function<void(void*, size_t)>> observer2("Observer 2");

    subject.addObserver( & amp;observer1);
    subject.addObserver( & amp;observer2);



    char messageData[] = "custom message"; // 
    std::thread([ & amp;]() {
    auto callback = [](void* data, size_t length) {
        char* messageData = static_cast<char*>(data);
        std::cout << "Callback function processes message data: " << std::string(messageData, length) << std::endl;
    };
    CustomMessage<std::function<void(void*, size_t)>> customMessage(callback, messageData, strlen(messageData));
    std::vector<CustomMessage<std::function<void(void*, size_t)>>> messages;
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
    messages.push_back(customMessage);
        Publisher<std::function<void(void*, size_t)>> publisher(subject.getMessageQueue());
        publisher.publishMessages(messages);
    }).detach();

    subject.startObservers();
    subject.joinObservers();

    return 0;
}