TransientStorePool off-heap buffer pool technology

When should TransientStorePool be enabled? (Technology emerges to solve problems)

In RocketMQ, TransientStorePool is a mechanism for optimizing disk I/O performance. It pre-allocates blocks of direct (off-heap) memory, writes incoming messages into those blocks first, and then moves the data into the memory-mapped CommitLog file, from which it is flushed to disk. Consider enabling TransientStorePool when you need higher message write performance from RocketMQ.

The following are some specific scenarios for enabling TransientStorePool:

  1. High-performance scenarios: For scenarios with higher message throughput requirements, enabling TransientStorePool can reduce disk I/O and improve write performance.

  2. A large number of write operations: In the case of a large number of write operations, using TransientStorePool can reduce frequent system memory allocation and recovery operations, thereby improving performance.

  3. Cluster environment: In a cluster environment, enabling TransientStorePool can help improve the performance and stability of the entire cluster, especially when there are many cluster nodes.

  4. Peak period: During system peak periods (such as Double Eleven, Black Friday, etc.), message traffic and write operations may increase sharply. Turning on TransientStorePool can help cope with this temporary high load pressure and improve the processing capacity of the entire system.

  5. Asynchronous flushing: In the scenario of asynchronous flushing, writing messages to the memory buffer first can improve the response speed. Enabling TransientStorePool allows messages to be written to direct memory first, and then asynchronously written to disk through the background disk flushing service, thereby reducing latency.

  6. Low latency requirements: For scenarios with strict latency requirements, enabling TransientStorePool can reduce disk I/O operations and reduce message writing latency.

Since MMAP can solve the write performance problem, why does TransientStorePool exist?

1. Reduce GC pressure:

Using MMAP (memory-mapped files) alone does reduce GC pressure, because the mapped region lives in off-heap memory. Once a file is mapped into the process address space, reads and writes against it are ordinary memory accesses. This memory is located outside the Java heap and is not managed by the garbage collector (GC).

However, in practice, relying solely on MMAP may not completely avoid GC pressure. In some scenarios, such as highly concurrent writes, temporary objects still have to be created in heap memory, and those objects are subject to GC.

Combining TransientStorePool with MMAP reduces GC pressure further. RocketMQ writes messages into a DirectByteBuffer (direct memory buffer) taken from the TransientStorePool. The storage of a DirectByteBuffer is off-heap memory and is not managed by the GC. The write therefore happens in off-heap memory, which reduces the allocation and reclamation of heap memory and, with it, the pressure on the GC.

2. Asynchronous disk flushing: Combining TransientStorePool with MMAP makes asynchronous flushing possible. Messages are first written to memory blocks in the TransientStorePool, the contents of those blocks are then copied into the memory-mapped file, and finally a background flush service writes the data to disk asynchronously. The producer does not have to wait for disk I/O, which lowers write latency and improves responsiveness.
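To make the heap versus off-heap distinction from point 1 concrete, here is a minimal sketch using only the standard java.nio API. The class name BufferKinds and the helper describe() are hypothetical names chosen for illustration; they are not part of RocketMQ.

```java
import java.nio.ByteBuffer;

final class BufferKinds {
    /** Describes where the bytes of a buffer are stored. */
    static String describe(ByteBuffer buffer) {
        return buffer.isDirect() ? "off-heap (direct)" : "on-heap";
    }

    public static void main(String[] args) {
        // Backed by a byte[] that lives on the Java heap and is managed by the GC
        ByteBuffer heap = ByteBuffer.allocate(4096);
        // Backed by native memory outside the Java heap
        ByteBuffer direct = ByteBuffer.allocateDirect(4096);

        System.out.println("allocate():       " + describe(heap) + ", hasArray=" + heap.hasArray());
        System.out.println("allocateDirect(): " + describe(direct) + ", hasArray=" + direct.hasArray());
    }
}
```

Only the small wrapper object of a direct buffer lives on the heap; the payload does not. Allocating direct buffers is comparatively expensive, which is one reason a pool that allocates them once and reuses them is attractive.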

Design principle of TransientStorePool technology

The TransientStorePool in RocketMQ is designed to improve the performance of writing messages to disk. It rests on two techniques: a memory pool and memory-mapped files (MMAP).

  1. Memory pool: TransientStorePool is a pre-allocated, fixed-size memory pool. It stores message data temporarily so that memory does not have to be allocated and reclaimed for every write, which improves performance. When a producer sends a message, the message is first written to this memory pool instead of directly to disk. Each memory block in the pool has the same size as a mapped memory block of the CommitLog file in RocketMQ.

  2. Memory-mapped files (MMAP): MMAP is a technique for mapping a file, or part of a file, into the memory address space of a process. RocketMQ maps CommitLog files into memory with MMAP, so that they can be read and written through ordinary memory operations, avoiding a read() or write() system call and an extra buffer copy for each access.

Combining these two technologies, RocketMQ’s TransientStorePool design implements the following workflow:

  1. When a producer sends a message, the message is first written to a memory block in the memory pool.
  2. When the memory block is full, the data of the entire block is copied into the mapped memory region of the corresponding CommitLog file.
  3. Finally, the data in the mapped memory is flushed to disk through the operating system's msync call, which Java reaches through MappedByteBuffer.force().

This design reduces the number of disk I/O operations on the write path and improves RocketMQ's message write performance. Because the final write goes through MMAP, RocketMQ also benefits from the operating system's page cache.
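The three workflow steps above can be sketched with plain java.nio calls. This is a simplified illustration of the flow as this article describes it, not RocketMQ's code: the class StagedWriteSketch, the method stageCopyForce() and the temporary file are all assumptions made for the example, and the staging block is allocated on the spot rather than borrowed from a pool.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class StagedWriteSketch {
    /**
     * Stages {@code message} in a direct buffer, copies it into a mapping of a fresh
     * temporary file of {@code fileSize} bytes, forces it, and returns the file contents.
     * The message must not be longer than {@code fileSize}.
     */
    static byte[] stageCopyForce(byte[] message, int fileSize) {
        try {
            Path file = Files.createTempFile("staged-write", ".bin");
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Step 1: write the message into an off-heap memory block
                ByteBuffer block = ByteBuffer.allocateDirect(fileSize);
                block.put(message);

                // Step 2: copy the staged bytes into the mapped region of the file.
                // On OpenJDK, a READ_WRITE mapping extends a shorter file to fileSize.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                block.flip(); // position = 0, limit = number of bytes staged
                mapped.put(block);

                // Step 3: ask the operating system to write the dirty pages to disk
                mapped.force();
                return Files.readAllBytes(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] onDisk = stageCopyForce("hello".getBytes(StandardCharsets.UTF_8), 4096);
        System.out.println(new String(onDisk, 0, 5, StandardCharsets.UTF_8));
    }
}
```

The producer-facing cost is only step 1, a memory copy. Steps 2 and 3 can run on a background thread, which is what makes the asynchronous flushing described earlier possible.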

Code Analysis of TransientStorePool

RocketMQ's TransientStorePool design combines the memory pool with MMAP: messages go into a pooled memory block first, the data of that block is then moved into the MMAP memory, and the MMAP memory is finally flushed to disk. The code discussed here is located mainly in two files, MappedFileQueue.java and MappedFile.java, with the pool itself in the TransientStorePool class.

  1. MappedFileQueue.java

MappedFileQueue manages the collection of MappedFile, and each MappedFile corresponds to a CommitLog file. When using TransientStorePool, you need to set the useTransientStorePool property of MappedFileQueue to true.

private final boolean useTransientStorePool;

public MappedFileQueue(final String storePath, final int mappedFileSize, final TransientStorePool transientStorePool) {
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.transientStorePool = transientStorePool;
    this.useTransientStorePool = transientStorePool != null;
}

  2. MappedFile.java

The MappedFile class represents a file mapped to memory, and its main function is to read and write CommitLog files through memory-mapped files (MMAP).

When TransientStorePool is enabled, MappedFile will first write the message to the memory block in the memory pool. To realize this function, the MappedFile class defines a ByteBuffer attribute writeBuffer, which is used to point to the memory block in the memory pool.

// If TransientStorePool enabled, first write the data to the buffer, then write the data to the file
protected ByteBuffer writeBuffer = null;

In the constructor, if TransientStorePool is enabled, a memory block will be allocated from the memory pool, and writeBuffer will point to this memory block.

public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool) {
    init(fileName, fileSize);
    if (transientStorePool != null) {
        this.writeBuffer = transientStorePool.borrowBuffer(fileSize);
    }
}

The appendMessage method of the MappedFile class is responsible for writing the message into memory. If TransientStorePool is enabled, the message will be written into the memory block pointed to by writeBuffer; otherwise, the message will be directly written into the MMAP memory.

public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();

    // Ensure remaining space is sufficient
    if ((this.fileSize - currentPos) >= data.length) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        byteBuffer.put(data);
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}
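The append logic above can be exercised in isolation. The following is a minimal sketch under stated assumptions: AppendSketch, wrotePosition() and byteAt() are hypothetical names for this example, a freshly allocated direct buffer stands in for the pooled block, and there is a single writer (reading the position and then adding to it is not atomic as a pair, so concurrent appenders would need external locking).

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

final class AppendSketch {
    private final ByteBuffer buffer;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(int fileSize) {
        // Stand-in for a block borrowed from the pool
        this.buffer = ByteBuffer.allocateDirect(fileSize);
    }

    /** Appends data at the current write offset; returns false if it does not fit. */
    boolean appendMessage(byte[] data) {
        int currentPos = wrotePosition.get();
        if (buffer.capacity() - currentPos < data.length) {
            return false;
        }
        // slice() yields a view with its own position, so the shared
        // buffer's position and limit are never modified
        ByteBuffer view = buffer.slice();
        view.position(currentPos);
        view.put(data);
        wrotePosition.addAndGet(data.length);
        return true;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    /** Absolute read, used here only to inspect what was written. */
    byte byteAt(int index) {
        return buffer.get(index);
    }

    public static void main(String[] args) {
        AppendSketch file = new AppendSketch(8);
        System.out.println(file.appendMessage(new byte[] {1, 2, 3})); // fits
        System.out.println(file.appendMessage(new byte[] {4, 5, 6})); // fits
        System.out.println(file.appendMessage(new byte[] {7, 8, 9})); // only 2 bytes left
        System.out.println("wrotePosition=" + file.wrotePosition());
    }
}
```

Tracking the write offset separately from the buffer's own position is what allows readers and the flush path to take their own slices without interfering with the writer.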

Finally, in the flush method of the MappedFile class, the writeBuffer data is copied into the MMAP memory, and force() (msync on Linux) is called to flush the MMAP memory to disk.

public int flush(final int flushLeastPages) {
    // Take a reference so the mapping cannot be released while flushing
    if (!this.hold()) {
        return 0;
    }

    int value = getReadPosition();

    // Ensure there is data to be flushed
    if (value > 0) {
        if (this.writeBuffer != null) {
            // Copy the staged bytes through independent views so that the
            // shared buffers' position and limit are left untouched
            ByteBuffer source = this.writeBuffer.slice();
            source.limit(value);
            ByteBuffer target = this.mappedByteBuffer.slice();
            target.put(source);
        }
        // Without a pool the messages are already in the mapped memory
        this.mappedByteBuffer.force();
    }

    this.committedPosition.set(value);
    this.release();
    return value;
}
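The flush method above accepts a flushLeastPages parameter but never reads it. One plausible use, modelled on the kind of page-count check RocketMQ's MappedFile performs before flushing, is to skip the flush until enough dirty pages have accumulated. The sketch below is an assumption-laden illustration: FlushGate, isAbleToFlush() and the 4 KiB page size are chosen for this example and are not taken from the listing above.

```java
final class FlushGate {
    /** Assumed OS page size in bytes; 4 KiB is common but not universal. */
    static final int OS_PAGE_SIZE = 4 * 1024;

    /**
     * Decides whether a flush is worthwhile.
     *
     * @param flushedPosition number of bytes already forced to disk
     * @param readPosition    number of bytes written and ready to be flushed
     * @param flushLeastPages minimum number of dirty pages required;
     *                        0 or less means "flush whenever anything is pending"
     */
    static boolean isAbleToFlush(int flushedPosition, int readPosition, int flushLeastPages) {
        if (flushLeastPages > 0) {
            int dirtyPages = readPosition / OS_PAGE_SIZE - flushedPosition / OS_PAGE_SIZE;
            return dirtyPages >= flushLeastPages;
        }
        return readPosition > flushedPosition;
    }

    public static void main(String[] args) {
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, 4));     // four full dirty pages
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE - 1, 4)); // one byte short of four pages
        System.out.println(isAbleToFlush(0, 1, 0));                    // anything pending is enough
    }
}
```

Batching by pages trades a little durability latency for fewer, larger flushes, which suits the asynchronous flushing mode discussed earlier.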

Detailed implementation of this.mappedByteBuffer.force()

this.mappedByteBuffer.force() is a method of the MappedByteBuffer class in Java NIO. What it does is flush the modified portion of the memory-mapped file to disk. Let’s take a deeper look at the implementation of the force() method.

MappedByteBuffer is a class in the java.nio package that extends ByteBuffer. Its public force() method performs no I/O in Java code; it delegates to a private native method (force0 in JDK 8). Simplified, the declarations look like this:

public abstract class MappedByteBuffer extends ByteBuffer {
    //...
    public final MappedByteBuffer force() {
        // ... checks the mapping, then calls force0(fd, address, length)
        return this;
    }
    private native void force0(FileDescriptor fd, long address, long length);
    //...
}

Because the actual work happens in a native method, the implementation is platform-dependent and written in C. In OpenJDK 8 it belongs to the NIO native library (MappedByteBuffer.c, with separate Unix and Windows versions) rather than to HotSpot's Unsafe.cpp.

The following listing is a simplified illustration of the Windows code path, not a verbatim copy of the JDK source:

UNSAFE_ENTRY(void, Unsafe_Force(JNIEnv *env, jobject unsafe, jobject obj)) {
  UnsafeWrapper("Unsafe_Force");
  if (UseDirectByteBuffer) {
    oop buf = JNIHandles::resolve(obj);
    jlong address = java_nio_Buffer::address_value(buf);
    size_t capacity = (size_t)java_nio_Buffer::capacity(buf);
    if (FlushViewOfFile((LPVOID)address, capacity) == 0) {
      DWORD err = GetLastError();
      const char* msg = os::strdup(env->GetStringUTFChars(JNU_NewStringPlatform(env, "FlushViewOfFile"), NULL));
      THROW_MSG_0(vmSymbols::java_io_IOException(), msg);
    }
  }
  return;
}
UNSAFE_END

The main logic of the Unsafe_Force function is to call the Windows API function FlushViewOfFile, which writes the modified pages of the mapped view to disk. FlushViewOfFile exists only on Windows; on Linux and other POSIX platforms the same job is done by the msync system call.

In short, this.mappedByteBuffer.force() in Java NIO ends in a native method that calls the corresponding platform API to flush the modified part of the memory-mapped file to disk. This is what makes the written data durable.

Analysis of this.release()

The this.release() method is a method in the MappedFile class that decrements the reference count of the MappedFile object. In RocketMQ, the MappedFile object uses reference counting to manage its resources. If the reference count becomes 0, it means that the MappedFile object is not used by other threads and can be safely recycled.

In the MappedFile class, an atomic integer variable available is defined as a reference count. The initial value of the available variable is 1, which means that there is currently 1 reference using this MappedFile object.

// The number of available
private final AtomicInteger available = new AtomicInteger(1);

Next, let’s look at the implementation of the release() method:

public boolean release() {
    int value = this.available.decrementAndGet();
    if (value == 0) {
        // No other threads are using this MappedFile, it can be safely closed and resources can be released
        this.firstShutdownTimestamp = System.currentTimeMillis();
        if (this.appendMessageCallback != null) {
            this.appendMessageCallback.shutdown();
        }

        // Release resources once the retention time has elapsed or the count is still 0
        if ((((System.currentTimeMillis() - this.firstShutdownTimestamp)) >= this.fileReservedTime)
            || (0 == this.available.get())) {
            if (this.mappedByteBuffer != null) {
                clean(this.mappedByteBuffer);
                this.mappedByteBuffer = null;
            }

            if (this.fileChannel != null) {
                try {
                    this.fileChannel.close();
                } catch (IOException e) {
                    log.error("close file channel error.", e);
                }
                this.fileChannel = null;
            }

            log.info("this file[REF:{}] {} {}", this.available.get(), this.fileName, (this.firstShutdownTimestamp + this.fileReservedTime - System.currentTimeMillis()));
            return true;
        }
    } else if (value < 0) {
        this.available.set(0);
    }
    return false;
}

The main logic of release() is as follows:

  1. Decrease the value of available by 1. If the reduced value is 0, it means that no other thread is using this MappedFile object.
  2. If the condition is met (no other threads are using the object), record the current timestamp as firstShutdownTimestamp.
  3. If the file retention time has expired or the reference count is 0, then the following resource release operations are performed:
    • Call clean(this.mappedByteBuffer); to clean up the MappedByteBuffer object and set it to null.
    • Close the FileChannel, setting it to null.
    • Output the log, indicating that the file resource has been released.
  4. If the decremented reference count is less than 0, set the value of available to 0.

In short, this.release() decrements the reference count of the MappedFile object. When the count reaches 0, the object is no longer used by any thread and its resources can be reclaimed safely. This keeps resource usage efficient and avoids resource leaks.
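The hold/release pattern described above can be reduced to a few lines. This is a minimal sketch, not RocketMQ's class: RefCounted and cleanupCount() are hypothetical names, and the cleanup counter merely stands in for unmapping the buffer and closing the channel. Unlike the listing above, it uses compare-and-set loops so that the count can never drop below zero, which is a design choice of this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCounted {
    // Starts at 1: the owner holds the initial reference
    private final AtomicInteger available = new AtomicInteger(1);
    // Stand-in for the real cleanup work (unmap buffer, close channel)
    private final AtomicInteger cleanups = new AtomicInteger(0);

    /** Takes a reference; fails once the count has already reached zero. */
    boolean hold() {
        while (true) {
            int current = available.get();
            if (current <= 0) {
                return false;
            }
            if (available.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Drops a reference; returns true only for the call that released the last one. */
    boolean release() {
        while (true) {
            int current = available.get();
            if (current <= 0) {
                return false; // already fully released, nothing to do
            }
            if (available.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    cleanups.incrementAndGet(); // the cleanup runs exactly once
                    return true;
                }
                return false;
            }
        }
    }

    int cleanupCount() {
        return cleanups.get();
    }

    public static void main(String[] args) {
        RefCounted file = new RefCounted();
        file.hold();                                   // a reader takes a reference
        System.out.println(file.release());            // reader is done, owner still holds one
        System.out.println(file.release());            // owner releases the last reference
        System.out.println("cleanups=" + file.cleanupCount());
    }
}
```

Every successful hold() must be paired with exactly one release(); the flush method shown earlier follows that rule by calling hold() at the start and release() at the end.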

When does MappedFile return the memory block to the memory pool?

The clean() method in the MappedFile class is only responsible for cleaning and releasing the mapped memory block, rather than returning the memory to the memory pool. The logic of returning memory to the memory pool should be implemented in the shutdown() method of the MappedFile class.

public void shutdown(final long intervalForcibly) {
    this.shutdown.set(true);
    if (this.hold()) {
        this.release();
    } else {
        int value = this.available.get();
        if (value > 0) {
            // wait some time to try to shutdown again
            if (this.shutdown.get() && (System.currentTimeMillis() - this.firstShutdownTimestamp) > intervalForcibly) {
                this.release();
            }
        }
        if (this.writeBuffer != null && this.isTransientStorePoolEnable) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
    }
}

The logic of the shutdown() method is as follows:

  1. Set the shutdown flag to true.
  2. If the current MappedFile object is held (the reference count is greater than 0), call the release() method to release resources such as mapped memory blocks and file channels.
  3. Otherwise (hold() failed, so the MappedFile object is no longer in use), release() is retried once the forced-shutdown interval has elapsed, and the memory block borrowed from the memory pool is returned: transientStorePool.returnBuffer(writeBuffer) is called and writeBuffer is set to null.

Implementation of this.transientStorePool.returnBuffer(writeBuffer)

this.transientStorePool.returnBuffer(writeBuffer) returns the memory block previously borrowed from TransientStorePool. This operation is done during the resource recovery process of MappedFile. The specific return operation is implemented in the TransientStorePool class.

Here is the returnBuffer() method in the TransientStorePool class:

public void returnBuffer(ByteBuffer byteBuffer) {
    this.pool.returnBuffer(byteBuffer);
}

As can be seen from the code, the TransientStorePool.returnBuffer() method simply calls the returnBuffer() method of the internal ByteBufferPool object to complete the return operation.

Next, let’s take a look at the returnBuffer() method in the ByteBufferPool class:

public void returnBuffer(ByteBuffer buffer) {
  // Check if the buffer size is equal to chunkSize
  if (buffer.capacity() != this.chunkSize) {
    throw new IllegalArgumentException("The returned buffer capacity " + buffer.capacity() + " does not match the chunk size " + this.chunkSize);
  }

  // Clear the buffer and add it back to the queue
  buffer.clear();
  this.queue.offer(buffer);
}


The clear() call used above is inherited from java.nio.Buffer:

/**
 * clear() only resets the mark, position and limit of the buffer; it does not erase the data in the buffer. The old bytes stay in place until later write operations overwrite them.
 */
public final Buffer clear() {
    position = 0; // set position to 0
    limit = capacity; // set limit to capacity
    mark = -1; // set mark to -1
    return this;
}
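That clear() leaves the old bytes in place is easy to verify with the standard API. The class ClearDemo and its helper below are hypothetical names used only for this demonstration.

```java
import java.nio.ByteBuffer;

final class ClearDemo {
    /** Writes one byte, calls clear(), and returns what is stored at index 0 afterwards. */
    static byte firstByteAfterClear(byte value) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(16);
        buffer.put(value);
        buffer.clear(); // resets position, limit and mark only
        return buffer.get(0);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(16);
        buffer.put((byte) 42);
        System.out.println("before clear: position=" + buffer.position() + ", limit=" + buffer.limit());
        buffer.clear();
        System.out.println("after clear:  position=" + buffer.position() + ", limit=" + buffer.limit()
                + ", byte[0]=" + buffer.get(0));
    }
}
```

For a pool this means a borrowed block may still contain bytes from its previous user, so code that reads from the block has to rely on its own write position rather than on the buffer being zeroed.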

Code analysis of transientStorePool.borrowBuffer(fileSize)

The function of the transientStorePool.borrowBuffer(fileSize) method is to borrow a memory block whose size is fileSize from the TransientStorePool object. Here is the code analysis:

In the org.apache.rocketmq.store.TransientStorePool class, the borrowBuffer method is defined as follows:

public ByteBuffer borrowBuffer(int size) {
    ByteBuffer buffer = this.pool.borrowBuffer(size);
    if (buffer.remaining() != size) {
        log.error("The allocated buffer size not equals with request size {} {}", size, buffer.remaining());
        this.pool.returnBuffer(buffer);
        throw new RuntimeException("The allocated buffer size not equals with request size");
    }
    buffer.position(0);
    buffer.limit(size);
    return buffer;
}

The logic of the borrowBuffer method is as follows:

  1. Borrow a ByteBuffer of the requested size from the internal ByteBufferPool object, pool. ByteBufferPool is a queue that stores pre-allocated memory blocks.
  2. Check whether the remaining space of the ByteBuffer obtained from the pool equals size. If it does not, the memory block was allocated incorrectly: return it to the pool and throw a runtime exception.
  3. If the ByteBuffer is correct, set its position to 0 and its limit to size, then return it.

Code analysis of this.pool.borrowBuffer(size)

The this.pool.borrowBuffer(size) method borrows a memory block from the ByteBufferPool object to serve a request for size bytes. Here is the code analysis:

The ByteBufferPool class is a memory pool that pre-allocates memory blocks and lends them out on demand. Internally it maintains a queue, named queue, that stores the pre-allocated blocks. The borrowBuffer() method takes one block from that queue.

The following is the implementation of the ByteBufferPool.borrowBuffer(int) method:

public ByteBuffer borrowBuffer(int size) {
    // Validate the requested size
    if (size > this.chunkSize) {
        throw new IllegalArgumentException("Requested buffer size " + size + " cannot exceed the chunk size " + this.chunkSize);
    }

    // Try to get a buffer from the queue
    ByteBuffer buffer = this.queue.poll();
    if (buffer == null) {
        // If the queue is empty, create a new buffer
        buffer = ByteBuffer.allocateDirect(this.chunkSize);
        this.directMemoryCounter.addAndGet(this.chunkSize);
    }
    return buffer;
}

The logic of the borrowBuffer() method is as follows:

  1. Verify that the requested size does not exceed chunkSize, the capacity of a single memory block in the ByteBufferPool. If it does, an exception is thrown.
  2. Try to take a memory block (a ByteBuffer object) from the internal queue. The queue stores the pre-allocated memory blocks, and poll() removes and returns the block at its head.
  3. If the queue is empty (i.e. buffer == null), allocate a new direct memory block with ByteBuffer.allocateDirect(). The new block has a capacity of chunkSize, and that amount is added to directMemoryCounter for accounting.
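Putting the borrow and return logic described above together, a self-contained pool might look like the following. This is a sketch under assumptions, not the class from the RocketMQ code base: SimpleBufferPool, idleBuffers() and directMemoryBytes() are hypothetical names, and a ConcurrentLinkedQueue is assumed as the internal queue.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

final class SimpleBufferPool {
    private final int chunkSize;
    private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong directMemoryCounter = new AtomicLong();

    /** Pre-allocates poolSize direct buffers of chunkSize bytes each. */
    SimpleBufferPool(int poolSize, int chunkSize) {
        this.chunkSize = chunkSize;
        for (int i = 0; i < poolSize; i++) {
            queue.offer(allocate());
        }
    }

    private ByteBuffer allocate() {
        directMemoryCounter.addAndGet(chunkSize);
        return ByteBuffer.allocateDirect(chunkSize);
    }

    /** Hands out an idle block, or allocates a new one if the queue is empty. */
    ByteBuffer borrowBuffer(int size) {
        if (size > chunkSize) {
            throw new IllegalArgumentException(
                    "Requested size " + size + " exceeds chunk size " + chunkSize);
        }
        ByteBuffer buffer = queue.poll();
        return buffer != null ? buffer : allocate();
    }

    /** Resets the block's position and limit and makes it available again. */
    void returnBuffer(ByteBuffer buffer) {
        if (buffer.capacity() != chunkSize) {
            throw new IllegalArgumentException(
                    "Returned capacity " + buffer.capacity() + " does not match chunk size " + chunkSize);
        }
        buffer.clear();
        queue.offer(buffer);
    }

    int idleBuffers() {
        return queue.size();
    }

    long directMemoryBytes() {
        return directMemoryCounter.get();
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(2, 1024);
        ByteBuffer block = pool.borrowBuffer(512);
        System.out.println("idle after borrow: " + pool.idleBuffers());
        pool.returnBuffer(block);
        System.out.println("idle after return: " + pool.idleBuffers());
    }
}
```

Falling back to a fresh allocation when the queue is empty keeps borrowers from blocking, at the price of letting direct memory grow beyond the initial pool; the counter makes that growth observable.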

What is the default chunkSize?

In the code analyzed here, the default chunkSize is fixed at 64 KB (64 * 1024 bytes).

In the TransientStorePool class shown below, chunkSize is the constant PoolChunkSize (64 * 1024), which the TransientStorePool constructor passes to ByteBufferPool. It determines the size of every pre-allocated memory block in the ByteBufferPool. Choosing an appropriate chunkSize avoids frequent memory allocation and reclamation and thereby improves performance; 64 KB is a reasonable setting for most scenarios, and the value can be changed to suit the application. Note that the borrowBuffer() check shown earlier requires the borrowed block to have exactly fileSize bytes remaining, so a pool wired this way only works when fileSize equals chunkSize.

public static final int PoolChunkSize = 1024 * 64;

public TransientStorePool(final int poolSize, final int fileSize) {
    this.poolSize = poolSize;
    this.fileSize = fileSize;
    this.pool = new ByteBufferPool(poolSize, PoolChunkSize);
}
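The amount of direct memory such a pool reserves up front is simply the number of blocks times the block size. The sketch below works through that arithmetic; PoolSizing and preallocatedBytes() are hypothetical names, and the pool size of 5 is purely an illustrative assumption.

```java
final class PoolSizing {
    /** Chunk size used in this article's listing: 64 KB. */
    static final int POOL_CHUNK_SIZE = 1024 * 64;

    /** Direct memory reserved at start-up; computed in long to avoid int overflow. */
    static long preallocatedBytes(int poolSize, int chunkSize) {
        return (long) poolSize * chunkSize;
    }

    public static void main(String[] args) {
        int poolSize = 5; // illustrative value, not taken from the listing above
        long small = preallocatedBytes(poolSize, POOL_CHUNK_SIZE);
        long large = preallocatedBytes(poolSize, 1024 * 1024 * 1024);
        System.out.println(poolSize + " blocks of 64 KB reserve " + small + " bytes");
        System.out.println(poolSize + " blocks of 1 GB reserve " + large + " bytes");
    }
}
```

Whatever values are chosen, the total has to fit within the JVM's direct memory limit (configurable with -XX:MaxDirectMemorySize) in addition to the heap.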