How Spring Boot configuration is refreshed dynamically (as implemented by Nacos)

How Nacos loads configuration at startup

Load NacosConfigBootstrapConfiguration through META-INF/spring.factories

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration

Initialize NacosConfigProperties and NacosPropertySourceLocator

/**
 * Bootstrap configuration for Nacos config support.
 *
 * <p>Active unless {@code spring.cloud.nacos.config.enabled} is explicitly set to a
 * non-matching value ({@code matchIfMissing = true}). It contributes the Nacos
 * properties holder and the property-source locator built on top of it.
 */
@Configuration
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {

    /**
     * Provides the Nacos configuration properties, unless another bean of this
     * type has already been defined.
     *
     * @return a fresh {@link NacosConfigProperties} instance
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        final NacosConfigProperties properties = new NacosConfigProperties();
        return properties;
    }

    /**
     * Provides the locator that is built from the Nacos configuration properties.
     *
     * @param nacosConfigProperties the Nacos configuration properties bean
     * @return a locator wrapping the given properties
     */
    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator(
            NacosConfigProperties nacosConfigProperties) {
        final NacosPropertySourceLocator locator =
                new NacosPropertySourceLocator(nacosConfigProperties);
        return locator;
    }

}

Load configuration to org.springframework.core.env.Environment via com.alibaba.cloud.nacos.client.NacosPropertySourceLocator

/**
 * Builds the composite Nacos property source for the given environment.
 *
 * <p>The data-id prefix is resolved in this order: the configured prefix, then the
 * configured name, then the {@code spring.application.name} property of the
 * environment.
 *
 * @param env the environment; read for {@code spring.application.name} and passed
 *            on to the application-configuration loader
 * @return the composite property source, or {@code null} when no
 *         {@code ConfigService} instance is available
 */
@Override
public PropertySource<?> locate(Environment env) {

    ConfigService configService = nacosConfigProperties.configServiceInstance();

    if (null == configService) {
        log.warn("no instance of config service found, can't load config from nacos");
        return null;
    }

    long timeout = nacosConfigProperties.getTimeout();
    nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
            timeout);
    String name = nacosConfigProperties.getName();

    String dataIdPrefix = nacosConfigProperties.getPrefix();
    if (StringUtils.isEmpty(dataIdPrefix)) {
        dataIdPrefix = name;
    }

    if (StringUtils.isEmpty(dataIdPrefix)) {
        // Last resort: the application name. The key must be written without
        // embedded spaces, otherwise the lookup cannot match the property.
        dataIdPrefix = env.getProperty("spring.application.name");
    }

    CompositePropertySource composite = new CompositePropertySource(
            NACOS_PROPERTY_SOURCE_NAME);

    // Sources are added in this order: shared, extension, then application-specific.
    loadSharedConfiguration(composite);
    loadExtConfiguration(composite);
    loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);

    return composite;
}

How Nacos registers com.alibaba.nacos.api.config.listener.Listener at startup

Load NacosConfigAutoConfiguration from META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration

Initialize com.alibaba.cloud.nacos.refresh.NacosContextRefresher, which registers the Listeners once the application is ready

 /**
  * Registers the Nacos listeners the first time an application-ready event arrives.
  *
  * @param event the application-ready event (not inspected)
  */
 @Override
 public void onApplicationEvent(ApplicationReadyEvent event) {
     // many Spring contexts: only the first event flips the flag and registers
     final boolean firstReadyEvent = this.ready.compareAndSet(false, true);
     if (!firstReadyEvent) {
         return;
     }
     this.registerNacosListenersForApplications();
 }

    /**
     * Registers a listener for every refreshable Nacos property source held by
     * {@code NacosPropertySourceRepository}. Does nothing when refresh is disabled.
     */
    private void registerNacosListenersForApplications() {
        if (!refreshProperties.isEnabled()) {
            return;
        }
        for (NacosPropertySource source : NacosPropertySourceRepository.getAll()) {
            if (source.isRefreshable()) {
                String dataId = source.getDataId();
                registerNacosListener(source.getGroup(), dataId);
            }
        }
    }

/**
 * Registers a Nacos config listener for the given group and dataId.
 *
 * <p>One listener is cached per dataId in {@code listenerMap}. When Nacos delivers
 * new content, the listener bumps the refresh counter, records the dataId with an
 * MD5 of the content in the refresh history, and publishes a {@link RefreshEvent}.
 *
 * @param group  the Nacos group of the configuration
 * @param dataId the Nacos dataId of the configuration
 */
