Spring transaction event control solves business asynchronous operation decoupling TransactionSynchronizationManager Transaction

Scene

In business, it is often necessary to send messages or events to asynchronously call other components to perform corresponding business operations after performing database operations (transaction submission is completed).
For example: After the user registers successfully, the activation code or activation email is sent. If the user saves and executes an asynchronous operation to send the activation code or activation email, but an exception occurs after the previous user saves, the database rolls back, and the user In fact, the registration is not successful, but the user receives an activation code or an activation email. At this point, we urgently require the database transaction to complete before performing asynchronous operations.

Requirements

After successfully modifying the database, send a message to Rabbitmq

Solution

1. Use TransactionSynchronizationManage to control transactions


TransactionSynchronization There are methods in this class to control the execution of the business after the transaction is submitted.

service code

 public boolean updateById(Hotel hotel) {<!-- -->
        logger.info("----- into service -----");

        //Modify DB data
        hotelMapper. updateById(hotel);

        // Execute after the transaction is submitted
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {<!-- -->
            @Override
            public void afterCommit() {<!-- -->
                // After successfully modifying the DB data, send a message to Rabbitmq
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
                logger.info("----- rabbitmq send message -----");

            }
        });

        logger.info("--------- out service ----------");
        //simulate exception
        int n = 0/0;

        return false;
    }

Note: The above code will be executed after the transaction is committed. If it is in a non-transactional context, java.lang.IllegalStateException: Transaction synchronization is not active will be thrown.

So the @Transactional annotation must be added to the service method.

2. Use TransactionSynchronizationAdapter to control transactions

Configuration class

import com.sun.istack.internal.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@Component("afterCommitExecutor")
public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {<!-- -->
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
    private ThreadPoolExecutor threadPool;

    private Logger logger = LoggerFactory. getLogger(AfterCommitExecutor. class);
    
    @PostConstruct
    public void init() {<!-- -->
        logger.debug("Initialize thread pool...");
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        if (0 >= availableProcessors) {<!-- -->
            availableProcessors = 1;
        }
        int maxPoolSize = (availableProcessors > 5) ? availableProcessors * 2 : 5;
        logger.debug("CPU Processors :%s MaxPoolSize:%s", availableProcessors, maxPoolSize);
        threadPool = new ThreadPoolExecutor(
            availableProcessors,
            maxPoolSize,
            60,
            TimeUnit. SECONDS,
            new LinkedBlockingQueue<Runnable>(maxPoolSize * 2),
            Executors. defaultThreadFactory(),
            new RejectedExecutionHandler() {<!-- -->
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {<!-- -->
                    logger.debug("Task:%s rejected", r.toString());
                    if (!executor.isShutdown()) {<!-- -->
                        executor.getQueue().poll();
                        executor. execute(r);
                    }
                }
            }
        );
    }

    @PreDestroy
    public void destroy() {<!-- -->
        logger.debug("Destroy thread pool...");
        if (null != threadPool & amp; & amp; !threadPool.isShutdown()) {<!-- -->
            threadPool. shutdown();
        }
    }

    @Override
    public void execute(@NotNull Runnable runnable) {<!-- -->
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {<!-- -->
            runnable. run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES. get();
        if (threadRunnables == null) {<!-- -->
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager. registerSynchronization(this);
        }
        threadRunnables. add(runnable);
    }

    @Override
    public void afterCommit() {<!-- -->
        logger.debug("Transaction submission completed processing ... ");
        List<Runnable> threadRunnables = RUNNABLES. get();
        for (int i = 0; i <threadRunnables. size(); i ++ ) {<!-- -->
            Runnable runnable = threadRunnables. get(i);
            try {<!-- -->
                threadPool. execute(runnable);
            } catch (RuntimeException e) {<!-- -->
                logger. error("", e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {<!-- -->
        logger.debug("Transaction completed .... ");
        RUNNABLES. remove();
    }
}

service code

 @Transactional
    public boolean updateById(Hotel hotel) {<!-- -->
        logger.info("----- into service -----");

        //Modify DB data
        hotelMapper. updateById(hotel);

        // use AfterCommitExecutor
        afterCommitExecutor. execute(new Runnable() {<!-- -->
            @Override
            public void run() {<!-- -->
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
                logger.info("----- rabbitmq send message -----");
            }
        });

        logger.info("--------- out service ----------");
        int n = 0/0;

        return false;
    }