Netty core source code analysis

Netty thread model

(figure: Netty thread model)

The essence of Netty’s high-concurrency and high-performance architecture design

  1. Master-slave Reactor thread model
  2. NIO multiplexing non-blocking
  3. Lock-free serialization design ideas
  4. Supports high-performance serialization protocols
  5. Zero copy (direct memory usage)
  6. ByteBuf memory pool design
  7. Flexible TCP parameter configuration capability
  8. Concurrency optimization

Lock-free serialization design idea

In most scenarios, parallel multi-threaded processing improves a system's concurrency. However, if concurrent access to shared resources is handled poorly, severe lock contention occurs and performance ultimately degrades. To avoid the cost of lock contention as much as possible, a serialized design can be used: message processing is completed within a single thread wherever possible, with no thread switching in between, which avoids multi-thread contention and synchronization locks. NIO multiplexing itself embodies this lock-free serialization idea (compare the thread models of Redis and Netty). To squeeze out as much performance as possible, Netty adopts a serial lock-free design and performs operations serially inside the I/O thread, avoiding the performance loss caused by multi-thread contention. On the surface, this serialized design appears to underuse the CPU and limit concurrency. In practice, by tuning the thread parameters of the NIO thread pool, multiple serialized threads can run in parallel at the same time; this locally lock-free serial thread design performs better than the "one queue, multiple worker threads" model.
After Netty's NioEventLoop reads a message, it directly calls ChannelPipeline's fireChannelRead(Object msg). As long as the user does not actively switch threads, the NioEventLoop thread keeps calling the user's handlers, with no thread switching in between. This serialized approach eliminates the lock contention that multi-threaded operation would cause and is optimal from a performance perspective.
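
A minimal sketch of how code cooperates with this design (the helper class below is illustrative, not Netty source; Channel.eventLoop(), inEventLoop() and execute() are the actual Netty APIs used for this check):

import io.netty.channel.Channel;

public class SerializedWriter {
    /**
     * Illustrative helper (not part of Netty): makes sure the write runs on
     * the channel's own EventLoop thread, mirroring Netty's serial design.
     */
    public static void writeOnEventLoop(Channel channel, Object msg) {
        if (channel.eventLoop().inEventLoop()) {
            // Already on the I/O thread: no context switch, no lock needed.
            channel.writeAndFlush(msg);
        } else {
            // Hand the task to the channel's single thread; it is queued and
            // executed serially with all other work for this channel.
            channel.eventLoop().execute(() -> channel.writeAndFlush(msg));
        }
    }
}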

Direct memory

Direct memory (Direct Memory) is not part of the JVM runtime data area, nor is it a memory region defined in the Java Virtual Machine Specification. In some scenarios this memory is used heavily, and it can also cause an OutOfMemoryError. In Java, you can allocate a block of direct memory (off-heap memory) with DirectByteBuffer. The memory backing the metaspace is also called direct memory; both map to the machine's physical memory.
(figure: direct memory and JVM memory areas)
Test code:

package com.firechou.test;

import java.nio.ByteBuffer;

/**
 * The difference between direct memory and heap memory
 */
public class DirectMemoryTest {

    public static void heapAccess() {
        long startTime = System.currentTimeMillis();
        // Allocate heap memory
        ByteBuffer buffer = ByteBuffer.allocate(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Heap memory access: " + (endTime - startTime) + "ms");
    }

    public static void directAccess() {
        long startTime = System.currentTimeMillis();
        // Allocate direct memory
        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Direct memory access: " + (endTime - startTime) + "ms");
    }

    public static void heapAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocate(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Heap memory allocation: " + (endTime - startTime) + "ms");
    }

    public static void directAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocateDirect(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Direct memory allocation: " + (endTime - startTime) + "ms");
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            heapAccess();
            directAccess();
        }

        System.out.println();

        for (int i = 0; i < 10; i++) {
            heapAllocate();
            directAllocate();
        }
    }
}

// Sample output
Heap memory access: 70ms
Direct memory access: 44ms
Heap memory access: 40ms
Direct memory access: 28ms
Heap memory access: 67ms
Direct memory access: 55ms
Heap memory access: 57ms
Direct memory access: 41ms
Heap memory access: 59ms
Direct memory access: 36ms
Heap memory access: 48ms
Direct memory access: 32ms
Heap memory access: 44ms
Direct memory access: 26ms
Heap memory access: 38ms
Direct memory access: 25ms
Heap memory access: 44ms
Direct memory access: 29ms
Heap memory access: 45ms
Direct memory access: 26ms

Heap memory allocation: 14ms
Direct memory allocation: 51ms
Heap memory allocation: 13ms
Direct memory allocation: 40ms
Heap memory allocation: 75ms
Direct memory allocation: 55ms
Heap memory allocation: 2ms
Direct memory allocation: 26ms
Heap memory allocation: 2ms
Direct memory allocation: 83ms
Heap memory allocation: 1ms
Direct memory allocation: 34ms
Heap memory allocation: 5ms
Direct memory allocation: 34ms
Heap memory allocation: 2ms
Direct memory allocation: 38ms
Heap memory allocation: 7ms
Direct memory allocation: 37ms
Heap memory allocation: 7ms
Direct memory allocation: 35ms

As the results show, direct memory is slower to allocate but faster to access. In the JVM implementation, native I/O generally operates directly on direct memory (direct memory => system call => disk/NIC), while non-direct memory requires an extra copy (heap memory => direct memory => system call => disk/NIC).

Direct memory allocation source code analysis:

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}


DirectByteBuffer(int cap) {                   // package-private
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    // Check whether enough direct memory is available. The maximum allocatable direct
    // memory can be set with the -XX:MaxDirectMemorySize=<size> parameter; if not
    // specified, it defaults to the maximum heap size.
    // If there is not enough room when allocating, System.gc() is called to trigger a
    // full GC, reclaiming unreferenced direct-memory holder objects and thereby
    // releasing their direct memory.
    // If there is still not enough room after that, java.lang.OutOfMemoryError is thrown.
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        // Call the Unsafe native method to allocate direct memory
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        // Allocation failed: release the reservation
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }

    // Register a deallocation handler through the Cleaner mechanism: when this
    // DirectByteBuffer becomes unreachable and is collected by the GC, the run()
    // method of the Deallocator registered here is invoked to free the direct memory.
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}


// Allocate a block of native memory. The memory is uninitialized; its contents are undefined.
// Use freeMemory to release it and reallocateMemory to resize it.
public native long allocateMemory(long bytes);

// openjdk8/hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  // Call os::malloc to allocate memory; internally it uses the C standard library's malloc.
  void* x = os::malloc(sz, mtInternal);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END

Advantages and disadvantages of using direct memory:
Advantages:

  • It does not occupy heap space, reducing the likelihood of GC.
  • In the JVM implementation, native I/O operates directly on direct memory (direct memory => system call => disk/NIC), while non-direct memory requires an extra copy (heap memory => direct memory => system call => disk/NIC).

Disadvantages:

  • Initial allocation is slower.
  • Because the JVM does not directly manage this memory, memory overflow is easier to run into. To prevent direct memory from exhausting physical memory while no full GC ever happens to run, you can cap it with -XX:MaxDirectMemorySize; when the threshold is reached, System.gc() is invoked to trigger a full GC, which indirectly reclaims unreferenced direct memory (see the demo after this list).
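
A quick way to see this cap in action (a small demo; run with -XX:MaxDirectMemorySize=10m): once reserving more direct memory would exceed the limit and even a System.gc() cannot free enough, the JVM throws java.lang.OutOfMemoryError: Direct buffer memory.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Run with: java -XX:MaxDirectMemorySize=10m DirectOomDemo
public class DirectOomDemo {
    public static void main(String[] args) {
        List<ByteBuffer> hold = new ArrayList<>(); // keep references so nothing can be freed
        for (int i = 0; ; i++) {
            // Each allocation reserves 1 MB of direct memory; around the 10th
            // iteration Bits.reserveMemory can no longer reserve space, triggers
            // System.gc(), and finally throws:
            // java.lang.OutOfMemoryError: Direct buffer memory
            hold.add(ByteBuffer.allocateDirect(1024 * 1024));
            System.out.println("allocated " + (i + 1) + " MB of direct memory");
        }
    }
}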

Netty zero copy

(figure: Netty zero copy)
Netty receives and sends data with DIRECT BUFFERS in ByteBuf, using off-heap direct memory for socket reads and writes, so no second copy of the byte buffer is needed.
If traditional JVM heap buffers (HEAP BUFFERS) were used for socket I/O, the JVM would first copy the heap buffer into direct memory and only then write it to the socket, because data in the JVM heap cannot be written to a socket directly. Compared with off-heap direct memory, each message would incur one extra buffer copy on the send path.
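
For example, inside a handler you can allocate from the channel's allocator, which is pooled and prefers direct memory by default, instead of wrapping a heap array; a minimal sketch (the handler name and payload are illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class DirectWriteHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg); // done with the inbound buffer
        // ctx.alloc() is the channel's ByteBufAllocator, so the bytes written
        // here can be handed to the socket without a heap-to-direct copy.
        ByteBuf out = ctx.alloc().directBuffer(64);
        out.writeBytes("pong".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(out); // Netty releases the buffer once the write completes
    }
}
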
You can see this in Netty's read and write source code, for example the read path NioByteUnsafe.read():
(figures: NioByteUnsafe.read() source code)

ByteBuf memory pool design

With the development of the JVM and JIT just-in-time compilation, object allocation and collection have become very lightweight operations. For buffers (essentially memory blocks), however, the situation is different, especially for the allocation and reclamation of off-heap direct memory, which is expensive. To reuse buffers as much as possible, Netty provides a buffer reuse mechanism based on a ByteBuf memory pool: when a buffer is needed, a ByteBuf is simply taken from the pool, and after use it is put back. Let's look at Netty's ByteBuf implementation:
(figure: Netty ByteBuf implementation)
You can also see the ByteBuf memory pool at work in Netty's read and write source code, for example NioByteUnsafe.read(), as sketched below.
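
A minimal example of borrowing a buffer from the pool and returning it (using the public PooledByteBufAllocator API; the surrounding class is illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PooledBufferDemo {
    public static void main(String[] args) {
        // Obtain a direct buffer from Netty's default pooled allocator.
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(256);
        try {
            buf.writeInt(42);
            System.out.println("read back: " + buf.readInt());
        } finally {
            // release() drops the reference count to 0, which returns the
            // underlying memory chunk to the pool instead of freeing it.
            buf.release();
        }
    }
}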

Flexible TCP parameter configuration capability

Setting TCP parameters appropriately can noticeably improve performance in certain scenarios, for example the receive buffer SO_RCVBUF and the send buffer SO_SNDBUF; set incorrectly, they can hurt performance badly. Commonly recommended values are 128K or 256K.
Netty exposes TCP parameters through ChannelOption on its bootstrap classes, so they can be configured flexibly to suit different user scenarios, as shown below.
(figure: ChannelOption TCP parameters)
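
For instance, a server bootstrap might be tuned as follows (the option values are illustrative, not universal recommendations):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public class TcpTuningExample {
    static ServerBootstrap configure(ServerBootstrap b) {
        return b
                // options for the parent (accepting) channel
                .option(ChannelOption.SO_BACKLOG, 1024)
                // options for each accepted child channel
                .childOption(ChannelOption.SO_RCVBUF, 128 * 1024)
                .childOption(ChannelOption.SO_SNDBUF, 128 * 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
    }
}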

Concurrency optimization

  • Extensive and correct use of volatile;
  • Widespread use of CAS and the atomic classes;
  • Use of thread-safe containers;
  • Improved concurrency through read-write locks.
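
As a tiny illustration of the CAS-with-retry pattern these points refer to (a generic sketch, not Netty source):

import java.util.concurrent.atomic.AtomicLong;

public class CasCounter {
    private final AtomicLong value = new AtomicLong();

    /** Lock-free increment: retry the compare-and-set until it wins. */
    public long increment() {
        long current;
        do {
            current = value.get();
        } while (!value.compareAndSet(current, current + 1));
        return current + 1;
    }
}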

ByteBuf expansion mechanism

If we need to understand the expansion of ByteBuf, we need to first understand several member variables defined in ByteBuf, and then analyze the expansion from the perspective of source code.
(figure: ByteBuf expansion member variables)

minNewCapacity: the minimum capacity required to hold the data the user wants to write
threshold: the internal threshold in ByteBuf (CALCULATE_THRESHOLD, 4 MiB) at which expansion switches from doubling to fixed-step growth
maxCapacity: the maximum capacity Netty will accept, usually Integer.MAX_VALUE

ByteBuf core expansion method:
Open the ByteBuf source and follow the expansion logic in your IDE: ByteBuf.writeByte() -> AbstractByteBuf.ensureWritable() -> AbstractByteBufAllocator.calculateNewCapacity().

// io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    checkPositiveOrZero(minNewCapacity, "minNewCapacity");
    if (minNewCapacity > maxCapacity) {
        // Larger than the maximum capacity: throw an exception
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        // Exactly equal: return the threshold
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        // Grow in 4 MiB steps
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    // Double from a base of 64 until the required capacity is reached
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

  1. Compare the required capacity with the threshold (4 MiB); if they are equal, return the threshold directly.
  2. If the required capacity exceeds the threshold, grow in 4 MiB steps.
  3. Otherwise, double from a base of 64 bytes until the required capacity is reached.

Summary:
Netty's ByteBuf grows dynamically on demand. The expansion rules: the default threshold is 4 MiB (an empirical value; it may differ in other scenarios). If the required capacity equals the threshold, the threshold itself becomes the new buffer capacity. If it is greater than the threshold, the capacity grows in 4 MiB steps ((required capacity / 4 MiB) * 4 MiB + 4 MiB); the result is then compared with the maximum capacity (maxCapacity), and if it exceeds maxCapacity, maxCapacity is used, otherwise the computed value is used. If it is less than the threshold, the capacity doubles from a base of 64 bytes (64 -> 128 -> 256 ...) until the doubled value is greater than or equal to the required capacity.
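
The three branches can be confirmed through the public allocator API (a small demo, assuming Netty 4.1, where calculateNewCapacity is exposed on the ByteBufAllocator interface; the expected outputs follow from the rules above):

import io.netty.buffer.ByteBufAllocator;

public class CapacityDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
        int max = Integer.MAX_VALUE;
        // Below the 4 MiB threshold: double from a base of 64 -> prints 128
        System.out.println(alloc.calculateNewCapacity(100, max));
        // Exactly the threshold: returned as-is -> prints 4194304
        System.out.println(alloc.calculateNewCapacity(4 * 1024 * 1024, max));
        // Above the threshold: grow in 4 MiB steps -> 5 MiB rounds up to 8 MiB (8388608)
        System.out.println(alloc.calculateNewCapacity(5 * 1024 * 1024, max));
    }
}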

Invocation order of the Handler life-cycle callbacks

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * For a handler registered on the channel pipeline as:
 * ch.pipeline().addLast(new LifeCycleInBoundHandler());
 * the life-cycle callbacks are invoked in this order:
 * handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete
 * -> channelInactive -> channelUnregistered -> handlerRemoved
 *
 * handlerAdded: when a new connection is established, handlers are added to the channel's
 *               pipeline according to the initialization strategy; this is the callback fired
 *               after ch.pipeline().addLast(new LifeCycleInBoundHandler()) completes;
 * channelRegistered: invoked when the connection is assigned to a specific worker thread;
 * channelActive: the channel is fully prepared, all handlers have been added and the channel
 *               has been bound to a specific thread; it is ready for use;
 * channelRead: invoked each time the client sends data to the server, indicating there is
 *               data to read;
 * channelReadComplete: invoked after the server finishes each read, indicating the read is
 *               complete;
 * channelInactive: invoked when the connection is dropped, i.e. the underlying TCP connection
 *               has been closed;
 * channelUnregistered: the counterpart of channelRegistered; invoked when the connection is
 *               closed and the bound worker thread is released;
 * handlerRemoved: the counterpart of handlerAdded; invoked after the handler is removed from
 *               the channel's pipeline.
 */
public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelRegistered: channel registered to NioEventLoop");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelUnregistered: channel unbound from NioEventLoop");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelActive: channel is ready");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelInactive: channel is closed");
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("channelRead: channel has readable data");
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelReadComplete: channel read completed");
        super.channelReadComplete(ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("handlerAdded: handler added to the channel's pipeline");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("handlerRemoved: handler removed from the channel's pipeline");
        super.handlerRemoved(ctx);
    }
}
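
To watch the callbacks fire in the order described, the handler can be dropped into a minimal server (a hedged sketch; the port and thread counts are arbitrary):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class LifeCycleServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new LifeCycleInBoundHandler());
                 }
             });
            // Connect with e.g. `telnet localhost 9000`, send a line, then
            // disconnect to see the full callback sequence printed.
            b.bind(9000).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}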