private void registerNacosListener(final String group, final String dataId) {

    Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {
        @Override
        public void receiveConfigInfo(String configInfo) {
            refreshCountIncrement();
            String md5 = "";
            if (!StringUtils.isEmpty(configInfo)) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    // NOTE(review): BigInteger#toString(16) does not zero-pad, so the
                    // recorded digest may be shorter than 32 hex characters.
                    md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))
                            .toString(16);
                }
                catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
                    log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
                }
            }
            refreshHistory.add(dataId, md5);
            applicationContext.publishEvent(
                    new RefreshEvent(this, null, "Refresh Nacos config"));
            if (log.isDebugEnabled()) {
                log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
            }
        }

        @Override
        public Executor getExecutor() {
            // No dedicated executor is supplied for this listener.
            return null;
        }
    });

    try {
        configService.addListener(dataId, group, listener);
    }
    catch (NacosException e) {
        // Report through the logger (with context) instead of dumping to stderr.
        log.warn("[Nacos] unable to register listener for dataId: " + dataId
                + ", group: " + group, e);
    }
}

The update event is published through applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config")); this triggers the listener org.springframework.cloud.endpoint.event.RefreshEventListener

org.springframework.cloud.endpoint.event.RefreshEventListener


    /**
     * Reports whether this listener handles the given event type.
     *
     * @param eventType the candidate event type
     * @return {@code true} for {@code ApplicationReadyEvent} and {@code RefreshEvent}
     *         (and their subclasses), {@code false} otherwise
     */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        if (ApplicationReadyEvent.class.isAssignableFrom(eventType)) {
            return true;
        }
        return RefreshEvent.class.isAssignableFrom(eventType);
    }

/**
 * Dispatches the event to the matching {@code handle} overload; events of any
 * other type are ignored.
 *
 * @param event the published application event
 */
@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof ApplicationReadyEvent) {
        ApplicationReadyEvent readyEvent = (ApplicationReadyEvent) event;
        handle(readyEvent);
        return;
    }
    if (event instanceof RefreshEvent) {
        RefreshEvent refreshEvent = (RefreshEvent) event;
        handle(refreshEvent);
    }
}

/**
 * Marks the listener as ready so that later refresh events are processed.
 *
 * @param event the application-ready event (not inspected)
 */
public void handle(ApplicationReadyEvent event) {
    ready.compareAndSet(false, true);
}

/**
 * Triggers a context refresh for the given event, provided the application has
 * already been marked ready; earlier events are dropped.
 *
 * @param event the refresh event whose description is logged
 */
public void handle(RefreshEvent event) {
    // don't handle events before app is ready
    if (!this.ready.get()) {
        return;
    }
    log.debug("Event received " + event.getEventDesc());
    Set<String> changedKeys = this.refresh.refresh();
    log.info("Refresh keys changed: " + changedKeys);
}

The listener triggers a call to org.springframework.cloud.context.refresh.ContextRefresher.refresh()

 /**
  * Refreshes the environment and then calls {@code refreshAll()} on the scope.
  *
  * @return the keys reported as changed by {@link #refreshEnvironment()}
  */
 public synchronized Set<String> refresh() {
     final Set<String> changedKeys = refreshEnvironment();
     this.scope.refreshAll();
     return changedKeys;
 }

/**
 * Snapshots the environment's property sources, calls
 * {@code addConfigFilesToEnvironment()}, snapshots again, and publishes an
 * {@code EnvironmentChangeEvent} carrying the keys that differ.
 *
 * @return the keys whose values changed between the two snapshots
 */
public synchronized Set<String> refreshEnvironment() {
    Map<String, Object> before = extract(
            this.context.getEnvironment().getPropertySources());
    addConfigFilesToEnvironment();
    Map<String, Object> after = extract(
            this.context.getEnvironment().getPropertySources());
    Set<String> changedKeys = changes(before, after).keySet();
    this.context.publishEvent(new EnvironmentChangeEvent(this.context, changedKeys));
    return changedKeys;
}



Set<String> keys = refreshEnvironment();
    1. Refresh the Environment
    2. Collect the keys whose configuration values changed
    3. Publish an EnvironmentChangeEvent, which triggers the listener
org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder;
this listener rebinds beans annotated with org.springframework.boot.context.properties.ConfigurationProperties


this.scope.refreshAll();
    Refresh beans managed by org.springframework.cloud.context.scope.refresh.RefreshScope


How Nacos monitors configuration changes

Load NacosConfigAutoConfiguration from META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration

Obtain com.alibaba.nacos.api.config.ConfigService according to the configuration

