Nacos startup loading configuration
Load NacosConfigBootstrapConfiguration through META-INF/spring.factories
org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration
Initialize NacosConfigProperties and NacosPropertySourceLocator
@Configuration @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true) public class NacosConfigBootstrapConfiguration { @Bean @ConditionalOnMissingBean public NacosConfigProperties nacosConfigProperties() { return new NacosConfigProperties(); } @Bean public NacosPropertySourceLocator nacosPropertySourceLocator( NacosConfigProperties nacosConfigProperties) { return new NacosPropertySourceLocator(nacosConfigProperties); } }
Load configuration to org.springframework.core.env.Environment via com.alibaba.cloud.nacos.client.NacosPropertySourceLocator
@Override public PropertySource<?> locate(Environment env) { ConfigService configService = nacosConfigProperties.configServiceInstance(); if (null == configService) { log.warn("no instance of config service found, can't load config from nacos"); return null; } long timeout = nacosConfigProperties.getTimeout(); nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout); String name = nacosConfigProperties. getName(); String dataIdPrefix = nacosConfigProperties.getPrefix(); if (StringUtils. isEmpty(dataIdPrefix)) { dataIdPrefix = name; } if (StringUtils. isEmpty(dataIdPrefix)) { dataIdPrefix = env. getProperty("spring. application. name"); } CompositePropertySource composite = new CompositePropertySource( NACOS_PROPERTY_SOURCE_NAME); loadSharedConfiguration(composite); loadExtConfiguration(composite); loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env); return composite; }
Nacos start registration com.alibaba.nacos.api.config.listener.Listener
Load NacosConfigAutoConfiguration from META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration
Initialize com.alibaba.cloud.nacos.refresh.NacosContextRefresher to implement start registration Listener
@Override public void onApplicationEvent(ApplicationReadyEvent event) { // many Spring contexts if (this. ready. compareAndSet(false, true)) { this.registerNacosListenersForApplications(); } } private void registerNacosListenersForApplications() { if (refreshProperties. isEnabled()) { for (NacosPropertySource nacosPropertySource : NacosPropertySourceRepository .getAll()) { if (!nacosPropertySource.isRefreshable()) { continue; } String dataId = nacosPropertySource.getDataId(); registerNacosListener(nacosPropertySource.getGroup(), dataId); } } } private void registerNacosListener(final String group, final String dataId) { Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() { @Override public void receiveConfigInfo(String configInfo) { refreshCountIncrement(); String md5 = ""; if (!StringUtils. isEmpty(configInfo)) { try { MessageDigest md = MessageDigest. getInstance("MD5"); md5 = new BigInteger(1, md. digest(configInfo. getBytes("UTF-8"))) .toString(16); } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e); } } refreshHistory.add(dataId, md5); applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (log. isDebugEnabled()) { log.debug("Refresh Nacos config group " + group + ",dataId" + dataId); } } @Override public Executor getExecutor() { return null; } }); try { configService.addListener(dataId, group, listener); } catch (NacosException e) { e.printStackTrace(); } }
Publish the update event through applicationContext.publishEvent(new RefreshEvent(this, null, “Refresh Nacos config”)); and trigger the listener operation org.springframework.cloud.endpoint.event.RefreshEventListener
org.springframework.cloud.endpoint.event.RefreshEventListener @Override public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) { return ApplicationReadyEvent.class.isAssignableFrom(eventType) || RefreshEvent.class.isAssignableFrom(eventType); } @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ApplicationReadyEvent) { handle((ApplicationReadyEvent) event); } else if (event instanceof RefreshEvent) { handle((RefreshEvent) event); } } public void handle(ApplicationReadyEvent event) { this.ready.compareAndSet(false, true); } public void handle(RefreshEvent event) { if (this.ready.get()) { // don't handle events before app is ready log.debug("Event received " + event.getEventDesc()); Set<String> keys = this. refresh. refresh(); log.info("Refresh keys changed: " + keys); } }
The listener triggers a call to org.springframework.cloud.context.refresh.ContextRefresher.refresh()
public synchronized Set<String> refresh() { Set<String> keys = refreshEnvironment(); this.scope.refreshAll(); return keys; } public synchronized Set<String> refreshEnvironment() { Map<String, Object> before = extract( this.context.getEnvironment().getPropertySources()); addConfigFilesToEnvironment(); Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet(); this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys)); return keys; } Set<String> keys = refreshEnvironment(); 1. Realize the refresh of Environment 2. Get the key whose configuration changes 3. Release of the EnvironmentChangeEvent event, trigger monitoring org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder Realize the refresh of org.springframework.boot.context.properties.ConfigurationProperties annotation this.scope.refreshAll(); Refresh beans managed by org.springframework.cloud.context.scope.refresh.RefreshScope
Nacos implements monitoring
Load NacosConfigAutoConfiguration from META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration
Obtain com.alibaba.nacos.api.config.ConfigService according to the configuration
Set com.alibaba.nacos.client.config.impl.ClientWorker
1. Get ConfigService com.alibaba.cloud.nacos.NacosConfigAutoConfiguration.nacosContextRefresher(NacosConfigProperties, NacosRefreshProperties, NacosRefreshHistory) @Bean public NacosContextRefresher nacosContextRefresher( NacosConfigProperties nacosConfigProperties, NacosRefreshProperties nacosRefreshProperties, NacosRefreshHistory refreshHistory) { return new NacosContextRefresher(nacosRefreshProperties, refreshHistory, nacosConfigProperties.configServiceInstance()); } 2. Get ConfigService com.alibaba.cloud.nacos.NacosConfigProperties.configServiceInstance() 3. Factory method to get ConfigService com.alibaba.nacos.api.NacosFactory.createConfigService(Properties) public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor. newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } } 4. Generate NacosConfigService by reflection com.alibaba.nacos.client.config.NacosConfigService.NacosConfigService(Properties) public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties. getProperty(PropertyKeyConst. ENCODE); if (StringUtils. isBlank(encodeTmp)) { encode = Constants. ENCODE; } else { encode = encodeTmp.trim(); } initNamespace(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); worker = new ClientWorker(agent, configFilterChainManager, properties); } 5. Instantiate the ClientWorker com.alibaba.nacos.client.config.impl.ClientWorker.ClientWorker(HttpAgent, ConfigFilterChainManager, Properties)
ClientWorker realizes timing detection every 10ms
@SuppressWarnings("PMD.ThreadPoolCreationRule") public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this. agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); executor = Executors. newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t. setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t. setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER. error("[" + agent. getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit. MILLISECONDS); } The executor runs com.alibaba.nacos.client.config.impl.ClientWorker.checkConfigInfo() regularly Execute once in 10ms per thread executorService runs LongPollingRunnable
LongPollingRunnable implements change notifications for long-running training
com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable @Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config for (CacheData cacheData : cacheMap. get(). values()) { if (cacheData. getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData); if (cacheData. isUseLocalConfigInfo()) { cacheData. checkListenerMd5(); } } catch (Exception e) { LOGGER. error("get local config info error", e); } } } // check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey. parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key. length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String. format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER. error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData. checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList. clear(); executorService. execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER. error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit. MILLISECONDS); } } }
## com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable 1. Loop through CacheData 2. Detect local configuration com.alibaba.nacos.client.config.impl.ClientWorker.checkLocalConfig(CacheData) 3. If it is a local configuration, perform md5 comparison if (cacheData. isUseLocalConfigInfo()) { cacheData. checkListenerMd5(); } ## com.alibaba.nacos.client.config.impl.CacheData 1. Perform md5 comparison com.alibaba.nacos.client.config.impl.CacheData.checkListenerMd5() 2. If md5 changes, execute the callback com.alibaba.nacos.client.config.impl.CacheData.safeNotifyListener(String, String, String, String, ManagerListenerWrap) 3. Execute the listener callback: there is a thread pool, the thread pool executes, and there is no direct operation private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap. listener; Runnable job = new Runnable() { @Override public void run() { ClassLoader myClassLoader = Thread. currentThread(). getContextClassLoader(); ClassLoader appClassLoader = listener. getClass(). getClassLoader(); try { if (listener instanceof AbstractSharedListener) { AbstractSharedListener adapter = (AbstractSharedListener) listener; adapter.fillContext(dataId, group); LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5); } // Before executing the callback, set the thread classloader to the classloader of the specific webapp, so as to avoid abnormal or misuse of calling the spi interface in the callback method (multi-application deployment will only have this problem). Thread.currentThread().setContextClassLoader(appClassLoader); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); configFilterChainManager.doFilter(null, cr); String contentTmp = cr. getContent(); listener.receiveConfigInfo(contentTmp); listenerWrap.lastCallMd5 = md5; LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException de) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name, dataId, group, md5, listener, de.getErrCode(), de.getErrMsg()); } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, listener, t. getCause()); } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System. currentTimeMillis(); try { if (null != listener. getExecutor()) { listener.getExecutor().execute(job); } else { job. run(); } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t. getCause()); } final long finishNotify = System. currentTimeMillis(); LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }
Com.alibaba.nacos.api.config.listener.Listener triggers receiveConfigInfo to publish refresh events and complete the triggering of RefreshEvent