Meituan Dynamics ThreadPoolExecutor underlying implementation source code actual combat

Opening: Introducing springboot to connect nacos to implement dynamic thread pool. Nacos must be installed at the same time. At the same time, the code will have two modules, dtp-spring-boot-starter and user module. The former will be an independent dynamic thread pool. , which can be introduced into your own project. The latter module is mainly used for testing and using the dynamic thread pool module.

Dependencies and Tools Description
springboot 2.3.9.RELEASE
nacos-config-spring-boot-starter 0.2.10
nacos 2.1.1

Pay attention to the adapted version of springboot and nacos!

Complete code gitee: ThreadPopl-starter: dynamic thread pool code

1. Build implementation

1.Create two modules and configure them

user: test module (husband module)

Maven dependencies:

 <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.3.9.RELEASE</version>
<!-- <version>2.0.3.RELEASE</version>-->
    </parent>

    <groupId>org.example</groupId>
    <artifactId>ThreadPool-demo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>dtp-spring-boot-starter</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

Write the configuration code for connecting to nacos in the configuration file

nacos:
  config:
    server-addr: 192.168.1.116:8848
    username: nacos
    password: nacos
    #Don’t write the wrong namespace
    namespace: 6b5d4d2a-5385-4d9f-85a1-18b748b8256c

dtp-spring-boot-starter: dynamic thread pool (submodule)

Maven dependencies:

 <parent>
        <artifactId>user</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dtp-spring-boot-starter</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-config-spring-boot-starter</artifactId>
            <version>0.2.10</version>
        </dependency>
<!-- <dependency>-->
<!-- <groupId>com.alibaba.boot</groupId>-->
<!-- <artifactId>nacos-config-spring-boot-actuator</artifactId>-->
<!-- <version>0.2.10</version>-->
<!-- </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>

    </dependencies>
2. Start writing dynamic thread pool configuration (dtp-spring-boot-starter module)
Create a dynamic thread pool object
package com.laoyang.dtp;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author:Kevin
 * @create: 2023-10-24 17:13
 * @Description: Implement dynamic thread pool object
 */

public class DtpExecutor extends ThreadPoolExecutor {
    public DtpExecutor(int corePoolSize, int maximumPoolSize) {
        super(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    }
}
Create a dynamic thread pool core configuration class

1. Injection of related beans 2. Injection of nacos-monitored beans

package com.laoyang.dtp;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * @author:Kevin
 * @create: 2023-10-24 18:36
 * @Description: Dynamic thread pool core configuration class
 */
@Configuration
public class DtpExecutorAutoConfiguration {

    @Autowired
    private environment environment;

    //Maximum number of cores
    private static final String CORE_POOL_SIZE = "dtp.core-pool-size";
    //Maximum number of threads
    private static final String MAXIMUM_POOL_SIZE = "dtp.maximum-pool-size";

    //Create a dynamic thread pool object
    @Bean
    public DtpExecutor executor(){

        Integer corePoolSize = Integer.valueOf(environment.getProperty(CORE_POOL_SIZE));
        Integer maximumPoolSize = Integer.valueOf(environment.getProperty(MAXIMUM_POOL_SIZE));

        return new DtpExecutor(corePoolSize,maximumPoolSize);
    }

    @Bean
    public NacosLinsenter NacosLinsenter(){
        return new NacosLinsenter();
    }


}
Then inject the core configuration class through springboot’s automatic configuration

Then write the following code

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.laoyang.dtp.DtpExecutorAutoConfiguration
Create a nacos listening class and implement dynamic binding

Use the Listener interface of nacos to implement corresponding methods to write dynamic transformation logic, and at the same time implement the functions provided by spring.

The InitializingBean interface binds the current listening class to the dataId one by one through the addListener() method of nacos's ConfigService. (As long as the configuration file of dataId changes, the currently bound listening class will call the corresponding method). Finally, the thread pool object Bean is injected, and the modified configuration file value is injected into the thread pool object Bean to realize the dynamic thread pool.
getExecutor()
Create a thread pool for the following calls
receiveConfigInfo()
Every time the current dataId changes, this method will be called
package com.laoyang.dtp;

import com.alibaba.nacos.api.annotation.NacosInjected;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.core.io.ByteArrayResource;

import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


/**
 * @author:Kevin
 * @create: 2023-10-24 19:56
 * @Description: nacos comes with a listener
 */

public class NacosLinsenter implements Listener, InitializingBean {

    @NacosInjected
    private ConfigService configService;

    @Autowired
    private DtpExecutor executor;

    //The name of dataId of nacos
    private static final String DATA_ID = "dtp.yaml";