Set com.alibaba.nacos.client.config.impl.ClientWorker

 1. Get ConfigService com.alibaba.cloud.nacos.NacosConfigAutoConfiguration.nacosContextRefresher(NacosConfigProperties, NacosRefreshProperties, NacosRefreshHistory)

     /**
      * Declares the {@code NacosContextRefresher} bean, wiring it with the refresh
      * properties, the refresh history and the config service obtained from
      * {@code nacosConfigProperties.configServiceInstance()}.
      *
      * @param nacosConfigProperties  source of the config service instance
      * @param nacosRefreshProperties refresh settings handed to the refresher
      * @param refreshHistory         history store handed to the refresher
      * @return the context refresher
      */
     @Bean
     public NacosContextRefresher nacosContextRefresher(
             NacosConfigProperties nacosConfigProperties,
             NacosRefreshProperties nacosRefreshProperties,
             NacosRefreshHistory refreshHistory) {
         final NacosContextRefresher refresher = new NacosContextRefresher(
                 nacosRefreshProperties, refreshHistory,
                 nacosConfigProperties.configServiceInstance());
         return refresher;
     }

2. Get ConfigService
com.alibaba.cloud.nacos.NacosConfigProperties.configServiceInstance()

3. Factory method to get ConfigService
com.alibaba.nacos.api.NacosFactory.createConfigService(Properties)

    /**
     * Creates a {@code ConfigService} by reflectively instantiating
     * {@code com.alibaba.nacos.client.config.NacosConfigService} through its
     * {@code (Properties)} constructor.
     *
     * @param properties the client properties passed to the implementation constructor
     * @return the newly created config service
     * @throws NacosException with code {@code CLIENT_INVALID_PARAM}, wrapping any
     *                        failure raised while loading or constructing the class
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            // Wildcard-typed instead of raw: the concrete class is only known at runtime.
            Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
            return (ConfigService) constructor.newInstance(properties);
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }

4. Generate NacosConfigService by reflection
com.alibaba.nacos.client.config.NacosConfigService.NacosConfigService(Properties)

    public NacosConfigService(Properties properties) throws NacosException {
        String encodeTmp = properties. getProperty(PropertyKeyConst. ENCODE);
        if (StringUtils. isBlank(encodeTmp)) {
            encode = Constants. ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        initNamespace(properties);
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        agent.start();
        worker = new ClientWorker(agent, configFilterChainManager, properties);
    }

5. Instantiate the ClientWorker
com.alibaba.nacos.client.config.impl.ClientWorker.ClientWorker(HttpAgent, ConfigFilterChainManager, Properties)

ClientWorker schedules a periodic check: the first run starts after 1 ms, and each subsequent run starts 10 ms after the previous one finishes

/**
 * Creates the worker and starts its periodic configuration check.
 *
 * <p>Two scheduled pools of daemon threads are created: a single-threaded one that
 * repeatedly calls {@code checkConfigInfo()}, and one sized to the number of
 * available processors whose threads are named for long polling.
 *
 * @param agent                    HTTP agent; its name is used in thread names and log messages
 * @param configFilterChainManager filter chain manager stored on the worker
 * @param properties               client properties passed to {@code init}
 */
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter
    init(properties);

    final ThreadFactory workerThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread worker = new Thread(task);
            worker.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            worker.setDaemon(true);
            return worker;
        }
    };
    executor = Executors.newScheduledThreadPool(1, workerThreadFactory);

    final int longPollingThreads = Runtime.getRuntime().availableProcessors();
    final ThreadFactory longPollingThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread poller = new Thread(task);
            poller.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            poller.setDaemon(true);
            return poller;
        }
    };
    executorService = Executors.newScheduledThreadPool(longPollingThreads, longPollingThreadFactory);

    // Failures are logged and swallowed so the recurring schedule is not cancelled.
    final Runnable configCheck = new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    };
    executor.scheduleWithFixedDelay(configCheck, 1L, 10L, TimeUnit.MILLISECONDS);
}



The executor periodically runs com.alibaba.nacos.client.config.impl.ClientWorker.checkConfigInfo()
    (scheduled with a fixed delay of 10 ms on its single worker thread)
executorService runs the LongPollingRunnable tasks

LongPollingRunnable implements change notification via long polling

