Reinventing the wheel in C++: implementing lock-based and lock-free ring queues

First look at the implementation with locks.

Version with lock

circular_queue.h

// header file defense
#ifndef CIRCULAR_QUEUE_H
#define CIRCULAR_QUEUE_H

#include <mutex> // Mutex
#include <condition_variable> // condition variable

/// Fixed-capacity FIFO ring buffer protected by a single mutex.
///
/// Every public member locks `mutex_`, so the queue may be shared by any
/// number of producer and consumer threads. Storage for `capacity` elements is
/// allocated once in the constructor as `new T[capacity]`, so `T` must be
/// default-constructible and assignable.
///
/// @tparam T element type.
template <typename T>
class CircularQueue {
public:
    /// Creates an empty queue able to hold `capacity` elements.
    ///
    /// With `capacity == 0` the queue is permanently both empty and full:
    /// non-blocking push/pop return false and blocking calls never return.
    explicit CircularQueue(size_t capacity) :
        capacity_(capacity),
        size_(0),
        head_(0),
        tail_(0),
        buffer_(new T[capacity]) {}

    /// Releases the element storage.
    ~CircularQueue() {
        delete[] buffer_;
    }

    // The class owns a raw array; copying would double-delete it. (The mutex
    // member already makes the class non-copyable; this states it explicitly.)
    CircularQueue(const CircularQueue&) = delete;
    CircularQueue& operator=(const CircularQueue&) = delete;

    /// @return true if the queue currently holds no elements.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return size_ == 0;
    }

    /// @return true if the queue currently holds `capacity()` elements.
    bool full() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return size_ == capacity_;
    }

    /// @return the number of elements currently stored.
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return size_;
    }

    /// @return the fixed capacity given at construction (immutable, so no lock).
    size_t capacity() const {
        return capacity_;
    }

    /// Appends a copy of `value` at the tail.
    ///
    /// @param value element to copy into the queue.
    /// @param block when true (the default) waits until a slot is free;
    ///              when false returns immediately if the queue is full.
    /// @return true if the element was stored, false if the queue was full
    ///         and `block` was false.
    bool push(const T& value, bool block = true) {
        return enqueue(value, block);
    }

    /// Appends `value` at the tail by moving from it.
    ///
    /// Same contract as the copying overload; `value` is only moved from when
    /// the element is actually stored.
    bool push(T&& value, bool block = true) {
        return enqueue(std::move(value), block);
    }

    /// Removes the head element and move-assigns it into `value`.
    ///
    /// @param value receives the removed element.
    /// @param block when true (the default) waits until an element is
    ///              available; when false returns immediately if empty.
    /// @return true if an element was removed, false if the queue was empty
    ///         and `block` was false.
    bool pop(T& value, bool block = true) {
        std::unique_lock<std::mutex> lock(mutex_);

        if (block) {
            // The predicate form re-checks after every wake-up, which also
            // handles spurious wake-ups.
            not_empty_.wait(lock, [this] { return size_ != 0; });
        } else if (size_ == 0) {
            return false;
        }

        // Take the head element, then advance head_ with wrap-around.
        value = std::move(buffer_[head_]);
        head_ = (head_ + 1) % capacity_;
        --size_;

        // One slot just became free: wake one producer waiting in push().
        not_full_.notify_one();

        return true;
    }

private:
    /// Shared implementation of both push() overloads.
    ///
    /// `U` is deduced as `const T&` or `T`, so std::forward copies or moves
    /// exactly as the calling overload requested.
    template <typename U>
    bool enqueue(U&& value, bool block) {
        std::unique_lock<std::mutex> lock(mutex_);

        if (block) {
            // Wait until the queue is no longer full.
            not_full_.wait(lock, [this] { return size_ != capacity_; });
        } else if (size_ == capacity_) {
            return false;
        }

        // Store at the tail, then advance tail_ with wrap-around.
        buffer_[tail_] = std::forward<U>(value);
        tail_ = (tail_ + 1) % capacity_;
        ++size_;

        // One element just became available: wake one consumer waiting in pop().
        not_empty_.notify_one();

        return true;
    }

    const size_t capacity_;              // fixed number of slots in buffer_
    size_t size_;                        // number of elements currently stored
    size_t head_;                        // index of the next element to pop
    size_t tail_;                        // index of the next free slot to push into
    T* buffer_;                          // owned array of capacity_ elements
    mutable std::mutex mutex_;           // guards size_, head_, tail_ and buffer_ contents
    std::condition_variable not_full_;   // producers wait here while the queue is full
    std::condition_variable not_empty_;  // consumers wait here while the queue is empty
};

