Netty’s overall concept analysis four

Why doesn’t ServerSocketChannel here use ServerSocket?

Selector:

How to execute asynchronously?

1.Future interface
2. Get the task execution results through Future

Disadvantages of Future: When using Future to obtain asynchronous execution results, you must either call the blocking method get() or poll to see if isDone() is true. Both methods are not very good because the main thread will also be forced to wait.

How to solve? Using the observer pattern, when execution is completed, the callback is automatically called back in the execution thread.

Of course, Java has already helped us implement it in FutureTask.

/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null; // to reduce footprint
}

protected void done() { }

There is an interface Promise in Netty to accomplish this. At this time, just combine Promise with Runnable.

public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V var1);
    boolean trySuccess(V var1);
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
    Promise<V> await() throws InterruptedException;
}
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    @Override
    public V get() throws InterruptedException, ExecutionException {
        Object result = this.result;
        if (!isDone0(result)) {
            await();
            result = this.result;
        }
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        Throwable cause = cause0(result);
        if (cause == null) {
            return (V) result;
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }
    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        checkDeadLock();
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();//wait if not completed
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }
    //Trigger the listener
    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }
    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }
    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                notifyListeners();
            }
            return true;
        }
        return false;
    }
}
What characteristics should the threads of the event loop group have?

1. Load balancing

2. Periodically scheduled work

Memory pool design:

1. Memory types: DirectByteBuffer, HeapByteBuffer. I want to reduce the loss caused by memory allocation. Can Netty manage the memory by itself?

Operating system memory allocation strategy

First-fit algorithm (first-fit): Search the free partition table from the first entry in the table, and allocate the first free area that meets the requirements to the job. The purpose of this method is to reduce the search time.

Best-fit algorithm (best-fit): Find the smallest free partition that can meet the job requirements from all free areas. This method can make the fragments as small as possible.

Worst-fit algorithm: It finds the largest free partition that can meet the job requirements from all free areas, so that the node size in the linked list tends to be uniform.

How to reduce external fragmentation?

The partner algorithm splits the memory into a full binary tree.

As shown in the figure, if 8KB is allocated, then the leaf node at the same level still has 8KB of available space. If 8KB is allocated next time, the 8KB on the right can be allocated directly. If the two spaces are released, they will be merged into 16KB of available space. The space is allocated in full, which reduces the occurrence of external fragmentation.

This algorithm cannot avoid the occurrence of external fragmentation.

As shown in the figure, 8KB and 16KB have been allocated. If you want to allocate 16KB at this time, you cannot allocate it in the 32KB on the left because there are still 8KB available, and these 8KB are external fragments.

How to reduce internal fragmentation?

Slab algorithm, if you want to allocate 1KB, then split 8KB into 8 1KB pieces. If you want to allocate 2KB, then split it into 4 2KB pieces.

If you want to allocate 3KB, you can only split it into two 3KB and one 2KB, which will produce 2KB internal fragmentation.

In Netty, the minimum value of leaf nodes is 8KB, which can not only be divided into enough parts, but also minimize internal fragmentation.

High concurrency lock optimization:

1. Lock fine-graining

Multiple 1024KB memory blocks can be allocated

2. Lock-free

Allocate memory in thread local cache (ThreadLocal)

There are multiple 1024KB memory blocks. How can I reduce external fragmentation more reasonably? Do I allocate from a highly used block of memory, or from a lowly used one?

public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    static {
        //The default leaf node is 8KB
        int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
        //The tree height of the binary tree is 2^11
        int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
        private final PoolArena<byte[]>[] heapArenas;
        private final PoolArena<ByteBuffer>[] directArenas;
    }
    //PreferDirect whether to allocate direct memory
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
                                  int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
                                  int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads) {
        this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
             tinyCacheSize, smallCacheSize, normalCacheSize,
             useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
    }
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    }
    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        @Override
        protected synchronized PoolThreadCache initialValue() {
            //Find the Arena with the smallest number of threads (memory area, equivalent to a 1024 block, a PoolArena in netty is 16M)
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
            final Thread current = Thread.currentThread();
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }
        private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
            if (arenas == null || arenas.length == 0) {
                return null;
            }
            PoolArena<T> minArena = arenas[0];
            for (int i = 1; i < arenas.length; i + + ) {
                PoolArena<T> arena = arenas[i];
                if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                    minArena = arena;
                }
            }
            return minArena;
        }
    }
}
@Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<byte[]> heapArena = cache.heapArena;
        final ByteBuf buf;
        if (heapArena != null) {
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity):
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer(buf);
    }
abstract class PoolArena<T> implements PoolArenaMetric {
    protected PoolArena(PooledByteBufAllocator parent, int pageSize,
                        int pageShifts, int chunkSize, int cacheAlignment) {
        //Calculate the usage rate of each PoolChunk and put it into the corresponding List
        q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
        q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
        q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null);
        qInit.prevList(qInit);

        List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
        metrics.add(qInit);
        metrics.add(q000);
        metrics.add(q025);
        metrics.add(q050);
        metrics.add(q075);
        metrics.add(q100);
        chunkListMetrics = Collections.unmodifiableList(metrics);
    }
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int sizeIdx = size2SizeIdx(reqCapacity);
        if (sizeIdx <= smallMaxSizeIdx) {
            //Allocate small page memory and use slab algorithm
            tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
        } else if (sizeIdx < nSizes) {
            tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
        } else {
            int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
            //Allocate large page memory
            allocateHuge(buf, normCapacity);
        }
    }
    private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
        //A PoolChunk represents a large page memory
        PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
        activeBytesHuge.add(chunk.chunkSize());
        buf.initUnpooled(chunk, reqCapacity);
        allocationsHuge.increment();
    }
    //
    /**
* First select a "PoolChunk" from the "PoolChunkList" linked list for memory allocation. If a suitable "PoolChunk" object cannot be found,
* Then you can only create a new "PoolChunk" object, which needs to be added to the corresponding PoolChunkList linked list after completing the memory allocation.
* There are multiple "PoolChunkList" linked lists internally, q050 and q025 represent the lowest usage of the internal "PoolChunk".
* Netty will start allocating from q050 first, not from q000.
* This is because if memory is allocated starting from q000, most PoolChunks will face frequent creation and destruction, resulting in reduced memory allocation performance.
*
* @param buf ByeBuf carrying object
* @param reqCapacity The actual memory size required by the user
* @param sizeIdx corresponds to the index value of {@link SizeClasses}, which can be used to obtain the corresponding specification value from {@link SizeClasses}
* @param threadCache local thread cache. This cache is mainly used to fill the cache variables inside the object when initializing PooledByteBuf.
*/
    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
        // #1 Try to allocate from the "PoolChunkList" linked list (find the existing "PoolChunk" for memory allocation)
        if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
            // If the allocation is successful, return directly
            return;
        }
        // #2 Create a new "PoolChunk" object
        PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
        // #3 Use new "PoolChunk" to complete memory allocation
        boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
        assert success;
        // #4 Add to the "PoolChunkList" node based on the lowest one
        qInit.add(c);
    }
}