Spring asynchronous annotation @Async

Spring provides the @Async annotation. A method marked with it is invoked asynchronously: the caller returns immediately and the method body runs on another thread.

    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface Async {
        /**
         * A qualifier value for the specified asynchronous operation(s).
         * <p>May be used to determine the target executor to be used when executing
         * the asynchronous operation(s), matching the qualifier value (or the bean
         * name) of a specific {@link java.util.concurrent.Executor Executor} or
         * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
         * bean definition.
         * <p>When specified on a class-level {@code @Async} annotation, indicates that the
         * given executor should be used for all methods within the class. Method-level use
         * of {@code Async#value} always overrides any value set at the class level.
         *
         * @since 3.1.2
         */
        String value() default "";
    }

Basic application of @Async

// Enable asynchronous support
@EnableAsync
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class IntlCommonApplication {

    public static void main(String[] args) {
        SpringApplication.run(IntlCommonApplication.class, args);
    }

}

@RestController
public class TestController {

    @Resource
    private AsyncTestService asyncTestService;

    @RequestMapping("/test")
    public String test(){
        asyncTestService.test();
        System.out.println("Is the target method finished?");
        return "OK";
    }

}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    // mark asynchronous method
    @Async
    @Override
    public void test() {
        try {
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("test:" + this.getClass() + "Thread:" + Thread.currentThread().getName());
    }
}

If the call were synchronous, the Service output would be printed first and the Controller output after it. With @Async in effect the order is reversed: the Controller line is printed immediately, and the Service line follows about 20 seconds later from a different thread. The console log therefore shows that the asynchronous annotation has taken effect.
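The ordering described above can be reproduced without Spring. The following is only a minimal sketch: CallOrderDemo is a hypothetical class, a plain ExecutorService stands in for the executor behind @Async, and a CountDownLatch stands in for the 20-second sleep so that the example finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the controller/service pair above, without Spring.
final class CallOrderDemo {

    /** Runs the "service" work on another thread and records the order of events. */
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch callerDone = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The "service" call: handed to the executor, so the caller does not wait for it.
            executor.submit(() -> {
                try {
                    // Stands in for the long sleep: wait until the caller has moved on.
                    callerDone.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("service");
            });
            // The "controller" continues immediately.
            events.add("controller");
            callerDone.countDown();
        } finally {
            executor.shutdown();
        }
        try {
            // Wait for the background task so the recorded order is complete.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [controller, service]
    }
}
```

The caller's entry is recorded first because the submitted task only continues after the caller has moved on, which mirrors what the long sleep does in the Service method.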

Analysis of @Async principle

The test() method is marked with @Async and runs asynchronously, so calls to it must be intercepted. In Spring AOP, that interception is done by a method interceptor, MethodInterceptor.

@FunctionalInterface
public interface MethodInterceptor extends Interceptor {
    @Nullable
    Object invoke(@Nonnull MethodInvocation invocation) throws Throwable;
}
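To make the idea of interception concrete, here is a minimal sketch that uses only the JDK. It is not Spring's implementation: @RunAsync, GreetingService and AsyncProxyDemo are hypothetical names, and a java.lang.reflect.Proxy with an InvocationHandler plays the role that the AOP proxy and its MethodInterceptor play in Spring.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical marker annotation, playing the role of @Async in this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RunAsync {
}

// Hypothetical service interface; the annotation is read from the interface method.
interface GreetingService {
    @RunAsync
    CompletableFuture<String> greet(String name);
}

final class AsyncProxyDemo {

    /** Wraps target in a JDK proxy that runs @RunAsync methods on the executor. */
    static GreetingService asyncProxy(GreetingService target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(RunAsync.class)) {
                return method.invoke(target, args); // not marked: run on the caller's thread
            }
            // Marked: run the real method on a pool thread and return a future right away.
            return CompletableFuture.supplyAsync(() -> {
                try {
                    CompletableFuture<?> inner = (CompletableFuture<?>) method.invoke(target, args);
                    Object value = inner.join();
                    return value;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }, executor);
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }

    /** Calls greet through the proxy and reports which thread ran the target method. */
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            GreetingService target = name -> CompletableFuture.completedFuture(
                    "hello " + name + " from " + Thread.currentThread().getName());
            return asyncProxy(target, executor).greet("spring").join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello spring from async-worker
    }
}
```

The caller only ever talks to the proxy, which is why the target method can be moved to another thread without the caller changing its code.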

MethodInterceptor is an interface, so the interception of asynchronous methods must be done by one of its implementation classes. To find that implementation class, we can start the analysis from the @EnableAsync annotation.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    /**
     * Custom asynchronous annotations, @Async and @javax.ejb.Asynchronous will be detected by default
     */
    Class<? extends Annotation> annotation() default Annotation.class;
    /**
     * Indicates whether to use subclass proxies (CGLIB) or interface-based proxies (JDK proxies)
     */
    boolean proxyTargetClass() default false;
    /**
     * Indicates which advice mode to use: PROXY applies the advice through proxies, while ASPECTJ applies it through AspectJ weaving
     */
    AdviceMode mode() default AdviceMode.PROXY;
    /**
     * Indicates the application order of the post-processor AsyncAnnotationBeanPostProcessor
     */
    int order() default Ordered.LOWEST_PRECEDENCE;
}

The @Import annotation imports the AsyncConfigurationSelector class, which extends AdviceModeImportSelector.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    /**
     * Return different configuration classes according to the mode attribute of @EnableAsync
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ:
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }
}

/**
 * Base class for import selectors that choose their imports based on the mode attribute of an annotation
 */
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

    public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

    protected String getAdviceModeAttributeName() {
        return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
    }

    /**
     * Reads the mode attribute of the annotation given by the generic parameter, then calls the subclass's selectImports(AdviceMode adviceMode) method to get the configuration classes to import.
     * importingClassMetadata holds the annotations found on the importing class, here the Spring Boot startup class (in my current project it carries @EnableAsync and @SpringBootApplication); it is passed in by the processImports method of ConfigurationClassParser.
     */
    @Override
    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
        //Resolve the generic type argument of the current class (in my debugging session it was @EnableAsync; getClass() returns the AsyncConfigurationSelector class)
        Class<?> annoType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
        //Get the attributes and values of that annotation on the importing class
        AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
        if (attributes == null) {
            throw new IllegalArgumentException(String.format(
                "@%s is not present on importing class '%s' as expected",
                annoType.getSimpleName(), importingClassMetadata.getClassName()));
        }
        //Get the mode attribute value
        AdviceMode adviceMode = attributes.getEnum(this.getAdviceModeAttributeName());
        //Call the subclass to get the import configuration class
        String[] imports = selectImports(adviceMode);
        if (imports == null) {
            throw new IllegalArgumentException(String.format("Unknown AdviceMode: '%s'", adviceMode));
        }
        return imports;
    }

    /**
     * Returns an array of fully qualified names of the import class according to the mode value
     */
    protected abstract String[] selectImports(AdviceMode adviceMode);

}
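The call to GenericTypeResolver.resolveTypeArgument above recovers the annotation type from the generic parameter of the subclass. The idea can be sketched with plain reflection. This is only an illustration under simplifying assumptions: EnableDemo, SelectorBase and DemoSelector are hypothetical names, and the sketch handles only a direct subclass, whereas Spring's resolver is more general.

```java
import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical annotation standing in for @EnableAsync.
@interface EnableDemo {
}

// Hypothetical base class, playing the role of AdviceModeImportSelector<A>.
abstract class SelectorBase<A extends Annotation> {

    /** Reads the actual type argument A from the generic superclass of the direct subclass. */
    Class<?> resolveAnnotationType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return (Class<?>) arg;
            }
        }
        throw new IllegalStateException("Type argument is not available for " + getClass());
    }
}

// Hypothetical subclass, playing the role of AsyncConfigurationSelector.
final class DemoSelector extends SelectorBase<EnableDemo> {
}

final class TypeArgumentDemo {
    public static void main(String[] args) {
        System.out.println(new DemoSelector().resolveAnnotationType().getSimpleName()); // EnableDemo
    }
}
```

This works because a class that extends a parameterized type keeps that type argument in its class file, so the base class can find out which annotation its subclass was written for.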

In PROXY mode, ProxyAsyncConfiguration is imported and instantiated

@Configuration(
    proxyBeanMethods = false
)
@Role(2)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    public ProxyAsyncConfiguration() {
    }

    @Bean(
        name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
    )
    @Role(2)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Declare the asynchronous annotation post-processor
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }

        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
        return bpp;
    }
}

The ProxyAsyncConfiguration class mainly declares the AsyncAnnotationBeanPostProcessor bean, the post-processor for the asynchronous annotation. Its inheritance hierarchy shows that it implements the BeanFactoryAware interface, so when the Spring container creates the bean it calls back: void setBeanFactory(BeanFactory beanFactory) throws BeansException.

AsyncAnnotationBeanPostProcessor

The setBeanFactory method of AsyncAnnotationBeanPostProcessor creates an advisor, AsyncAnnotationAdvisor, and keeps it as the advisor that the post-processor applies to eligible beans.

AsyncAnnotationAdvisor

The constructor of AsyncAnnotationAdvisor calls the buildAdvice method to create the advice.

The interceptor built by the buildAdvice method is AnnotationAsyncExecutionInterceptor.

In the inheritance hierarchy of AnnotationAsyncExecutionInterceptor, focus on its parent class, AsyncExecutionInterceptor:

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    public AsyncExecutionInterceptor(Executor defaultExecutor) {
        super(defaultExecutor);
    }

    public AsyncExecutionInterceptor(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }

    /**
     * Overrides the invoke method of MethodInterceptor; extra logic runs before and after the target method call
     */
    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //Get the class object of the target object of invocation (the Class object of the object to which the called asynchronous method belongs)
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        //Get the Method through the class object and the method of the invocation
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        //Get the thread pool instance that handles this asynchronous method through method
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        //Encapsulate the asynchronous method into a Callable object
        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };

        //Pass the task, the thread pool instance, and the method's return type to the doSubmit method of the parent class AsyncExecutionAspectSupport
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

    /**
     * Returns the bean name (qualifier) of the thread pool that should run the given asynchronous method; the thread pool instance is then looked up in the BeanFactory by that name. This class returns null. The subclass AnnotationAsyncExecutionInterceptor overrides this method to return the value of the @Async annotation
     */
    @Override
    protected String getExecutorQualifier(Method method) {
        return null;
    }

    /**
     * Call the parent class method to obtain the default thread pool instance. If none is found, fall back to a SimpleAsyncTaskExecutor instance
     * SimpleAsyncTaskExecutor starts a new thread for each task and runs it asynchronously, so it does not actually pool threads
     */
    @Override
    protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

}
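The Callable built in invoke unwraps a Future returned by the target method, so that the Future handed back to the caller carries the plain value. A minimal sketch of that wrapping with only the JDK follows. UnwrapFutureDemo, userMethod and wrap are hypothetical names, and error handling is reduced to the bare minimum, so this is an illustration of the pattern rather than Spring's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class UnwrapFutureDemo {

    /** Hypothetical "user method": runs on the worker thread and wraps its value in a Future. */
    static Future<String> userMethod() {
        return CompletableFuture.completedFuture("done on " + Thread.currentThread().getName());
    }

    /** Wraps the call the way invoke() does: unwrap an inner Future, otherwise return null. */
    static Callable<Object> wrap(Callable<Object> invocation) {
        return () -> {
            Object result = invocation.call();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
            return null;
        };
    }

    /** Submits the wrapped call and waits for the value carried by the outer Future. */
    static Object demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            Future<Object> outer = executor.submit(wrap(UnwrapFutureDemo::userMethod));
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // done on async-worker
    }
}
```

Without the unwrapping step, the caller would receive a Future whose value is itself a Future.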

Next, look at AsyncExecutionAspectSupport, the base class for aspects that execute methods asynchronously. It implements the BeanFactoryAware interface, so it can obtain the bean factory. The source code is as follows:

public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {

    //thread pool default bean name
    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";


    //Whether the CompletableFuture class (introduced in Java 8) is present; the doSubmit method uses this field
    private static final boolean completableFuturePresent = ClassUtils.isPresent(
            "java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader());


    protected final Log logger = LogFactory.getLog(getClass());

    //The asynchronous method and the cache of the corresponding thread pool instance, because each asynchronous method can specify a thread pool instance
    private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<Method, AsyncTaskExecutor>(16);

    //Default thread pool instance
    private volatile Executor defaultExecutor;

    // handler for uncaught exception
    private AsyncUncaughtExceptionHandler exceptionHandler;

    private BeanFactory beanFactory;

    public AsyncExecutionAspectSupport(Executor defaultExecutor) {
        this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());
    }

    public AsyncExecutionAspectSupport(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        this.defaultExecutor = defaultExecutor;
        this.exceptionHandler = exceptionHandler;
    }

    public void setExecutor(Executor defaultExecutor) {
        this.defaultExecutor = defaultExecutor;
    }

    public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Override the method of the BeanFactoryAware interface to store the bean factory
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }


    /**
     * Obtain the corresponding thread pool instance according to the specified asynchronous method
     */
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        // Get it from the cache, if it gets it, return it directly
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            //Get the Bean name of the thread pool instance according to the method, the value of the value attribute of @Async
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                //The bean factory obtains the thread pool instance according to the bean name
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                targetExecutor = this.defaultExecutor;
                if (targetExecutor == null) {
                    synchronized (this.executors) {
                        if (this.defaultExecutor == null) {
                            this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                        }
                        targetExecutor = this.defaultExecutor;
                    }
                }
            }
            if (targetExecutor == null) {
                return null;
            }
            //If the executor is not an AsyncListenableTaskExecutor, wrap it in a TaskExecutorAdapter.
            //TaskExecutorAdapter has a TaskDecorator property that can decorate each task before it runs.
            //For example, with Spring Security an asynchronous task loses the permission information of the
            //calling thread; a class implementing the TaskDecorator interface can capture that context and
            //set it on the SecurityContextHolder in its decorate method.
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

    /**
     * Abstract method, get the name of the thread pool bean according to the asynchronous method
     */
    protected abstract String getExecutorQualifier(Method method);

    /**
     * Get the thread pool instance of Executor type according to the bean name
     */
    protected Executor findQualifiedExecutor(BeanFactory beanFactory, String qualifier) {
        if (beanFactory == null) {
            throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
                    " to access qualified executor '" + qualifier + "'");
        }
        return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
    }
    
    // Get the default thread pool instance
    protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                // Find a thread pool instance of TaskExecutor type
                return beanFactory.getBean(TaskExecutor.class);
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskExecutor bean", ex);
                try {
                    //Find a thread pool instance named taskExecutor
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskExecutor bean", ex);
                try {
                    //Find a thread pool instance named taskExecutor
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.info("No task executor bean found for async processing: " +
                            "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
                // Giving up -> either using local default executor or none at all...
            }
        }
        return null;
    }


    /**
     * Execute the asynchronous task. The parameters are the task wrapping the asynchronous method, the thread pool instance, and the return type of the asynchronous method
     */
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        //If CompletableFuture is available (Java 8+), let the delegate handle methods that return CompletableFuture
        if (completableFuturePresent) {
            Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
            if (result != null) {
                return result;
            }
        }
        //If the return type is ListenableFuture call submitListenable
        if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        //If the return type is another Future type, directly hand it over to the thread pool for execution
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            //No result is expected: submit the task and return null
            executor.submit(task);
            return null;
        }
    }

    // exception handling
    protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
        //If the method returns a Future, rethrow the exception so it reaches the caller through the Future
        if (Future.class.isAssignableFrom(method.getReturnType())) {
            ReflectionUtils.rethrowException(ex);
        }
        else {
            //Otherwise pass it to the exception handler; if the handler itself throws, that exception is only logged
            try {
                this.exceptionHandler.handleUncaughtException(ex, method, params);
            }
            catch (Throwable ex2) {
                logger.error("Exception handler for async method '" + method.toGenericString() +
                        "' threw unexpected exception itself", ex2);
            }
        }
    }


    /**
     * Internal class for executing asynchronous tasks under Java8
     */
    @UsesJava8
    private static class CompletableFutureDelegate {

        public static <T> Future<T> processCompletableFuture(Class<?> returnType, final Callable<T> task, Executor executor) {
            //If the return value of the asynchronous method is not CompletableFuture type, return null directly
            if (!CompletableFuture.class.isAssignableFrom(returnType)) {
                return null;
            }
            //Call the supplyAsync method of CompletableFuture to execute the task
            return CompletableFuture.supplyAsync(new Supplier<T>() {
                @Override
                public T get() {
                    try {
                        return task.call();
                    }
                    catch (Throwable ex) {
                        throw new CompletionException(ex);
                    }
                }
            }, executor);
        }
    }

}
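The comment in determineAsyncExecutor mentions decorating a task so that context from the calling thread is available on the worker thread. The sketch below shows that idea with only the JDK. ContextDecoratorDemo and CURRENT_USER are hypothetical, and a plain ThreadLocal stands in for something like a security context. Spring's TaskDecorator interface has the same shape as the decorate method here, a Runnable in and a Runnable out.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextDecoratorDemo {

    // Hypothetical per-thread context, standing in for something like a security context.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Captures the caller's context when the task is created and restores it around run(). */
    static Runnable decorate(Runnable task) {
        String captured = CURRENT_USER.get(); // read on the submitting thread
        return () -> {
            CURRENT_USER.set(captured);       // make it visible on the worker thread
            try {
                task.run();
            } finally {
                CURRENT_USER.remove();        // do not leak into the next pooled task
            }
        };
    }

    /** Returns what a plain task and a decorated task each see on the worker thread. */
    static String[] demo() {
        String[] seen = new String[2];
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_USER.set("alice"); // the context exists only on the calling thread
        try {
            Runnable plain = () -> seen[0] = CURRENT_USER.get();
            Runnable decorated = decorate(() -> seen[1] = CURRENT_USER.get());
            executor.submit(plain).get();
            executor.submit(decorated).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_USER.remove();
            executor.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [null, alice]
    }
}
```

The plain task sees no context because a ThreadLocal value belongs to the thread that set it. The decorated task sees it because the value was captured on the submitting thread and set again on the worker.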

Summary

From the source code of AsyncExecutionAspectSupport we can see that a thread pool configured by the user takes precedence, and the default is used only when none is found. Spring first looks for a bean of type TaskExecutor, then for an Executor bean named taskExecutor. If neither is present, SimpleAsyncTaskExecutor is used, which is not recommended because it starts a new thread for every task. Therefore, we can define a custom thread pool, register it as a Spring bean, and put its bean name in the value attribute of the @Async annotation.
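A custom thread pool of the kind described in the summary could look like the sketch below. It needs Spring on the classpath and is not taken from the original project: the bean name reportExecutor, the class names and the pool sizes are all hypothetical placeholders.

```java
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Configuration
@EnableAsync
class AsyncExecutorConfig {

    // "reportExecutor" is a hypothetical bean name; the sizes below are placeholders.
    @Bean("reportExecutor")
    public Executor reportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("report-");
        executor.initialize();
        return executor;
    }
}

@Service
class ReportService {

    // The value matches the bean name, so this method runs on the pool defined above.
    @Async("reportExecutor")
    public void buildReport() {
        System.out.println("Running on " + Thread.currentThread().getName());
    }
}
```

Because the @Async value is non-empty, determineAsyncExecutor resolves the executor through findQualifiedExecutor and never reaches the SimpleAsyncTaskExecutor fallback.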

Reference article: In-depth analysis of the @Async annotation principle in SpringBoot