#endif // CIRCULAR_QUEUE_H

If the second parameter of push or pop is omitted, the call blocks by default; keep this in mind when using the queue.

The following is the unit test sample code for the CircularQueue class:

#include <gtest/gtest.h>
#include <thread>
  
#include "circular_queue.h"

// A freshly constructed queue is empty, not full, has size 0 and reports the
// capacity it was constructed with.
TEST(CircularQueueTest, EmptyQueue) {
    CircularQueue<int> queue(10);
    ASSERT_TRUE(queue.empty());
    ASSERT_FALSE(queue.full());
    // Unsigned literals: size() and capacity() return size_t.
    ASSERT_EQ(queue.size(), 0u);
    ASSERT_EQ(queue.capacity(), 10u);
}

// Fills a capacity-3 queue, drains it in FIFO order, and checks
// size()/empty()/full() after every step.
TEST(CircularQueueTest, PushAndPop) {
    CircularQueue<int> queue(3);

    ASSERT_TRUE(queue.push(1));
    ASSERT_EQ(queue.size(), 1u);
    ASSERT_FALSE(queue.empty());
    ASSERT_FALSE(queue.full());

    ASSERT_TRUE(queue.push(2));
    ASSERT_EQ(queue.size(), 2u);
    ASSERT_FALSE(queue.empty());
    ASSERT_FALSE(queue.full());

    ASSERT_TRUE(queue.push(3));
    ASSERT_EQ(queue.size(), 3u);
    ASSERT_FALSE(queue.empty());
    ASSERT_TRUE(queue.full());

    int value = 0;
    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, 1);
    ASSERT_EQ(queue.size(), 2u);
    ASSERT_FALSE(queue.empty());
    ASSERT_FALSE(queue.full());

    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, 2);
    ASSERT_EQ(queue.size(), 1u);
    ASSERT_FALSE(queue.empty());
    ASSERT_FALSE(queue.full());

    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, 3);
    ASSERT_EQ(queue.size(), 0u);
    ASSERT_TRUE(queue.empty());
    ASSERT_FALSE(queue.full());

    // Non-blocking pop on an empty queue must fail instead of waiting.
    ASSERT_FALSE(queue.pop(value, false));
}

// A consumer thread pops two elements with blocking pop() while the main
// thread pushes three elements into a capacity-2 queue, so the third push can
// only complete once the consumer has freed a slot.
TEST(CircularQueueTest, PushAndPopWithBlocking) {
    CircularQueue<int> queue(2);

    std::thread consumer([&queue]() {
        int value = 0;
        // EXPECT_* rather than ASSERT_*: a fatal assertion would only return
        // from this lambda, silently skipping the second pop.
        EXPECT_TRUE(queue.pop(value));
        EXPECT_EQ(value, 1);
        EXPECT_TRUE(queue.pop(value));
        EXPECT_EQ(value, 2);
    });

    // Non-fatal checks here so that join() below is always reached; leaving
    // the test with a joinable std::thread would call std::terminate.
    EXPECT_TRUE(queue.push(1));
    EXPECT_TRUE(queue.push(2));
    EXPECT_TRUE(queue.push(3));

    consumer.join();

    // Three pushed, two popped: exactly one element must remain.
    EXPECT_EQ(queue.size(), 1u);
}

// Non-blocking push on a full queue and non-blocking pop on an empty queue
// must both return false immediately.
TEST(CircularQueueTest, PushAndPopWithNonBlocking) {
    CircularQueue<int> queue(2);

    int value = 0;
    ASSERT_TRUE(queue.push(1));
    ASSERT_TRUE(queue.push(2));
    ASSERT_FALSE(queue.push(3, false));  // full: rejected, not queued
    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, 1);
    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, 2);
    ASSERT_FALSE(queue.pop(value, false));  // empty: nothing to pop
}

