Netty Event Loop Group Principles (Part 5)

Executor

Executor is an interface responsible for executing the tasks (Runnable objects) submitted to it. It decouples task submission from task execution: the caller only hands a task to the Executor and does not need to know how threads are allocated to run it. Note that the Executor interface does not require execution to be asynchronous; an implementation may run the task on the calling thread.

public interface Executor {
    //Execute the given command at some time in the future. This command may be executed in a new thread, a pooled thread, or the calling thread.
    //This is determined by the Executor implementation.
    void execute(Runnable command);
}
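
To illustrate the point that an Executor is free to choose the executing thread, here is a minimal sketch using only the JDK. The class name ExecutorDecouplingDemo and the helper threadThatRuns are made up for this example; the two executors are deliberately trivial implementations, not anything Netty ships.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

class ExecutorDecouplingDemo {
    /** Runs each task synchronously on the thread that calls execute(). */
    static final Executor DIRECT = Runnable::run;

    /** Starts a new thread per task, so execute() does not wait for the task. */
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    /** Submits one task and returns the name of the thread that actually ran it. */
    static String threadThatRuns(Executor executor) {
        String[] name = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(); // wait so the recorded name is visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("caller:          " + Thread.currentThread().getName());
        System.out.println("direct executor: " + threadThatRuns(DIRECT));
        System.out.println("thread per task: " + threadThatRuns(THREAD_PER_TASK));
    }
}
```

The submitting code is identical in both cases; only the Executor implementation decides whether the task runs on the caller's thread or on a new one.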

ExecutorService

ExecutorService is the interface Java defines for thread pools, in the java.util.concurrent package. It extends Executor with methods for managing background task execution, such as submit(), invokeAll(), invokeAny(), shutdown(), shutdownNow(), and awaitTermination().
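
As a minimal sketch of what those methods look like in use, the following submits a Callable, waits for its result through the returned Future, and shuts the pool down. The class name ExecutorServiceDemo and the method sumInBackground are hypothetical names chosen for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceDemo {
    /** Submits a Callable to a small pool, waits for the result, then shuts the pool down. */
    static int sumInBackground(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(() -> a + b); // returns immediately
            return future.get();                               // blocks until the task finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // stop accepting new tasks; submitted tasks still complete
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInBackground(2, 3)); // prints 5
    }
}
```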

ScheduledExecutorService

ScheduledExecutorService is an interface that extends ExecutorService with delayed and periodic task execution.

Every task, and every cycle of a periodic task, is submitted to the thread pool for execution, so different cycles of the same task may run on different threads. The default implementation of the ScheduledExecutorService interface is ScheduledThreadPoolExecutor. For a periodic task whose execution time exceeds its period, the execution time takes priority: the next cycle does not start until the current run has completed, so runs of the same task never overlap.
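
The overrun behavior can be observed with a small experiment. This is only a sketch: the class name FixedRateOverrunDemo, the 30 ms run time, and the 10 ms period are arbitrary choices for illustration. It schedules a task that takes longer than its period on a pool with several threads and records the highest number of runs that were in progress at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateOverrunDemo {
    /** Returns the highest number of simultaneous runs seen over at least `runs` executions. */
    static int maxConcurrentRuns(int runs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        scheduler.scheduleAtFixedRate(() -> {
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(30); // deliberately longer than the 10 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                done.countDown();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent runs: " + maxConcurrentRuns(3));
    }
}
```

Even though four threads are available, the periodic task is never executed concurrently with itself; later cycles simply start late.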

Implementation principle of schedule method

Implementation principles of scheduleAtFixedRate() and scheduleWithFixedDelay()

1. Why use a priority queue (min-heap) instead of a sorted array or a linked list?

Because a priority queue only needs to maintain the heap property (each parent is ordered before its children) rather than a total order, inserting a task and removing the earliest task both cost O(log n). A sorted array costs O(n) for insertion and deletion because elements must be shifted, and a sorted linked list costs O(n) to find the insertion point and O(1) to remove the head. Overall, the priority queue performs best.

2. Why does ScheduledThreadPoolExecutor implement its own DelayedWorkQueue instead of wrapping PriorityQueue or using DelayQueue or PriorityBlockingQueue directly?

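I have not found a fully convincing reason. In principle, DelayQueue can do essentially everything DelayedWorkQueue does, so it looks like reinventing the wheel. One reason given in the JDK source comments is that each ScheduledFutureTask records its own index in the heap array, so a cancelled task can be removed in O(log n) without first searching for it in O(n).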
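
To make the heap argument from the first question concrete, here is a minimal sketch using the JDK's java.util.PriorityQueue. The Task class and its deadlineMillis field are hypothetical stand-ins for a scheduled task; this is not the DelayedWorkQueue implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class DeadlineHeapDemo {
    /** A hypothetical scheduled task: a name plus the time at which it becomes due. */
    static final class Task implements Comparable<Task> {
        final String name;
        final long deadlineMillis;

        Task(String name, long deadlineMillis) {
            this.name = name;
            this.deadlineMillis = deadlineMillis;
        }

        @Override
        public int compareTo(Task other) {
            return Long.compare(deadlineMillis, other.deadlineMillis);
        }
    }

    /** Inserts tasks in arbitrary order and drains them earliest deadline first. */
    static List<String> drainInDeadlineOrder() {
        PriorityQueue<Task> heap = new PriorityQueue<>();
        heap.add(new Task("c", 300)); // each add() sifts up: O(log n)
        heap.add(new Task("a", 100));
        heap.add(new Task("b", 200));
        List<String> order = new ArrayList<>();
        while (!heap.isEmpty()) {
            order.add(heap.poll().name); // each poll() sifts down: O(log n)
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInDeadlineOrder()); // prints [a, b, c]
    }
}
```

Note that PriorityQueue.remove(Object) runs in linear time because the element has to be located first, which is the cost that an index stored in each task avoids.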

As mentioned earlier, the event loop group needs to do periodic scheduling work, so it extends ScheduledExecutorService.

EventExecutorGroup

EventExecutorGroup extends the JDK's ScheduledExecutorService, so it can schedule tasks or accept submitted tasks like an ordinary executor. It manages the EventExecutors in the group and hands one out through its next() method. It also takes care of their lifecycle and allows them to be shut down together.

EventExecutorGroup does not execute tasks itself; it submits or schedules them on the EventExecutors it manages. Because a group contains many EventExecutors, the target is usually selected through the next() method, and which EventExecutor next() returns is left to the concrete implementation class.

EventExecutor

EventExecutor is a special EventExecutorGroup: the group contains only one thread, and next() always returns the executor itself.

public interface EventExecutor extends EventExecutorGroup {
    //Returns a reference to itself.
    //The next method here returns the EventExecutor itself.
    @Override
    EventExecutor next();
    //Returns the EventExecutorGroup that is the parent of this EventExecutor.
    //Because every EventExecutor is managed by an EventExecutorGroup, parent()
    //returns the group that manages this EventExecutor.
    EventExecutorGroup parent();
    //The inEventLoop methods report whether the current thread, or the given thread, is this executor's event loop thread.
    boolean inEventLoop();
    boolean inEventLoop(Thread thread);
}
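
The "group of one" relationship can be modeled in a few lines. The sketch below is a deliberately tiny model, not Netty code: MiniGroup, MiniExecutor, SingleExecutor, and OneChildGroup are hypothetical names that mirror only the next() and parent() methods.

```java
class GroupOfOneDemo {
    /** Hypothetical miniature of EventExecutorGroup: hands out a member via next(). */
    interface MiniGroup {
        MiniExecutor next();
    }

    /** Hypothetical miniature of EventExecutor: a group whose only member is itself. */
    interface MiniExecutor extends MiniGroup {
        @Override
        MiniExecutor next();   // always returns this
        MiniGroup parent();    // the group that manages this executor
    }

    static final class SingleExecutor implements MiniExecutor {
        private final MiniGroup parent;

        SingleExecutor(MiniGroup parent) {
            this.parent = parent;
        }

        @Override
        public MiniExecutor next() {
            return this;
        }

        @Override
        public MiniGroup parent() {
            return parent;
        }
    }

    /** A parent group with exactly one child, to keep the sketch short. */
    static final class OneChildGroup implements MiniGroup {
        final SingleExecutor child = new SingleExecutor(this);

        @Override
        public MiniExecutor next() {
            return child;
        }
    }

    public static void main(String[] args) {
        OneChildGroup group = new OneChildGroup();
        MiniExecutor executor = group.next();
        System.out.println(executor.next() == executor);  // true
        System.out.println(executor.parent() == group);   // true
    }
}
```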

EventLoopGroup

EventLoopGroup extends EventExecutorGroup and adds the ability to register a Channel. The next() of EventExecutorGroup returns an EventExecutor picked by the chooser (a simple load balancer), while the next() of EventLoopGroup narrows the return type to EventLoop. Repeated calls cycle through the EventLoops in the group.

public interface EventLoopGroup extends EventExecutorGroup {
    //Return the next {@link EventLoop} to use
    @Override
    EventLoop next();
    ChannelFuture register(Channel channel);
}

To recap: EventExecutorGroup extends ScheduledExecutorService, so it has the basic scheduling functions, and it can hand out one of the EventExecutors in the group through the next() method. An EventExecutor is the unit in an EventExecutorGroup that actually executes events. It is itself a special EventExecutorGroup, a group of one, with only a single thread. So why was EventLoopGroup introduced? Because we also want to register Channels and spread them across these threads, which is done by cycling through the array of threads with next().

AbstractEventExecutorGroup

AbstractEventExecutorGroup is only a skeletal implementation of the interface: it delegates most methods to the EventExecutor returned by next(), but does not implement next() itself.
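
The delegation pattern can be sketched with plain JDK types. This is an assumption-laden model rather than Netty's class: AbstractGroup is a hypothetical stand-in that implements execute() by forwarding to next() and leaves next() abstract, and the two lambda "children" only record which one was picked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class DelegatingGroupDemo {
    /** Hypothetical stand-in: every operation is forwarded to whatever next() returns. */
    abstract static class AbstractGroup implements Executor {
        abstract Executor next();

        @Override
        public void execute(Runnable task) {
            next().execute(task);
        }
    }

    /** Sends three tasks through a two-child group and records which child ran each one. */
    static List<String> dispatchThreeTasks() {
        List<String> log = new ArrayList<>();
        Executor[] children = {
            task -> { log.add("child-0"); task.run(); },
            task -> { log.add("child-1"); task.run(); }
        };
        AbstractGroup group = new AbstractGroup() {
            private int index;

            @Override
            Executor next() {
                // simple round robin; not thread-safe, which is fine for this single-threaded demo
                return children[index++ % children.length];
            }
        };
        for (int i = 0; i < 3; i++) {
            group.execute(() -> { });
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(dispatchThreeTasks()); // prints [child-0, child-1, child-0]
    }
}
```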

MultithreadEventExecutorGroup

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    //If threadFactory is not null, wrap it in a ThreadPerTaskExecutor
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }
    //Delegate to the next constructor with the default chooser factory
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        //Fall back to a default executor when the caller passed null
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //Instantiate EventExecutor array
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                ...
            }
        }
        //Create the chooser
        chooser = chooserFactory.newChooser(children);
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        //unmodifiableSet returns a view whose add, remove, and other mutating methods throw UnsupportedOperationException.
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    //Subclasses create the concrete executor
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    @Override
    public EventExecutor next() {
        return chooser.next();
    }
}
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    private DefaultEventExecutorChooserFactory() { }
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        //Check whether the length is a power of 2
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
        @Override
        public EventExecutor next() {
            //AND operation instead of modulo
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;
        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
        @Override
        public EventExecutor next() {
            //Math.abs keeps the index non-negative if the counter overflows and turns negative
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
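
The arithmetic behind the two choosers can be checked in isolation. The sketch below reuses the expressions from the code above on plain int values; the class name ChooserDemo and the helper names maskIndex and moduloIndex are made up for this example.

```java
class ChooserDemo {
    /** For positive val: true when exactly one bit is set, i.e. val is a power of two. */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Round-robin index via bitmask; only valid when length is a power of two. */
    static int maskIndex(int counter, int length) {
        return counter & (length - 1);
    }

    /** Round-robin index via modulo; Math.abs handles a counter that has gone negative. */
    static int moduloIndex(int counter, int length) {
        return Math.abs(counter % length);
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(8));                   // true
        System.out.println(isPowerOfTwo(6));                   // false
        System.out.println(maskIndex(9, 8));                   // 1, same as 9 % 8
        System.out.println(maskIndex(Integer.MIN_VALUE, 8));   // 0, still in range after overflow
        System.out.println(moduloIndex(-7, 3));                // 1, where -7 % 3 alone would be -1
    }
}
```

The bitmask result is always within 0..length-1 even when the counter wraps to a negative value, which is why the power-of-two chooser needs no Math.abs.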

MultithreadEventLoopGroup

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    private static final int DEFAULT_EVENT_LOOP_THREADS; //Default number of threads
    static {
        //Number of CPU cores*2
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    }
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    //newChild is still abstract here; concrete subclasses create the EventLoop
    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}

When the CPU runs tasks, each core takes them from a RunQueue. Suppose there is a single core with a single thread: while that thread waits for I/O, the CPU sits idle, which wastes resources.

With two threads on a single core, the second thread can run while the first is waiting for I/O.

With three threads on a single core, when the first thread's I/O wait ends it is put back into the RunQueue to wait for CPU scheduling. Since two other threads are ahead of it, it may take a long time before the first thread gets its turn and continues execution.

Single core 4 threads…

Therefore, setting the thread count to the number of cores * 2 is a reasonable default.
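
The default calculation can be reproduced with JDK calls only. This is a sketch under assumptions: it substitutes Integer.getInteger and Runtime.availableProcessors() for Netty's SystemPropertyUtil and NettyRuntime helpers, and the class and method names are hypothetical.

```java
class DefaultThreadCountDemo {
    /**
     * Reads an optional override from the named system property, falls back to
     * cores * 2 when the property is absent, and never returns less than 1.
     */
    static int defaultEventLoopThreads(String propertyName) {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger(propertyName, fallback));
    }

    public static void main(String[] args) {
        // Property name taken from the static initializer shown above.
        System.out.println(defaultEventLoopThreads("io.netty.eventLoopThreads"));
    }
}
```

Running it with -Dio.netty.eventLoopThreads=4 would print 4; without the property it prints twice the number of available processors.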

NioEventLoopGroup

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    //Passing 0 selects the default thread count: CPU cores * 2
    public NioEventLoopGroup() {
        this(0);
    }
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}

NioEventLoop

public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
              rejectedExecutionHandler);
        //Assign value to selectorProvider
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        //select strategy
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        //Open Selector
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
}

SingleThreadEventLoop

SingleThreadEventLoop is an abstract EventLoop backed by a single thread that executes all tasks. Its main addition is registering a Channel with the EventLoop.

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    private final Queue<Runnable> tailTasks;
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        //tailTasks holds finishing tasks that run after each round of runAllTasks in the event loop.
        //For example, to measure how long one iteration of the event loop takes,
        //a user can wrap the statistics logic in a task and submit it as a tail task to the NioEventLoop thread.
        //When the NIO thread completes an iteration of the event loop,
        //the task is executed automatically.
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
}

SingleThreadEventExecutor

SingleThreadEventExecutor executes submitted tasks on a single thread. It therefore works with two queues: the scheduledTaskQueue inherited from its parent class and its own taskQueue.

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private final Queue<Runnable> taskQueue;
    private volatile Thread thread;
    protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
    }
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                   ...
                }
            }
        });
    }
}
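
The idea behind doStartThread, borrowing a thread from an Executor once and then running every task on it, can be modeled with the JDK alone. The following is a heavily simplified sketch and not Netty's implementation: MiniEventExecutor is a hypothetical class with no shutdown, scheduling, or error handling, and it uses a daemon thread so the demo can exit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class SingleThreadLoopDemo {
    /** Hypothetical, heavily simplified model of a single-threaded event executor. */
    static final class MiniEventExecutor implements Executor {
        private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        private final AtomicBoolean started = new AtomicBoolean();
        private final Executor threadSource; // plays the role of ThreadPerTaskExecutor
        private volatile Thread thread;

        MiniEventExecutor(Executor threadSource) {
            this.threadSource = threadSource;
        }

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        @Override
        public void execute(Runnable task) {
            taskQueue.add(task);
            if (started.compareAndSet(false, true)) { // start the loop thread on first use only
                threadSource.execute(() -> {
                    thread = Thread.currentThread();  // bind this executor to the running thread
                    runLoop();
                });
            }
        }

        private void runLoop() {
            try {
                while (true) {
                    taskQueue.take().run(); // tasks run one at a time, in FIFO order
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop when interrupted
            }
        }
    }

    /** Submits three tasks from the caller and records where each one ran. */
    static List<String> runThreeTasks() {
        MiniEventExecutor loop = new MiniEventExecutor(task -> {
            Thread t = new Thread(task, "mini-loop");
            t.setDaemon(true);
            t.start();
        });
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            int id = i;
            loop.execute(() -> {
                log.add(id + ":" + Thread.currentThread().getName() + ":" + loop.inEventLoop());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runThreeTasks());
    }
}
```

All three tasks report the same thread name and see inEventLoop() as true, while the submitting thread is never the loop thread.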

AbstractScheduledEventExecutor

AbstractScheduledEventExecutor extends AbstractEventExecutor. It uses a PriorityQueue internally to store the ScheduledFutureTask objects that wrap scheduled tasks, which is how scheduled execution is implemented:

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
}
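
How a single loop thread might drain such a queue can be sketched as follows. This is an illustrative model, not Netty's code: ScheduledTask is a hypothetical stand-in for ScheduledFutureTask, it uses java.util.PriorityQueue, and the current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.PriorityQueue;

class ScheduledQueueDemo {
    /** Hypothetical stand-in for ScheduledFutureTask: an action plus its deadline. */
    static final class ScheduledTask implements Comparable<ScheduledTask> {
        final Runnable action;
        final long deadlineNanos;

        ScheduledTask(Runnable action, long deadlineNanos) {
            this.action = action;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(ScheduledTask other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    // Not thread-safe: in this model only the single loop thread touches the queue.
    private final PriorityQueue<ScheduledTask> scheduledTaskQueue = new PriorityQueue<>();

    void schedule(Runnable action, long deadlineNanos) {
        scheduledTaskQueue.add(new ScheduledTask(action, deadlineNanos));
    }

    /** Runs every task whose deadline is at or before nowNanos; returns how many ran. */
    int runDueTasks(long nowNanos) {
        int ran = 0;
        ScheduledTask head;
        while ((head = scheduledTaskQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            scheduledTaskQueue.poll(); // remove the head we just inspected
            head.action.run();
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) {
        ScheduledQueueDemo loop = new ScheduledQueueDemo();
        StringBuilder order = new StringBuilder();
        loop.schedule(() -> order.append("late "), 300);
        loop.schedule(() -> order.append("early "), 100);
        System.out.println(loop.runDueTasks(150) + " ran: " + order); // only "early" is due
        System.out.println(loop.runDueTasks(300) + " ran: " + order); // now "late" runs too
    }
}
```

Because the earliest deadline is always at the head of the heap, the loop only has to peek at one element to know whether anything is due.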