    private static final String GROUP = "DEFAULT_GROUP";
    //Maximum number of cores
    private static final String CORE_POOL_SIZE = "dtp.core-pool-size";
    //Maximum number of threads
    private static final String MAXIMUM_POOL_SIZE = "dtp.maximum-pool-size";


    //Create a thread pool for the following calls
    @Override
    public Executor getExecutor() {
        return Executors.newFixedThreadPool(1);
    }

    //This method will be called every time the current dataId changes.
    //But the thread calling this method needs the above method to create a thread pool
    @Override
    public void receiveConfigInfo(String s) {
        //First you need to convert the string yml format into map format
        YamlPropertiesFactoryBean factoryBean = new YamlPropertiesFactoryBean();
        factoryBean.setResources(new ByteArrayResource(s.getBytes()));
        //Use springboot's built-in tool class to convert Properties to a map-like format
        Properties object = factoryBean.getObject();
        //Get updated data
        String corePoolSize = object.getProperty(CORE_POOL_SIZE);
        String maximumPoolSize = object.getProperty(MAXIMUM_POOL_SIZE);
        //Update data directly
        executor.setCorePoolSize(Integer.parseInt(corePoolSize));
        executor.setMaximumPoolSize(Integer.parseInt(maximumPoolSize));

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //Bind this NacosLinsenter to the current dataId one by one
        configService.addListener(DATA_ID,GROUP,this);
    }
}
3. Start using the user module dtp-spring-boot-starter module

Create a startup Springboot configuration class

package com.laoyang;

import com.alibaba.nacos.spring.context.annotation.config.NacosPropertySource;
import com.alibaba.nacos.spring.context.annotation.discovery.EnableNacosDiscovery;
import com.laoyang.dtp.DtpExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;


/**
 * @author:Kevin
 * @create: 2023-10-24 17:04
 * @Description:
 */
@SpringBootApplication
@NacosPropertySource(dataId = "dtp.yaml", autoRefreshed = true)
public class UserApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

Create a Controller to inject dynamic thread pool objects for use

package com.laoyang.Controller;

import com.laoyang.dtp.DtpExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author:Kevin
 * @create: 2023-10-24 17:05
 * @Description: View layer
 */
@Controller
public class UserController {

    @Autowired
    private DtpExecutor executor;

    @GetMapping
    public Integer test(){
        executor.execute(() -> dotest());
        return 1;
    }

    public void dotest(){
        System.out.println("dotest");
    }


}

You’re done! !

2. Improvement

Optimization 1: If we do not configure the number of core threads or the maximum number of threads, an error will be reported, so we need to create default values elegantly.

Step: Create Profile Object (

@ConfigurationProperties("dtp") This annotation will find the yml format fields of the nacos configuration file based on the parameters and turn them into Bean objects. 

)

package com.laoyang.dtp;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @author:Kevin
 * @create: 2023-10-24 21:29
 * @Description: Create configuration file object
 */
@ConfigurationProperties("dtp")
public class DtpProperties {

    private String corePoolSize = "10";
    private String maximumPoolSize = "100";

    public String getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(String corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public String getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setMaximumPoolSize(String maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }
}

Then add the @EnableConfigurationProperties(DtpProperties.class) annotation to the DtpExecutorAutoConfiguration core configuration class

Then optimize by passing parameters

Optimization 2. The above can only implement one thread pool object, but in the actual project, there is not only this thread pool object, so we need to optimize it next!

Create aDtpUtil that will be used to store multiple created thread pool objects in the future

package com.laoyang.dtp;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author:Kevin
 * @create: 2023-10-24 22:29
 * @Description: Thread-safe ConcurrentHashMap
 */

public class DtpUtil {

// public static ConcurrentHashMap<String,DtpExecutor> map = new ConcurrentHashMap<>();
    public static HashMap<String,DtpExecutor> map = new HashMap<>();

    public static void set(String name,DtpExecutor dtpExecutor){
        map.put(name,dtpExecutor);
    }

    public static DtpExecutor get(String name) {
        return map.get(name);
    }

}

Implement manual registration of bean objects

package com.laoyang.dtp;

import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanNameGenerator;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;

/**
 * @author:Kevin
 * @create: 2023-10-24 21:57
 * @Description: Register configuration file bean
 */

public class DtpImportBeanDefinationRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware {

    //Pass in the configuration file object
    private environment environment;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {
        //Register the bean, read the thread pool list defined behind the user-configured dtp, and then convert it to a DtpProperties object
        DtpProperties dtpProperties = new DtpProperties();

        Binder binder = Binder.get(environment);
        ResolvableType type = ResolvableType.forClass(DtpProperties.class);
        Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
        binder.bind("dtp",target);

        //Traverse the configuration and get all thread pool lists
        for (DtpProperties.DtpExecutorProperties executorProperties : dtpProperties.getDtpExecutors()) {
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition().getBeanDefinition();
            beanDefinition.setBeanClass(DtpExecutor.class);
            //Pass two values into the parameterized function of this DtpExecutor
            beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(executorProperties.getCorePoolSize());
            beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(executorProperties.getMaximumPoolSize());

            registry.registerBeanDefinition(executorProperties.getName(),beanDefinition);

        }


    }

    //
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

Also don’t forget to inject this configuration into the core configuration class

Create bean post-processor

package com.laoyang.dtp;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

/**
 * @author:Kevin
 * @create: 2023-10-25 10:29
 * @Description: bean's post-processor: used to inject our multi-dynamic thread pool objects
 */

public class DtpBeanPostProcessor implements BeanPostProcessor {

    /**
     * Inject this object DtpExecutor
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (bean instanceof DtpExecutor){
            DtpUtil.set(beanName,(DtpExecutor)bean);
        }

        return bean;
    }
}

You also need to import the configuration class

Finally modify the code in the listening class (NacosLinsenter)

package com.laoyang.dtp;

import com.alibaba.nacos.api.annotation.NacosInjected;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


/**
 * @author:Kevin
 * @create: 2023-10-24 19:56
 * @Description: nacos comes with a listener
 */

public class NacosLinsenter implements Listener, InitializingBean {

    @NacosInjected
    private ConfigService configService;

    //The name of dataId of nacos
    private static final String DATA_ID = "dtp.yaml";

    private static final String GROUP = "DEFAULT_GROUP";


    //Create a thread pool for the following calls
    @Override
    public Executor getExecutor() {
        return Executors.newFixedThreadPool(1);
    }

    //This method will be called every time the current dataId changes.
    //But the thread calling this method needs the above method to create a thread pool
    @Override
    public void receiveConfigInfo(String s) {
        //First you need to convert the string yml format into map format (single thread pool)
        YamlPropertiesFactoryBean factoryBean = new YamlPropertiesFactoryBean();
        factoryBean.setResources(new ByteArrayResource(s.getBytes()));
        //Use springboot's built-in tool class to convert Properties to a map-like format
        Properties properties = factoryBean.getObject();

        //First you need to convert the string yml format into List format (multiple thread pools)
        DtpProperties dtpProperties = new DtpProperties();
        MapConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);
        Binder binder = new Binder(sources);
        ResolvableType type = ResolvableType.forClass(DtpProperties.class);
        Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
        binder.bind("dtp",target);

        //Get the content of the changed configuration
        List<DtpProperties.DtpExecutorProperties> executors = dtpProperties.getDtpExecutors();

        //Inject the modified data into the bean
        for (DtpProperties.DtpExecutorProperties executor : executors) {
            //Get the bean object
            DtpExecutor dtpExecutor = DtpUtil.get(executor.getName());
            //Then modify the data
            dtpExecutor.setCorePoolSize(executor.getCorePoolSize());
            dtpExecutor.setMaximumPoolSize(executor.getMaximumPoolSize());

        }

        //Get updated data
// String corePoolSize = properties.getProperty(CORE_POOL_SIZE);
// String maximumPoolSize = properties.getProperty(MAXIMUM_POOL_SIZE);
// //Update data directly
// executor.setCorePoolSize(Integer.parseInt(corePoolSize));
// executor.setMaximumPoolSize(Integer.parseInt(maximumPoolSize));

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //Bind this NacosLinsenter to the current dataId one by one
        configService.addListener(DATA_ID,GROUP,this);
    }
}

Finally modified to multiple configurations in nacos

Optimization 3: Implement alarm function

Create DtpMonitor monitoring class

package com.laoyang.dtp;

import org.springframework.beans.factory.InitializingBean;

import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author:Kevin
 * @create: 2023-10-25 11:05
 * @Description: Optimized listener:
 * Monitor the number of active threads
 * Start scheduled tasks
 */

public class DtpMonitor implements InitializingBean {

    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    @Override
    public void afterPropertiesSet() throws Exception {
        //Execute this method every 5 seconds
        executor.scheduleAtFixedRate(() -> {

            for (Map.Entry<String, DtpExecutor> entry : DtpUtil.map.entrySet()) {
                String name = entry.getKey();
                DtpExecutor exectuor = entry.getValue();

                //Active number of thread pools
                int activeCount = exectuor.getActiveCount();

                if (activeCount > 20){
                    //TODO thread pool active thread pool alarm
                    System.out.println("Warning!");
                }
            }

        },5,5, TimeUnit.SECONDS);

    }
}

At the same time, inject the current configuration into the core configuration class

At this point, the project integration is complete! ! !