When should TransientStorePool be enabled? (Technology emerges to solve problems)
In RocketMQ, TransientStorePool is a mechanism for optimizing disk I/O performance. It pre-allocates blocks of direct (off-heap) memory, writes incoming messages into those blocks first, and then moves the data into the memory-mapped file (Memory-Mapped File), from which it is flushed to disk. Consider enabling TransientStorePool when you need to improve RocketMQ’s message write performance.
The following are some specific scenarios for enabling TransientStorePool:
- High-throughput scenarios: When message throughput requirements are high, enabling TransientStorePool can reduce disk I/O on the write path and improve write performance.
- A large number of write operations: Under heavy write load, TransientStorePool reuses pre-allocated memory blocks, which avoids frequent memory allocation and reclamation and thereby improves performance.
- Cluster environment: In a cluster, enabling TransientStorePool can help improve the performance and stability of the whole cluster, especially when there are many nodes.
- Peak periods: During traffic peaks (such as Double Eleven or Black Friday), message traffic and write operations may increase sharply. Enabling TransientStorePool helps absorb this temporary load and raises the processing capacity of the whole system.
- Asynchronous flushing: With asynchronous flushing, writing messages to a memory buffer first improves response time. With TransientStorePool enabled, messages are written to direct memory first and then written to disk asynchronously by the background flush service, which reduces latency. In RocketMQ’s broker configuration this is also a precondition: the pool only takes effect when flushDiskType is ASYNC_FLUSH.
- Low latency requirements: For scenarios with strict latency requirements, enabling TransientStorePool keeps disk I/O off the write path and lowers message write latency.
Since MMAP can already solve the write performance problem, why was TransientStorePool introduced?
1. Reduce GC pressure:
Using MMAP (memory-mapped files) alone does reduce GC pressure, because the mapped region is off-heap memory. Once a file is mapped into the process address space, reads and writes of the file are performed on that memory, which lies outside the Java heap and is not managed by the garbage collector (GC).
However, in practice, relying solely on MMAP may not avoid GC pressure completely. In some scenarios, such as highly concurrent writes, temporary objects still have to be created on the heap, and these objects are subject to GC.
Combining TransientStorePool with MMAP can reduce GC pressure further. RocketMQ writes messages into the DirectByteBuffer (direct memory buffer) blocks held by TransientStorePool. The storage behind a DirectByteBuffer is off-heap and is not scanned or moved by GC. Because the blocks are allocated once and then reused, the write path allocates and reclaims less heap memory, which lowers GC pressure.
2. Asynchronous disk flushing: Combining TransientStorePool with MMAP enables asynchronous flushing. Messages are first written to memory blocks in TransientStorePool, the contents of those blocks are then copied into the memory-mapped file, and finally the background flush service writes the data to disk asynchronously. This keeps disk I/O latency off the producer’s write path and improves responsiveness.
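The heap versus off-heap distinction in point 1 can be observed directly with the standard java.nio API. The following is a minimal sketch, not RocketMQ code; the class name BufferKinds is made up for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only.
final class BufferKinds {
    /** Returns a short description of where a buffer's storage lives. */
    static String describe(ByteBuffer buffer) {
        return buffer.isDirect() ? "direct (off-heap)" : "heap";
    }

    public static void main(String[] args) {
        // Backed by a byte[] on the Java heap.
        ByteBuffer heap = ByteBuffer.allocate(1024);
        // Backed by native memory outside the Java heap.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);

        System.out.println(describe(heap) + ", hasArray=" + heap.hasArray());
        System.out.println(describe(direct) + ", hasArray=" + direct.hasArray());
    }
}
```

Only the small ByteBuffer wrapper object of a direct buffer lives on the heap. Allocating direct buffers is comparatively expensive, which is one reason to allocate them once and reuse them from a pool.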
Design principle of TransientStorePool technology
The TransientStorePool in RocketMQ is designed to improve the performance of writing messages to disk. Its principle is mainly based on two technologies: memory pool and memory mapped file (Memory Mapped File, referred to as MMAP).
- Memory pool: TransientStorePool is a pre-allocated, fixed-size memory pool. It temporarily stores message data so that memory does not have to be allocated and reclaimed for every write, thereby improving performance. When a producer sends a message, the message is first written to this memory pool instead of directly to disk. Each memory block in the pool has the same size as one mapped CommitLog file in RocketMQ.
- Memory-mapped files (MMAP): MMAP is a technique for mapping a file, or part of a file, into the address space of a process. RocketMQ uses MMAP to map CommitLog files into memory, so that they can be read and written by operating directly on memory, which avoids a read or write system call for every access.
Combining these two technologies, RocketMQ’s TransientStorePool design implements the following workflow:
- When a producer sends a message, the message is first written to a memory block in the memory pool.
- When the memory block is full, the data of the entire memory block is flushed into the mapped memory block corresponding to the CommitLog file through MMAP.
- Finally, the operating system’s msync system call flushes the data in the mapped memory to disk.
This design can effectively reduce disk IO operations and improve RocketMQ’s message writing performance. At the same time, due to the use of memory pool and MMAP, RocketMQ can make full use of the caching mechanism of the operating system to further optimize performance.
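The three steps of this workflow can be sketched with plain Java NIO. This is an illustration under simplifying assumptions, not RocketMQ’s implementation: the class name TwoStageWriteSketch is hypothetical, a freshly allocated direct buffer stands in for a pooled block, and the payload is assumed to fit within fileSize.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the staged write path described above.
final class TwoStageWriteSketch {
    /** Stages payload in direct memory, copies it into a mapped region, then forces it to disk. */
    static void write(Path file, int fileSize, byte[] payload) throws IOException {
        // Step 1: write into off-heap memory (stands in for a block borrowed from the pool).
        ByteBuffer staging = ByteBuffer.allocateDirect(fileSize);
        staging.put(payload); // throws BufferOverflowException if payload is larger than fileSize
        staging.flip();       // switch to reading: limit = bytes written, position = 0

        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Step 2: copy the staged bytes into the memory-mapped region of the file.
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            mapped.put(staging);
            // Step 3: ask the operating system to write the mapped pages to the storage device.
            mapped.force();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".bin");
        file.toFile().deleteOnExit();
        write(file, 4096, "hello".getBytes(StandardCharsets.UTF_8));
        byte[] onDisk = Files.readAllBytes(file);
        System.out.println(new String(onDisk, 0, 5, StandardCharsets.UTF_8));
    }
}
```

In a real broker, steps 2 and 3 would run on background threads so that the producer only waits for step 1.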
Code Analysis of TransientStorePool
RocketMQ’s TransientStorePool design combines a memory pool with MMAP: messages are first written into memory blocks in the pool, the data in those blocks is then copied into the MMAP memory, and finally the MMAP memory is flushed to disk. This design improves RocketMQ’s message write performance. The logic that uses TransientStorePool is mainly located in two files: MappedFileQueue.java and MappedFile.java. The snippets below are simplified for illustration.
1. MappedFileQueue.java
MappedFileQueue manages a collection of MappedFile objects, each of which corresponds to one CommitLog file. When TransientStorePool is used, the useTransientStorePool property of MappedFileQueue is true; in the constructor below it is derived from whether a non-null pool was passed in.
    private final boolean useTransientStorePool;

    public MappedFileQueue(final String storePath, final int mappedFileSize,
            final TransientStorePool transientStorePool) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.transientStorePool = transientStorePool;
        // The pool counts as enabled whenever one is supplied.
        this.useTransientStorePool = transientStorePool != null;
    }
2. MappedFile.java
The MappedFile class represents a file mapped into memory. Its main function is to read and write a CommitLog file through a memory-mapped file (MMAP).
When TransientStorePool is enabled, MappedFile first writes the message to a memory block in the memory pool. To support this, the MappedFile class defines a ByteBuffer attribute, writeBuffer, which points to the memory block borrowed from the pool.
    // If TransientStorePool is enabled, data is first written to this buffer and then to the file.
    protected ByteBuffer writeBuffer = null;
In the constructor, if TransientStorePool is enabled, a memory block is borrowed from the memory pool and writeBuffer points to this block.
    public MappedFile(final String fileName, final int fileSize,
            final TransientStorePool transientStorePool) {
        init(fileName, fileSize);
        if (transientStorePool != null) {
            // Borrow a block as large as the mapped file.
            this.writeBuffer = transientStorePool.borrowBuffer(fileSize);
        }
    }
The appendMessage method of the MappedFile class is responsible for writing the message into memory. If TransientStorePool is enabled, the message is written into the memory block pointed to by writeBuffer; otherwise, it is written directly into the MMAP memory.
    public boolean appendMessage(final byte[] data) {
        int currentPos = this.wrotePosition.get();
        // Ensure remaining space is sufficient
        if ((this.fileSize - currentPos) >= data.length) {
            // slice() shares the content but has its own position and limit
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            byteBuffer.put(data);
            this.wrotePosition.addAndGet(data.length);
            return true;
        }
        return false;
    }
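The slice-then-position pattern in appendMessage can be tried in isolation. The sketch below is a hypothetical standalone class (AppendSketch is not part of RocketMQ) that tracks the write position in an AtomicInteger and leaves the position of the underlying buffer untouched. Like the snippet above, it assumes a single writer.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone version of the append logic, for illustration only.
final class AppendSketch {
    private final ByteBuffer target; // stands in for writeBuffer or mappedByteBuffer
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(ByteBuffer target) {
        this.target = target;
    }

    /** Appends data at the current write position; returns false when it does not fit. */
    boolean append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (target.capacity() - currentPos < data.length) {
            return false;
        }
        // The slice shares content with target but has an independent position and limit.
        ByteBuffer view = target.slice();
        view.position(currentPos);
        view.put(data);
        wrotePosition.addAndGet(data.length);
        return true;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(8);
        AppendSketch sketch = new AppendSketch(buffer);
        System.out.println(sketch.append(new byte[] {1, 2, 3})); // fits
        System.out.println(sketch.append(new byte[6]));          // does not fit in the remaining 5 bytes
        System.out.println(buffer.position());                   // the underlying buffer never moved
    }
}
```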
Finally, in the flush method of the MappedFile class, the writeBuffer data is copied into the MMAP memory, and force() is then called so that the operating system (through msync on Linux) flushes the MMAP memory to disk.
    public int flush(final int flushLeastPages) {
        if (this.writeBuffer == null || !this.hold()) {
            return 0;
        }
        int value = getReadPosition();
        // Ensure there is data to be flushed
        if (value > 0) {
            // View over the first `value` bytes of the pooled block
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(0);
            byteBuffer.limit(value);
            // Copy into the mapped region, then force it to disk
            this.mappedByteBuffer.position(0);
            this.mappedByteBuffer.put(byteBuffer);
            this.mappedByteBuffer.force();
        }
        this.committedPosition.set(value);
        this.release();
        return value;
    }
this.mappedByteBuffer.force(): detailed implementation
this.mappedByteBuffer.force() is a method of the MappedByteBuffer class in Java NIO. It forces any changes made to the mapped region to be written to the storage device that holds the file. Let’s take a deeper look at how force() is implemented.
First of all, MappedByteBuffer is a class in the java.nio package that extends ByteBuffer. The force() method itself is a public final Java method; the actual work is done by platform-specific native code that it delegates to (in JDK 8, a private native method named force0). In the MappedByteBuffer class, the relevant declarations look roughly like this:
    public abstract class MappedByteBuffer extends ByteBuffer {
        // ...
        public final MappedByteBuffer force() { /* validates the mapping, then calls force0(...) */ }

        private native void force0(FileDescriptor fd, long address, long length);
        // ...
    }
Because the flush is performed by native code, its implementation is platform-dependent and written in C. It belongs to the JDK’s NIO native library (in JDK 8, one MappedByteBuffer.c per platform) rather than to HotSpot’s Unsafe.cpp.
The following simplified sketch, modeled on the JDK 8 Windows path, conveys the idea; it is not the verbatim JDK source:
    JNIEXPORT void JNICALL
    Java_java_nio_MappedByteBuffer_force0(JNIEnv *env, jobject obj, jobject fdo,
                                          jlong address, jlong len)
    {
        void *a = (void *) jlong_to_ptr(address);
        /* Ask Windows to write the dirty pages of the mapped view to the file. */
        if (FlushViewOfFile(a, (SIZE_T) len) == 0) {
            JNU_ThrowIOExceptionWithLastError(env, "Flush failed");
        }
    }
The core of the Windows path is a call to the FlushViewOfFile function in the Windows API, which writes the modified pages of the mapped view to the file. FlushViewOfFile is specific to Windows; on Linux and other Unix-like platforms the native code calls msync instead, with the same effect.
In short, this.mappedByteBuffer.force() in Java NIO ends in a native call to the platform API that flushes the modified part of the memory-mapped file to disk. This ensures that the written data is persisted.
Analysis of this.release()
this.release() is a method in the MappedFile class that decrements the reference count of the MappedFile object. In RocketMQ, a MappedFile object uses reference counting to manage its resources. When the reference count drops to 0, the MappedFile object is no longer used by any thread and can be safely reclaimed.
In the MappedFile class, an atomic integer variable named available serves as the reference count. Its initial value is 1, which means that one reference to this MappedFile object exists from the start.
    // Reference count; starts at 1
    private final AtomicInteger available = new AtomicInteger(1);
Next, let’s look at the implementation of the release() method:
    public boolean release() {
        int value = this.available.decrementAndGet();
        if (value == 0) {
            // No other threads are using this MappedFile; it can be closed and its resources released
            this.firstShutdownTimestamp = System.currentTimeMillis();
            if (this.appendMessageCallback != null) {
                this.appendMessageCallback.shutdown();
            }
            // Release if the retention time has elapsed or the file is no longer referenced
            if (((System.currentTimeMillis() - this.firstShutdownTimestamp) >= this.fileReservedTime)
                    || (0 == this.available.get())) {
                if (this.mappedByteBuffer != null) {
                    clean(this.mappedByteBuffer);
                    this.mappedByteBuffer = null;
                }
                if (this.fileChannel != null) {
                    try {
                        this.fileChannel.close();
                    } catch (IOException e) {
                        log.error("close file channel error.", e);
                    }
                    this.fileChannel = null;
                }
                log.info("this file[REF:{}] {} {}", this.available.get(), this.fileName,
                        (this.firstShutdownTimestamp + this.fileReservedTime - System.currentTimeMillis()));
                return true;
            }
        } else if (value < 0) {
            // Clamp the counter after an unbalanced release
            this.available.set(0);
        }
        return false;
    }
The main logic of release() is as follows:
- Decrement available by 1. If the new value is 0, no other thread is using this MappedFile object.
- In that case, record the current timestamp as firstShutdownTimestamp and shut down appendMessageCallback if one is set.
- If the file retention time has expired or the reference count is 0, perform the following resource release operations:
  - Call clean(this.mappedByteBuffer) to clean up the MappedByteBuffer object and set it to null.
  - Close the FileChannel and set it to null.
  - Write a log entry indicating that the file resource has been released.
- If the decremented reference count is less than 0, reset available to 0.
In short, this.release() decrements the reference count of the MappedFile object. When the count reaches 0, the object is no longer used by any thread and its resources can be safely reclaimed. This ensures efficient use of resources and avoids resource leaks.
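The reference-counting idea can be reduced to a small standalone sketch. The class RefCountedResource below is hypothetical and deliberately differs from the snippet above: it uses compare-and-set loops instead of decrement-and-clamp, so the count can never go negative, and a counter stands in for the real cleanup work.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted resource, for illustration only.
final class RefCountedResource {
    private final AtomicInteger refCount = new AtomicInteger(1); // the creator holds the first reference
    private final AtomicInteger cleanups = new AtomicInteger(0);

    /** Tries to take a reference; fails once the count has already dropped to zero. */
    boolean hold() {
        while (true) {
            int current = refCount.get();
            if (current <= 0) {
                return false;
            }
            if (refCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Drops a reference; returns true for the one caller that brings the count to zero. */
    boolean release() {
        while (true) {
            int current = refCount.get();
            if (current <= 0) {
                return false; // already released; ignore the unbalanced call
            }
            if (refCount.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    cleanups.incrementAndGet(); // stands in for unmapping and closing the channel
                    return true;
                }
                return false;
            }
        }
    }

    int cleanupCount() {
        return cleanups.get();
    }

    public static void main(String[] args) {
        RefCountedResource resource = new RefCountedResource();
        resource.hold();                        // a reader takes a reference
        System.out.println(resource.release()); // reader is done; the owner still holds one
        System.out.println(resource.release()); // owner releases; cleanup runs exactly once
    }
}
```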
When does MappedFile return the memory block to the memory pool?
The clean() method in the MappedFile class is only responsible for unmapping and releasing the mapped memory block; it does not return memory to the memory pool. Returning the block to the pool is handled in the shutdown() method of the MappedFile class.
    public void shutdown(final long intervalForcibly) {
        this.shutdown.set(true);
        if (this.hold()) {
            this.release();
        } else {
            int value = this.available.get();
            if (value > 0) {
                // wait some time to try to shutdown again
                if (this.shutdown.get()
                        && (System.currentTimeMillis() - this.firstShutdownTimestamp) > intervalForcibly) {
                    this.release();
                }
            }
            // Hand the borrowed block back to the pool
            if (this.writeBuffer != null && this.isTransientStorePoolEnable) {
                this.transientStorePool.returnBuffer(writeBuffer);
                this.writeBuffer = null;
            }
        }
    }
The logic of the shutdown() method is as follows:
- Set the shutdown flag to true.
- If hold() succeeds (the current MappedFile object is still held, that is, its reference count is greater than 0), call the release() method, which frees resources such as the mapped memory block and the file channel once the count reaches 0.
- Otherwise, if the reference count is still greater than 0 and more than intervalForcibly milliseconds have passed since firstShutdownTimestamp, call release() again.
- Still in this branch, if writeBuffer is not null and TransientStorePool is enabled, return the memory block borrowed from the memory pool by calling transientStorePool.returnBuffer(writeBuffer), and set writeBuffer to null.
this.transientStorePool.returnBuffer(writeBuffer); code implementation
this.transientStorePool.returnBuffer(writeBuffer) returns the memory block previously borrowed from TransientStorePool. This happens during the resource reclamation of MappedFile. The return operation itself is implemented in the TransientStorePool class.
Here is the returnBuffer() method in the TransientStorePool class:
    public void returnBuffer(ByteBuffer byteBuffer) {
        this.pool.returnBuffer(byteBuffer);
    }
As can be seen from the code, the TransientStorePool.returnBuffer() method simply calls the returnBuffer() method of the internal ByteBufferPool object to complete the return operation.
Next, let’s take a look at the returnBuffer() method in the ByteBufferPool class, followed by java.nio.Buffer’s clear() method, which it relies on:
    public void returnBuffer(ByteBuffer buffer) {
        // Check if the buffer size is equal to chunkSize
        if (buffer.capacity() != this.chunkSize) {
            throw new IllegalArgumentException("The returned buffer capacity " + buffer.capacity()
                    + " does not match the chunk size " + this.chunkSize);
        }
        // Clear the buffer and add it back to the queue
        buffer.clear();
        this.queue.offer(buffer);
    }

    /**
     * The clear() method just resets the mark, position and limit of the buffer,
     * but does not actually clear the data in the buffer. The data still exists,
     * but may be overwritten by subsequent write operations.
     */
    public final Buffer clear() {
        position = 0;     // set position to 0
        limit = capacity; // set limit to capacity
        mark = -1;        // discard the mark
        return this;
    }
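That clear() resets only the bookkeeping and leaves the bytes in place is easy to confirm. A minimal sketch (ClearSketch is a hypothetical name):

```java
import java.nio.ByteBuffer;

// Hypothetical demo class, for illustration only.
final class ClearSketch {
    /** Writes one byte, calls clear(), and returns the byte still stored at index 0. */
    static byte byteAfterClear(byte value) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(16);
        buffer.put(value);    // relative put: position moves from 0 to 1
        buffer.clear();       // position = 0, limit = capacity; contents untouched
        return buffer.get(0); // absolute read, independent of position
    }

    public static void main(String[] args) {
        System.out.println(byteAfterClear((byte) 42));
    }
}
```

A pool that recycles buffers this way therefore depends on the next user tracking how many bytes it has written, rather than on the buffer being zeroed.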
transientStorePool.borrowBuffer(fileSize); code analysis
The transientStorePool.borrowBuffer(fileSize) method borrows a memory block of size fileSize from the TransientStorePool object. Here is the code analysis:
In the TransientStorePool class, the borrowBuffer method is defined as follows:
    public ByteBuffer borrowBuffer(int size) {
        ByteBuffer buffer = this.pool.borrowBuffer(size);
        if (buffer.remaining() != size) {
            log.error("The allocated buffer size not equals with request size {} {}", size, buffer.remaining());
            // Hand the mismatched block back before failing
            this.pool.returnBuffer(buffer);
            throw new RuntimeException("The allocated buffer size not equals with request size");
        }
        buffer.position(0);
        buffer.limit(size);
        return buffer;
    }
The logic of the borrowBuffer method is as follows:
- Borrow a ByteBuffer of size size from the internal ByteBufferPool object pool. ByteBufferPool is backed by a queue that stores pre-allocated memory blocks.
- Check whether the remaining space of the ByteBuffer obtained from pool equals size. If it does not, the block was allocated with the wrong size: return it to pool and throw a runtime exception.
- If the obtained ByteBuffer is correct, set its position to 0 and its limit to size, and then return it.
this.pool.borrowBuffer(size); code analysis
The this.pool.borrowBuffer(size) method borrows a memory block of size size from the ByteBufferPool object. Here is the code analysis:
The ByteBufferPool class is a memory pool that pre-allocates blocks of memory and lends them out when needed. Internally it maintains a queue, queue, which stores the pre-allocated memory blocks. The borrowBuffer() method takes a memory block from this queue.
The following is the implementation of the ByteBufferPool.borrowBuffer(int) method:
    public ByteBuffer borrowBuffer(int size) {
        // Validate the requested size
        if (size > this.chunkSize) {
            throw new IllegalArgumentException("Requested buffer size " + size
                    + " cannot exceed the chunk size " + this.chunkSize);
        }
        // Try to get a buffer from the queue
        ByteBuffer buffer = this.queue.poll();
        if (buffer == null) {
            // If the queue is empty, create a new buffer
            buffer = ByteBuffer.allocateDirect(this.chunkSize);
            this.directMemoryCounter.addAndGet(this.chunkSize);
        }
        return buffer;
    }
The logic of the borrowBuffer() method is as follows:
- Check whether the requested size exceeds chunkSize, the capacity of a single memory block in ByteBufferPool. If it does, throw an exception.
- Try to take a memory block (a ByteBuffer object) from the internal queue queue. The queue stores the pre-allocated memory blocks; the poll() method removes and returns the block at the head of the queue, or returns null if the queue is empty.
- If the queue is empty (i.e. buffer == null), create a new direct memory block with ByteBuffer.allocateDirect(). The new block has a capacity of chunkSize, and that amount is added to directMemoryCounter for accounting.
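Putting the borrow and return paths together, a queue-backed pool of direct buffers can be sketched as follows. This is a minimal illustration of the pooling technique described in this walkthrough; the class DirectBufferPoolSketch and its allocatedBytes() accessor are hypothetical names, and the sketch makes no claim to match RocketMQ’s own classes.

```java
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical pool of equally sized direct buffers, for illustration only.
final class DirectBufferPoolSketch {
    private final int chunkSize;
    private final Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong directMemoryCounter = new AtomicLong(0);

    DirectBufferPoolSketch(int poolSize, int chunkSize) {
        this.chunkSize = chunkSize;
        // Pre-allocate the blocks up front so that borrowing is normally just a queue poll.
        for (int i = 0; i < poolSize; i++) {
            queue.offer(allocate());
        }
    }

    private ByteBuffer allocate() {
        directMemoryCounter.addAndGet(chunkSize);
        return ByteBuffer.allocateDirect(chunkSize);
    }

    /** Hands out a pooled block, allocating a new one only when the pool is empty. */
    ByteBuffer borrowBuffer(int size) {
        if (size > chunkSize) {
            throw new IllegalArgumentException(
                    "Requested buffer size " + size + " cannot exceed the chunk size " + chunkSize);
        }
        ByteBuffer buffer = queue.poll();
        return buffer != null ? buffer : allocate();
    }

    /** Resets position and limit (not the contents) and puts the block back into the queue. */
    void returnBuffer(ByteBuffer buffer) {
        if (buffer.capacity() != chunkSize) {
            throw new IllegalArgumentException(
                    "The returned buffer capacity " + buffer.capacity()
                            + " does not match the chunk size " + chunkSize);
        }
        buffer.clear();
        queue.offer(buffer);
    }

    /** Total bytes of direct memory allocated by this pool so far. */
    long allocatedBytes() {
        return directMemoryCounter.get();
    }

    public static void main(String[] args) {
        DirectBufferPoolSketch pool = new DirectBufferPoolSketch(2, 1024);
        ByteBuffer block = pool.borrowBuffer(1024);
        block.put((byte) 1);
        pool.returnBuffer(block);
        System.out.println(pool.allocatedBytes());
    }
}
```

Allocating on demand when the queue is empty keeps writers from blocking, at the cost of letting direct memory grow beyond the initial poolSize blocks. A pool could instead return null or wait in that situation.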
What is the default chunkSize?
In the code shown in this walkthrough, the default value of chunkSize is a fixed integer value, 64 KB (64 * 1024).
In the simplified TransientStorePool class below, it is defined as the constant PoolChunkSize with the value 1024 * 64, and it is passed as the chunkSize parameter to ByteBufferPool in the TransientStorePool constructor. chunkSize determines the size of each pre-allocated memory block in ByteBufferPool. Choosing an appropriate chunkSize avoids frequent memory allocation and reclamation, thereby improving performance.
Note that a 64 KB chunk is far smaller than a CommitLog file, so with this constant the borrowBuffer(fileSize) call shown earlier would be rejected for any fileSize above 64 KB. This is at odds with the earlier statement that each pool block has the same size as a mapped CommitLog file. In the RocketMQ broker configuration, the pool is sized by transientStorePoolSize (5 by default), and each buffer is as large as mappedFileSizeCommitLog (1 GB by default).
    public static final int PoolChunkSize = 1024 * 64;

    public TransientStorePool(final int poolSize, final int fileSize) {
        this.poolSize = poolSize;
        this.fileSize = fileSize;
        // Every block in the pool has PoolChunkSize bytes
        this.pool = new ByteBufferPool(poolSize, PoolChunkSize);
    }
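Whatever block size is chosen, the direct memory held by the pool is simply the number of blocks times the block size. The sketch below works this out for two illustrative configurations; PoolFootprint is a hypothetical helper, and the pool size of 5 is an assumed example value.

```java
// Hypothetical helper for estimating pool memory, for illustration only.
final class PoolFootprint {
    /** Total bytes of direct memory held by poolSize blocks of blockSize bytes each. */
    static long totalBytes(int poolSize, int blockSize) {
        // Widen to long before multiplying: 5 * 1 GiB does not fit in an int.
        return (long) poolSize * blockSize;
    }

    public static void main(String[] args) {
        System.out.println(totalBytes(5, 64 * 1024));          // five 64 KB blocks
        System.out.println(totalBytes(5, 1024 * 1024 * 1024)); // five 1 GiB blocks
    }
}
```

With large blocks the total can reach several gigabytes, so the JVM’s direct memory limit (-XX:MaxDirectMemorySize) and the machine’s physical memory need to accommodate it.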