// Exercises the rvalue push overload: each string literal is converted to a
// temporary std::string, which binds to push(T&&).
TEST(CircularQueueTest, MovePushAndPop) {
    CircularQueue<std::string> queue(3);

    ASSERT_TRUE(queue.push("hello"));
    ASSERT_TRUE(queue.push("world"));
    ASSERT_EQ(queue.size(), 2u);

    std::string value;
    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, "hello");
    ASSERT_EQ(queue.size(), 1u);

    ASSERT_TRUE(queue.push("foo"));
    ASSERT_EQ(queue.size(), 2u);

    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, "world");
    ASSERT_EQ(queue.size(), 1u);

    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, "foo");
    ASSERT_EQ(queue.size(), 0u);
}

// Exercises the copying push overload. The arguments are named lvalues, so
// overload resolution selects push(const T&); a std::string(...) temporary
// would bind to push(T&&) and leave the copy path untested.
TEST(CircularQueueTest, CopyPushAndPop) {
    CircularQueue<std::string> queue(3);

    const std::string hello = "hello";
    const std::string world = "world";
    const std::string foo = "foo";

    ASSERT_TRUE(queue.push(hello));
    ASSERT_TRUE(queue.push(world));
    ASSERT_EQ(queue.size(), 2u);

    std::string value;
    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, "hello");
    ASSERT_EQ(queue.size(), 1u);

    ASSERT_TRUE(queue.push(foo));
    ASSERT_EQ(queue.size(), 2u);

    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, "world");
    ASSERT_EQ(queue.size(), 1u);

    ASSERT_TRUE(queue.pop(value));
    ASSERT_EQ(value, "foo");
    ASSERT_EQ(queue.size(), 0u);

    // Copying must leave the source objects untouched.
    ASSERT_EQ(hello, "hello");
    ASSERT_EQ(world, "world");
    ASSERT_EQ(foo, "foo");
}

// Stress test: num_threads producers and num_threads consumers each perform
// num_iterations blocking operations on a small queue. Pushes and pops are
// balanced, so the queue must be empty once every thread has finished.
TEST(CircularQueueTest, MultiThreadPushPop) {
    const int num_threads = 4;
    const int num_iterations = 10000;
    const int queue_size = 100;

    CircularQueue<int> queue(queue_size);

    std::vector<std::thread> threads;
    threads.reserve(2 * num_threads);

    // Producers.
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&queue, num_iterations]() {
            for (int j = 0; j < num_iterations; ++j) {
                queue.push(j);
            }
        });
    }

    // Consumers.
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&queue, num_iterations]() {
            for (int j = 0; j < num_iterations; ++j) {
                int value = 0;
                queue.pop(value);
            }
        });
    }

    for (auto& thread : threads) {
        thread.join();
    }

    ASSERT_EQ(queue.size(), 0u);
}

