Java-Concurrency-21-Eight ways to achieve asynchrony

In real-world development, many business scenarios call for asynchronous processing. This article lists eight common ways to implement it in Java.

Asynchronous method 1: Thread

public class AsyncThread extends Thread {

    @Override
    public void run() {
        try {
            // Simulate a slow task
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating the failure
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        System.out.println("3333333333333333333");
    }

    public static void main(String[] args) {
        System.out.println("111111111111111");
        AsyncThread asyncThread = new AsyncThread();
        // start() returns immediately; run() executes on the new thread
        asyncThread.start();
        // Printed before "333..." because the main thread does not wait
        System.out.println("2222222222222222");
    }
}


Creating a new Thread for every task means threads are constantly created and destroyed, which wastes system resources. A thread pool avoids this: wrap the business logic in a Runnable or Callable and hand it to the pool for execution.
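As a rough illustration of the thread pool approach, here is a minimal sketch using only the JDK. The class name ThreadPoolSketch and the method runOnPool are made up for this example; the pool size of 2 and the 5 second timeout are arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ThreadPoolSketch {

    /** Runs one Runnable and one Callable on a small fixed pool and returns the Callable's result. */
    static int runOnPool() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // A Runnable has no return value: fire and forget
            Runnable fireAndForget = () ->
                    System.out.println(Thread.currentThread().getName() + " runs the Runnable");
            // A Callable returns a value, which is delivered through a Future
            Callable<Integer> withResult = () -> 21 * 2;

            pool.execute(fireAndForget);
            Future<Integer> future = pool.submit(withResult);
            return future.get(5, TimeUnit.SECONDS);
        } finally {
            // Pool threads are reused across tasks; shut the pool down when it is no longer needed
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("result = " + runOnPool());
    }
}
```

In a real service the pool would normally be created once and shared, rather than created and shut down per call as in this sketch.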

Asynchronous method 2: Future

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureManager {

    public String execute() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            Future<String> future = executor.submit(() -> {
                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println("333333333333333333333");
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            });

            // get() blocks the calling thread until the result is available
            return future.get();
        } finally {
            // Release the pool's thread so the JVM can exit
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1111111111111111111111");
        FutureManager manager = new FutureManager();
        manager.execute();
        System.out.println("22222222222222222222");
    }
}

Future has the following shortcomings:

1 No passive notification of results: we can submit an asynchronous task to a thread pool, but when the task finishes, the main thread is not notified. It has to fetch the result actively by calling get (or poll with isDone).
2 Futures are isolated from each other: often the result of one long-running asynchronous task feeds a further calculation that is itself asynchronous. With Future, the developer has to wire the two together by hand; Futures cannot be composed into a task flow (pipeline). This is the gap CompletableFuture fills: it can chain multiple asynchronous stages into a task flow.
3 Future has no good error-handling mechanism: if an asynchronous task throws during execution, the caller is not notified. The only way to learn about the failure is to catch the exception thrown by get (an ExecutionException wrapping the cause) and then decide what to do.
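To make the contrast concrete, here is a small sketch of how CompletableFuture addresses the three points: stages are chained instead of fetched with get, and a failure anywhere in the chain is handled in one place. The class PipelineSketch, the methods loadPrice and describe, and the values are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

class PipelineSketch {

    /** Hypothetical first stage: pretends to load a price asynchronously, optionally failing. */
    static CompletableFuture<Integer> loadPrice(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("price service down");
            }
            return 100;
        });
    }

    /** Chains several stages and supplies a fallback value if any stage fails. */
    static CompletableFuture<String> describe(boolean fail) {
        return loadPrice(fail)
                // Runs automatically when the first stage completes (no get() needed)
                .thenApply(price -> price * 2)
                // Feeds the result into another asynchronous task
                .thenCompose(total -> CompletableFuture.supplyAsync(() -> "total=" + total))
                // Called only if one of the stages above threw
                .exceptionally(ex -> "fallback");
    }

    public static void main(String[] args) {
        describe(false).thenAccept(System.out::println).join(); // prints "total=200"
        describe(true).thenAccept(System.out::println).join();  // prints "fallback"
    }
}
```

The join() calls in main only keep the demo alive until the callbacks have run; the stages themselves never block on each other.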

Asynchronous method 3: CompletableFuture

import lombok.SneakyThrows;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCompose {

    /**
     * thenRunAsync submits the follow-up task to the asynchronous pool again,
     * so it may run on a different thread than the task it depends on.
     */
    @SneakyThrows
    public static void thenRunAsync() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
        // Wait for task 1 to complete
        System.out.println("cf1 result->" + cf1.get());
        // Wait for task 2 to complete (a CompletableFuture<Void> always yields null)
        System.out.println("cf2 result->" + cf2.get());
    }

    public static void main(String[] args) {
        thenRunAsync();
    }
}

We do not need to create an ExecutorService explicitly: by default, CompletableFuture runs asynchronous tasks on the common ForkJoinPool (ForkJoinPool.commonPool()). If a business scenario calls for a dedicated thread pool, the Async variants of its methods also accept a custom Executor.
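Passing a custom pool could look roughly like this. The class CustomPoolSketch, the method runOnCustomPool and the thread name prefix "biz-pool-" are assumptions made for the example; supplyAsync(Supplier, Executor) is the standard JDK overload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolSketch {

    /** Runs a supplier on a dedicated pool and returns the name of the thread that executed it. */
    static String runOnCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        // Dedicated pool with recognizable thread names (hypothetical prefix)
        ExecutorService bizPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "biz-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    // The second argument replaces the common ForkJoinPool for this stage
                    .supplyAsync(() -> Thread.currentThread().getName(), bizPool)
                    .join();
        } finally {
            bizPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("executed on " + runOnCustomPool());
    }
}
```

A dedicated pool keeps slow business tasks from competing with everything else that shares the common pool, such as parallel streams.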

Asynchronous method 4: Spring’s @Async

  • Custom asynchronous thread pool
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Thread pool configuration. Defining several thread pools isolates workloads from each other.
 * Without a name, @Async uses the application's default executor; when several pools are defined
 * in the project, an asynchronous call selects one by bean name, for example: @Async("taskName")
 */
@EnableAsync
@Configuration
public class TaskPoolConfig {

    /**
     * Custom thread pool
     */
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        // Number of processors available to the JVM (12 on the author's machine)
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors: " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(16);
        // Maximum number of threads
        executor.setMaxPoolSize(20);
        // Queue capacity (the default is Integer.MAX_VALUE)
        executor.setQueueCapacity(99999);
        // Seconds an idle thread is kept alive before it is removed
        executor.setKeepAliveSeconds(60);
        // Thread name prefix
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        // Maximum number of seconds to block on shutdown while waiting for remaining tasks,
        // before the rest of the container continues shutting down
        executor.setAwaitTerminationSeconds(60);
        // Wait for all tasks to finish before closing the thread pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}
  • Use the thread pool where asynchronous execution is needed
public interface AsyncService {

    void sendSms(String callPrefix, String mobile, String actionType, String content);

    void sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {

    @Autowired
    private IMessageHandler messageHandler;

    // Spring supports only void or Future return types for @Async methods.
    // The bodies return nothing, so both methods are declared void.
    @Override
    @Async("taskExecutor")
    public void sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {
            Thread.sleep(1000);
            messageHandler.sendSms(callPrefix, mobile, actionType, content);
        } catch (Exception e) {
            log.error("Exception sending SMS -> ", e);
        }
    }

    @Override
    @Async("taskExecutor")
    public void sendEmail(String email, String subject, String content) {
        try {
            Thread.sleep(1000);
            messageHandler.sendEmail(email, subject, content);
        } catch (Exception e) {
            log.error("Send email exception -> ", e);
        }
    }
}

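In real projects, the recommended way to use @Async is together with an explicitly configured thread pool, as above. Relying on a bare @Async with no executor configured is not recommended: depending on the setup, Spring may fall back to SimpleAsyncTaskExecutor, which starts a new thread for every task instead of reusing threads.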
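When sizing a custom pool, it helps to know how the core size, queue capacity and maximum size interact. ThreadPoolTaskExecutor delegates to the JDK's ThreadPoolExecutor, where threads beyond the core size are started only once the queue is full. With the settings above (core 16, queue 99999, max 20), that means the extra four threads appear only after 99999 tasks are waiting. The sketch below shows the same rule with deliberately tiny numbers; the class PoolSizingSketch and the method observe are made up for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    /** Returns {pool size after 2 tasks, pool size after 3 tasks, 1 if the 4th task was rejected else 0}. */
    static int[] observe() {
        // core = 1, max = 2, queue capacity = 1 (tiny values so the effect is visible)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch is released, so no thread becomes free early
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);                 // task 1: starts the core thread
            pool.execute(blocker);                 // task 2: waits in the queue
            int afterTwo = pool.getPoolSize();     // still one thread
            pool.execute(blocker);                 // task 3: queue is full, so a second thread starts
            int afterThree = pool.getPoolSize();
            int rejected = 0;
            try {
                pool.execute(blocker);             // task 4: queue full and max size reached
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] {afterTwo, afterThree, rejected};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "1 2 1"
    }
}
```

If the pool is expected to grow under load, a smaller queue capacity makes the maximum size take effect sooner.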

Asynchronous method 5: Spring ApplicationEvent

Define the event

import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;

public class AsyncSendEmailEvent extends ApplicationEvent {

    public AsyncSendEmailEvent(Object source) {
        super(source);
    }

    /**
     * Recipient email address
     **/
    @Getter
    @Setter
    private String email;

    /**
     * Subject
     **/
    @Getter
    @Setter
    private String subject;

    /**
     * Content
     **/
    @Getter
    @Setter
    private String content;

    /**
     * ID of the receiving user
     **/
    @Getter
    @Setter
    private String targetUserId;
}

Define the asynchronous event handler

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {

    @Autowired
    private EmailService emailService;

    // @Async moves the handling onto the "taskExecutor" pool, so the publisher is not blocked
    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        // Process the asynchronous business logic
        emailService.sendEmailSms(email, subject, content, targetUserId);
    }
}

Note that when ApplicationEvent is used for asynchronous processing, an exception inside the asynchronous listener does not reach the publisher, so a compensation mechanism needs to be considered. Spring Retry can be used to retry the failed operation and avoid the data inconsistency such an exception would otherwise cause.
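The retry idea itself is simple. The sketch below is plain Java, not Spring Retry's API: it only illustrates retrying a failing operation a limited number of times with a fixed pause. The class RetrySketch, the method retry and its parameters are hypothetical names.

```java
import java.util.concurrent.Callable;

class RetrySketch {

    /** Calls the task up to maxAttempts times, sleeping backoffMillis between failed attempts. */
    static <T> T retry(Callable<T> task, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and give up
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        // Every attempt failed: surface the last failure to the caller
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = retry(() -> {
            // Simulated flaky operation: fails twice, then succeeds
            if (++calls[0] < 3) {
                throw new IllegalStateException("mail server busy");
            }
            return "sent after " + calls[0] + " attempts";
        }, 5, 10L);
        System.out.println(result); // prints "sent after 3 attempts"
    }
}
```

Retrying is only safe when the operation is idempotent or when duplicates are acceptable; otherwise a retry can itself cause inconsistent data.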

Asynchronous method 6: Message queue

Producer of callback messages

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CallbackProducer {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendCallbackMessage(CallbackDTO callbackDTO, final long delayTimes) {

        log.info("The producer sends a message, callbackDTO, {}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(callbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Set the delay in milliseconds through the x-delay header: the exchange holds the
                // message that long before routing it to the queue (the header is the one read by
                // RabbitMQ's delayed message exchange plugin)
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

Consumer of callback messages

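import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {

    @Autowired
    private IGlobalUserService globalUserService;

    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {

        if (map.get("error") != null) {
            // Reject the message and put it back on the queue
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }

        try {
            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            // Execute the business logic
            globalUserService.execute(callbackDTO);
            // Acknowledge the message manually (requires acknowledge-mode: manual)
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

        } catch (Exception e) {
            // Note: the message is neither acked nor nacked on this path,
            // so in manual mode it stays unacknowledged
            log.error("callback failed", e);
        }
    }
}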
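The listings above depend on a running RabbitMQ broker. To see the underlying pattern in isolation, here is an in-process sketch that uses a JDK BlockingQueue in place of the broker: the producer returns as soon as the message is enqueued, and a separate consumer thread handles it later. This is not RabbitMQ and offers no persistence, acknowledgement or delay; the class InMemoryQueueSketch, the method roundTrip and the stop sentinel are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class InMemoryQueueSketch {

    // Hypothetical sentinel value that tells the consumer to stop
    private static final String STOP = "__stop__";

    /** Sends the messages through a queue to a consumer thread and returns what it handled. */
    static List<String> roundTrip(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                String msg;
                // take() blocks until a message is available
                while (!(msg = queue.take()).equals(STOP)) {
                    handled.add("handled:" + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        for (String m : messages) {
            // The producer does not wait for the message to be processed
            queue.put(m);
        }
        queue.put(STOP);
        // join() waits for the consumer and makes its writes to 'handled' visible here
        consumer.join();
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> input = new ArrayList<>();
        input.add("callback-1");
        input.add("callback-2");
        System.out.println(roundTrip(input)); // prints "[handled:callback-1, handled:callback-2]"
    }
}
```

A real message queue adds what this sketch lacks: the producer and consumer can live in different processes, and messages survive a restart.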

Asynchronous method 7: Hutool's ThreadUtil utility class

import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ThreadLocalRandom;

@Slf4j
public class ThreadUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // execAsync hands the task to another thread and returns immediately
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("Submitted task No. " + i);
        }

        log.info("task finish!");
    }
}

Asynchronous method 8: Guava's ListenableFuture

Guava’s ListenableFuture is, as the name suggests, a Future that can be listened to; it extends and enhances Java’s native Future. A Future represents an asynchronous computation whose result becomes available when the task completes. With a plain Future, if we want to show the result to the user or start another computation as soon as the task finishes, another thread has to keep polling the task's status, which makes the code complex and inefficient. With ListenableFuture we register a callback instead of waiting on get(): when the computation completes, the callback is invoked automatically, which reduces the complexity of concurrent code.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ListenableFutureDemo {

    public static void main(String[] args) {
        // Wrap a regular pool with MoreExecutors.listeningDecorator to get a ListeningExecutorService,
        // whose submit method returns a ListenableFuture.
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("callable execute...");
                TimeUnit.SECONDS.sleep(1);
                return 1;
            }
        });
        // The work is defined in the Callable: here it just sleeps for 1 second and returns 1.
        // addCallback registers a callback that runs on the given executor once the future completes.
        final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                // Successful execution
                System.out.println("Get listenable future's result with callback " + result);
                callbackExecutor.shutdown();
            }

            @Override
            public void onFailure(Throwable t) {
                // Exception handling
                t.printStackTrace();
                callbackExecutor.shutdown();
            }
        }, callbackExecutor);
        // Tasks that were already submitted still run to completion after shutdown()
        executorService.shutdown();
    }
}