Spring transaction event control solves business asynchronous operation decoupling


In business, it is often necessary to send messages or events to asynchronously call other components to perform corresponding business operations after performing database operations (transaction submission is completed).
For example: After the user registers successfully, the activation code or activation email is sent. If the user saves and executes an asynchronous operation to send the activation code or activation email, but an exception occurs after the previous user saves, the database rolls back, and the user In fact, the registration is not successful, but the user receives an activation code or an activation email. At this point, we urgently require the database transaction to complete before performing asynchronous operations.


After successfully modifying the database, send a message to Rabbitmq


1. Use TransactionSynchronizationManage to control transactions

TransactionSynchronization There are methods in this class to control the execution of the business after the transaction is submitted.

service code

 public boolean updateById(Hotel hotel) {<!-- -->
        logger.info("----- into service -----");

        //Modify DB data
        hotelMapper. updateById(hotel);

        // Execute after the transaction is submitted
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {<!-- -->
            public void afterCommit() {<!-- -->
                // After successfully modifying the DB data, send a message to Rabbitmq
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
                logger.info("----- rabbitmq send message -----");


        logger.info("--------- out service ----------");
        //simulate exception
        int n = 0/0;

        return false;

Note: The above code will be executed after the transaction is committed. If it is in a non-transactional context, java.lang.IllegalStateException: Transaction synchronization is not active will be thrown.

So the @Transactional annotation must be added to the service method.

2. Use TransactionSynchronizationAdapter to control transactions

Configuration class

import com.sun.istack.internal.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {<!-- -->
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
    private ThreadPoolExecutor threadPool;

    private Logger logger = LoggerFactory. getLogger(AfterCommitExecutor. class);
    public void init() {<!-- -->
        logger.debug("Initialize thread pool...");
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        if (0 >= availableProcessors) {<!-- -->
            availableProcessors = 1;
        int maxPoolSize = (availableProcessors > 5) ? availableProcessors * 2 : 5;
        logger.debug("CPU Processors :%s MaxPoolSize:%s", availableProcessors, maxPoolSize);
        threadPool = new ThreadPoolExecutor(
            TimeUnit. SECONDS,
            new LinkedBlockingQueue<Runnable>(maxPoolSize * 2),
            Executors. defaultThreadFactory(),
            new RejectedExecutionHandler() {<!-- -->
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {<!-- -->
                    logger.debug("Task:%s rejected", r.toString());
                    if (!executor.isShutdown()) {<!-- -->
                        executor. execute(r);

    public void destroy() {<!-- -->
        logger.debug("Destroy thread pool...");
        if (null != threadPool & amp; & amp; !threadPool.isShutdown()) {<!-- -->
            threadPool. shutdown();

    public void execute(@NotNull Runnable runnable) {<!-- -->
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {<!-- -->
            runnable. run();
        List<Runnable> threadRunnables = RUNNABLES. get();
        if (threadRunnables == null) {<!-- -->
            threadRunnables = new ArrayList<Runnable>();
            TransactionSynchronizationManager. registerSynchronization(this);
        threadRunnables. add(runnable);

    public void afterCommit() {<!-- -->
        logger.debug("Transaction submission completed processing ... ");
        List<Runnable> threadRunnables = RUNNABLES. get();
        for (int i = 0; i <threadRunnables. size(); i ++ ) {<!-- -->
            Runnable runnable = threadRunnables. get(i);
            try {<!-- -->
                threadPool. execute(runnable);
            } catch (RuntimeException e) {<!-- -->
                logger. error("", e);

    public void afterCompletion(int status) {<!-- -->
        logger.debug("Transaction completed .... ");
        RUNNABLES. remove();

service code

    public boolean updateById(Hotel hotel) {<!-- -->
        logger.info("----- into service -----");

        //Modify DB data
        hotelMapper. updateById(hotel);

        // use AfterCommitExecutor
        afterCommitExecutor. execute(new Runnable() {<!-- -->
            public void run() {<!-- -->
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
                logger.info("----- rabbitmq send message -----");

        logger.info("--------- out service ----------");
        int n = 0/0;

        return false;