// Test runner entry point: lets GoogleTest consume its command-line flags,
// then runs every registered test. The return value of RUN_ALL_TESTS() is
// used as the process exit code.
int main(int argc, char** argv) {
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

Unit test run results:

[Image not available: screenshot of the unit test run results.]

Unlocked version

The circular queue above uses locks to ensure thread safety.

The following is a thread-safe and lock-free circular queue implementation based on C++11, which supports blocking and non-blocking reads, and writes as well.

#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

/// Bounded lock-free ring queue for exactly ONE producer thread and ONE
/// consumer thread (single-producer / single-consumer).
///
/// Only the producer writes `write_idx_` and only the consumer writes
/// `read_idx_`; each side publishes its index with a release store and reads
/// the other side's index with an acquire load. With more than one producer
/// or more than one consumer the queue is not thread-safe.
///
/// One slot is always kept unused so that "full" can be told apart from
/// "empty"; the backing array therefore has N + 1 slots and the queue holds at
/// most N elements.
///
/// @tparam T element type; must be default-constructible (the array is
///           value-initialised) and assignable.
/// @tparam N maximum number of elements the queue can hold.
template <typename T, size_t N>
class RingQueue {
public:
    RingQueue() : read_idx_(0), write_idx_(0), data_{} {}

    /// Appends a copy of `item`.
    /// @param block when true, spins (yielding the CPU) until a slot is free;
    ///              when false (the default) fails immediately if full.
    /// @return true if stored, false if the queue was full and `block` was false.
    bool Push(const T& item, bool block = false) { return PushImpl(item, block); }

    /// Appends `item` by moving from it. Same contract as the copying overload.
    bool Push(T&& item, bool block = false) { return PushImpl(std::move(item), block); }

    /// Removes the oldest element and move-assigns it into `item`.
    /// @param block when true, spins (yielding the CPU) until an element is
    ///              available; when false (the default) fails immediately if empty.
    /// @return true if an element was removed, false if the queue was empty
    ///         and `block` was false.
    bool Pop(T& item, bool block = false) {
        // Only the consumer modifies read_idx_, so a relaxed load is enough.
        const size_t current_read_idx = read_idx_.load(std::memory_order_relaxed);

        if (!WaitForElement(current_read_idx, block)) {
            return false;
        }

        // The slot must be read before it is handed back to the producer: the
        // release store below keeps this read from being reordered after it.
        item = std::move(data_[current_read_idx]);
        read_idx_.store(Next(current_read_idx), std::memory_order_release);

        return true;
    }

    /// Removes the oldest element and passes it (as an rvalue) to `func`.
    ///
    /// The element is moved out and the slot released before `func` runs, so
    /// `func` executes without holding a queue slot.
    /// @return true if an element was removed and `func` was called, false if
    ///         the queue was empty and `block` was false.
    template <typename Func>
    bool Pop(Func&& func, bool block = false) {
        const size_t current_read_idx = read_idx_.load(std::memory_order_relaxed);

        if (!WaitForElement(current_read_idx, block)) {
            return false;
        }

        T item = std::move(data_[current_read_idx]);
        read_idx_.store(Next(current_read_idx), std::memory_order_release);

        func(std::move(item));

        return true;
    }

    /// Performs a blocking Pop on a new detached thread and then calls
    /// `callback` with the result of that Pop.
    ///
    /// `value` is taken by const reference and copied into the worker, so the
    /// popped element is assigned to that private copy and discarded.
    /// NOTE(review): with this signature the element cannot be returned to the
    /// caller; use Pop(Func&&, bool) to receive it — confirm intended design.
    ///
    /// Hazards callers must manage:
    ///  - the detached thread uses `this`, so the queue must outlive it;
    ///  - the worker acts as a consumer, so no other thread may pop at the
    ///    same time (single-consumer requirement).
    void PopAsync(const T& value, std::function<void(bool)> callback) {
        std::thread worker([this, value, callback]() {
            // The by-copy capture of a `const T&` is itself const, so pop into
            // a mutable local copy. Passing the const capture to Pop() would
            // select the callback overload and fail to compile.
            T popped(value);
            const bool result = Pop(popped, true);
            callback(result);
        });
        worker.detach();
    }

    /// @return true if no element was available at the moment of the call.
    bool IsEmpty() const {
        return read_idx_.load(std::memory_order_acquire) ==
               write_idx_.load(std::memory_order_acquire);
    }

    /// @return true if no free slot was available at the moment of the call.
    bool IsFull() const {
        return Next(write_idx_.load(std::memory_order_acquire)) ==
               read_idx_.load(std::memory_order_acquire);
    }

private:
    /// Shared implementation of both Push overloads; `Item` is deduced as
    /// `const T&` or `T`, so std::forward copies or moves accordingly.
    template <typename Item>
    bool PushImpl(Item&& item, bool block = false) {
        // Only the producer modifies write_idx_, so a relaxed load is enough.
        const size_t current_write_idx = write_idx_.load(std::memory_order_relaxed);
        const size_t next_write_idx = Next(current_write_idx);

        // The consumer advances read_idx_; the acquire load pairs with its
        // release store so the producer sees the slot only after the consumer
        // has finished reading it.
        while (next_write_idx == read_idx_.load(std::memory_order_acquire)) {
            if (!block) {
                return false;
            }
            std::this_thread::yield();
        }

        // The data must be written before write_idx_ is advanced.
        data_[current_write_idx] = std::forward<Item>(item);

        // Source order alone does not guarantee that: the compiler or CPU may
        // reorder plain stores. The release store forbids moving the data_
        // write after it, so a consumer that observes the new write_idx_ with
        // an acquire load also observes the element.
        write_idx_.store(next_write_idx, std::memory_order_release);

        return true;
    }

    /// Waits until the slot at `current_read_idx` holds a published element.
    /// @return false if the queue is empty and `block` is false, true otherwise.
    bool WaitForElement(size_t current_read_idx, bool block) const {
        // The acquire load pairs with the producer's release store, making the
        // element written before that store visible to the consumer.
        while (current_read_idx == write_idx_.load(std::memory_order_acquire)) {
            if (!block) {
                return false;
            }
            std::this_thread::yield();
        }
        return true;
    }

    /// @return the index following `current_idx`, wrapping over the N + 1 slots.
    size_t Next(size_t current_idx) const { return (current_idx + 1) % (N + 1); }

    std::atomic<size_t> read_idx_;   // next slot to pop; written only by the consumer
    std::atomic<size_t> write_idx_;  // next slot to fill; written only by the producer
    std::array<T, N + 1> data_;      // N usable slots plus one kept empty to detect "full"
};

The template parameters T and N are used in the code, which supports the selection of different data types and different queue sizes, and realizes a lock-free ring queue with the help of two atomic variables of read and write pointers.

Note, however, that this is only thread-safe with exactly one reader thread and one writer thread. With multiple readers or multiple writers it is not thread-safe.

The difficulty of lock-free programming lies in understanding the different memory orders.

Here is an explanation of the memory orders used above.

C++ defines several memory orders, which specify how memory operations (on both ordinary variables and atomic variables) before and after an atomic operation may be ordered relative to it.

std::memory_order_relaxed guarantees only the atomicity of the operation itself. It imposes no ordering on the other memory operations around it, so the compiler and the CPU may freely reorder the surrounding reads and writes, and other threads may observe them in a different order (operations on the same atomic variable still follow that variable's single modification order). Therefore, special care is needed when using std::memory_order_relaxed: the correctness of the program must not depend on this loose ordering.

std::memory_order_relaxed is mainly used in occasions that do not require any synchronization mechanism, such as counter self-increment, self-decrement and other operations. These operations only need to ensure the correctness of the result, but do not need to guarantee its execution Order. Therefore, std::memory_order_relaxed is the fastest memory order, but also the most dangerous one.

std::memory_order_acquire, applied to a load, ensures that no read or write in the current thread that comes after the load can be reordered before it. If the load observes a value written by a release store in another thread, everything that thread wrote before the release store is guaranteed to be visible to the current thread.

Specifically, with memory_order_acquire semantics both the compiler and the processor are forbidden from moving later memory accesses ahead of the acquire load. Nothing is explicitly "flushed to main memory": the hardware cache-coherence protocol already keeps the caches consistent, and acquire only constrains ordering (on some architectures by emitting a barrier instruction), so that the current thread reads the data published by other threads rather than values loaded too early.

Pairing memory_order_acquire with memory_order_release keeps the program correct and avoids data races on the published data. However, acquire is stronger than relaxed and can cost somewhat more on weakly ordered architectures (such as ARM), because it restricts reordering and may require a barrier instruction; on x86 an acquire load is an ordinary load. Therefore, in actual programming, the appropriate memory order should be selected according to the specific situation.

std::memory_order_release, applied to a store, ensures that no read or write in the current thread that comes before the store can be reordered after it. Once another thread observes the stored value with an acquire load, all writes made by the current thread before the release store are visible to that thread. This is how other threads see the changes the current thread made to the shared data.

This release operation is usually used for synchronous operations, such as notifying other threads after the value of a shared variable is updated. In this case, std::memory_order_release ensures that other threads see the updated value.

Push provides two overloads, one taking an lvalue and one taking an rvalue, which avoids an unnecessary copy when the caller can give up the object. PushImpl extracts the common code so that both overloads reuse it. Elegant!

Pop provides two versions. There is a variant that does not return the popped object, but calls the callback passed in from the outside to operate on the popped object.

Attach unit tests as well.

#include <gtest/gtest.h>
#include "RingQueue.h"

// Fixture giving each test its own empty queue of ints that holds at most
// 10 elements.
class RingQueueSingleThreadTest : public ::testing::Test {
protected:
    RingQueue<int, 10> queue_;
};

// Non-blocking Pop fails on an empty queue; a pushed element comes back out
// and leaves the queue empty again.
TEST_F(RingQueueSingleThreadTest, PushAndPop) {
    int value = 0;
    EXPECT_FALSE(queue_.Pop(value));
    EXPECT_TRUE(queue_.Push(1));
    EXPECT_FALSE(queue_.IsEmpty());
    EXPECT_TRUE(queue_.Pop(value));
    EXPECT_EQ(value, 1);
    EXPECT_TRUE(queue_.IsEmpty());
}

// A blocking Pop on an empty queue must wait until the producer thread pushes
// an element (here after a 100 ms delay) and then return it.
TEST_F(RingQueueSingleThreadTest, PushAndPopWithBlock) {
    int value = 0;
    // Capturing by reference is safe: the thread is joined before the test
    // body (and the fixture) goes out of scope.
    std::thread producer([&]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        EXPECT_TRUE(queue_.Push(1, true));
    });
    EXPECT_TRUE(queue_.Pop(value, true));
    EXPECT_EQ(value, 1);
    producer.join();
}

// The callback overload of Pop hands the popped element to the callable.
TEST_F(RingQueueSingleThreadTest, PushAndPopWithFunc) {
    int value = 0;
    // Check both results: if either call failed, the callback would never run
    // and the final expectation would fail for a misleading reason.
    ASSERT_TRUE(queue_.Push(1));
    ASSERT_TRUE(queue_.Pop([&](int v) { value = v + 1; }));
    EXPECT_EQ(value, 2);
}

// Fills the queue to its capacity of 10 and checks IsEmpty()/IsFull() before,
// at, and just below capacity.
TEST_F(RingQueueSingleThreadTest, IsEmptyAndIsFull) {
    EXPECT_TRUE(queue_.IsEmpty());
    EXPECT_FALSE(queue_.IsFull());
    for (int i = 0; i < 10; ++i) {
        EXPECT_TRUE(queue_.Push(i));
    }
    EXPECT_TRUE(queue_.IsFull());
    EXPECT_FALSE(queue_.IsEmpty());
    int value = 0;
    EXPECT_FALSE(queue_.Push(10));  // non-blocking push on a full queue fails
    EXPECT_TRUE(queue_.Pop(value));
    EXPECT_FALSE(queue_.IsFull());
}

// Fixture that prepares 1000 strings "data_0" ... "data_999" for the
// producer/consumer test.
class RingQueueMultiThreadTest : public testing::Test {
protected:
    void SetUp() override {
        // Initialization data
        for (int i = 0; i < 1000; ++i) {
            data_.push_back("data_" + std::to_string(i));
        }
    }

    std::vector<std::string> data_;
};

// One writer thread pushes every prepared string through a 10-element queue
// while one reader thread pops them; the reader must receive all of them in
// the order they were pushed.
TEST_F(RingQueueMultiThreadTest, MultiThreadTest) {
    RingQueue<std::string, 10> queue;

    // write thread
    std::thread writer([&queue, this]() {
        for (const auto& item : data_) {
            queue.Push(item, true);
        }
    });

    // read thread
    std::thread reader([&queue, this]() {
        std::string item;
        // data_ is only read by both threads after SetUp(), so sharing it
        // needs no synchronisation.
        for (size_t count = 0; count < data_.size(); ++count) {
            // A blocking Pop only returns once an element is available.
            EXPECT_TRUE(queue.Pop(item, true));
            EXPECT_EQ(item, "data_" + std::to_string(count));
        }
    });

    writer.join();
    reader.join();
}

// Test runner entry point: lets GoogleTest consume its command-line flags,
// then runs every registered test. The return value of RUN_ALL_TESTS() is
// used as the process exit code.
int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}