com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable


        /**
         * One long-polling pass over the cache entries that belong to this task.
         *
         * <p>Steps: (1) collect the entries whose task id equals {@code taskId} and
         * check their local configuration; (2) ask {@code checkUpdateDataIds} which
         * group keys changed and fetch the new content for each; (3) run the listener
         * MD5 check on the collected entries; (4) re-submit this runnable. If anything
         * throws, the runnable is instead rescheduled after {@code taskPenaltyTime}
         * milliseconds.
         */
        @Override
        public void run() {

            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
                for (CacheData cacheData : cacheMap. get(). values()) {
                    // only entries assigned to this task are handled here
                    if (cacheData. getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            checkLocalConfig(cacheData);
                            // when the local config is in use, run the listener md5 check right away
                            if (cacheData. isUseLocalConfigInfo()) {
                                cacheData. checkListenerMd5();
                            }
                        } catch (Exception e) {
                            // a failure for one entry is logged and does not stop the loop
                            LOGGER. error("get local config info error", e);
                        }
                    }
                }

                // check server config
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

                for (String groupKey : changedGroupKeys) {
                    // parsed key layout: [dataId, group] or [dataId, group, tenant]
                    String[] key = GroupKey. parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key. length == 3) {
                        tenant = key[2];
                    }
                    try {
                        // 3000L is the timeout argument — presumably milliseconds; TODO confirm
                        String content = getServerConfig(dataId, group, tenant, 3000L);
                        // NOTE(review): the lookup result is dereferenced without a null check;
                        // a missing entry would raise an NPE handled by the outer catch (Throwable).
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(content));
                    } catch (NacosException ioe) {
                        // a fetch failure for one key is logged; the remaining keys are still processed
                        String message = String. format(
                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                        LOGGER. error(message, ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    // run the md5 check for entries that are no longer initializing, or that
                    // checkUpdateDataIds placed in inInitializingCacheList
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        cacheData. checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList. clear();

                // re-submit this runnable so polling continues
                executorService. execute(this);

            } catch (Throwable e) {

                // If the long-polling task fails, delay its next execution by the penalty time
                LOGGER. error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit. MILLISECONDS);
            }
        }
    }
## com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable

1. Loop through CacheData
2. Detect local configuration
    com.alibaba.nacos.client.config.impl.ClientWorker.checkLocalConfig(CacheData)
3. If the local (failover) configuration is in use, run the listener MD5 check
    if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); }


## com.alibaba.nacos.client.config.impl.CacheData
1. Perform md5 comparison
    com.alibaba.nacos.client.config.impl.CacheData.checkListenerMd5()
2. If md5 changes, execute the callback
    com.alibaba.nacos.client.config.impl.CacheData.safeNotifyListener(String, String, String, String, ManagerListenerWrap)
3. Execute the listener callback: if the listener supplies an Executor (getExecutor() is non-null), the callback runs on that executor; otherwise it runs directly on the calling thread




/**
 * Delivers new configuration content to one listener, logging any failure
 * instead of propagating it.
 *
 * <p>The notification is wrapped in a job that runs on the listener's executor
 * when {@code getExecutor()} is non-null, and directly on the calling thread
 * otherwise. The time spent dispatching is logged at the end.
 *
 * @param dataId       data id of the configuration
 * @param group        group of the configuration
 * @param content      configuration content, passed through the filter chain before delivery
 * @param md5          md5 recorded on the wrapper after the callback returns
 * @param listenerWrap wrapper holding the listener and its last-notified md5
 */
private void safeNotifyListener(final String dataId, final String group, final String content,
                                    final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap. listener;

        Runnable job = new Runnable() {
            @Override
            public void run() {
                // remember the current context classloader so it can be restored in finally
                ClassLoader myClassLoader = Thread. currentThread(). getContextClassLoader();
                ClassLoader appClassLoader = listener. getClass(). getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        // shared listeners receive the dataId/group context before the callback
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // Before running the callback, switch the thread's context classloader to the
                    // listener's own classloader, so that SPI lookups made inside the callback resolve
                    // against the right web application (only an issue in multi-application deployments).
                    Thread.currentThread().setContextClassLoader(appClassLoader);

                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    // the filter chain may rewrite the content held by the response
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr. getContent();
                    listener.receiveConfigInfo(contentTmp);
                    // recorded only after the callback returned without throwing
                    listenerWrap.lastCallMd5 = md5;
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
                } catch (NacosException de) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                        dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
                } catch (Throwable t) {
                    // NOTE(review): logs t.getCause() rather than t itself, so the thrown
                    // exception is not reported when it has no cause — confirm this is intended.
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
                        md5, listener, t. getCause());
                } finally {
                    // always restore the original context classloader
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            }
        };

        final long startNotify = System. currentTimeMillis();
        try {
            // use the listener's executor when it provides one; otherwise run inline
            if (null != listener. getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                job. run();
            }
        } catch (Throwable t) {
            // NOTE(review): same as above — only t.getCause() is logged here.
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
                md5, listener, t. getCause());
        }
        final long finishNotify = System. currentTimeMillis();
        LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
    }

The callback invokes receiveConfigInfo on the com.alibaba.nacos.api.config.listener.Listener, which publishes the RefreshEvent and thereby completes the refresh trigger