Scene
In business, it is often necessary to send messages or events to asynchronously call other components to perform corresponding business operations after performing database operations (transaction submission is completed).
For example: After the user registers successfully, the activation code or activation email is sent. If the user saves and executes an asynchronous operation to send the activation code or activation email, but an exception occurs after the previous user saves, the database rolls back, and the user In fact, the registration is not successful, but the user receives an activation code or an activation email. At this point, we urgently require the database transaction to complete before performing asynchronous operations.
Requirements
After successfully modifying the database, send a message to Rabbitmq
Solution
1. Use TransactionSynchronizationManage to control transactions
TransactionSynchronization There are methods in this class to control the execution of the business after the transaction is submitted.
service code
public boolean updateById(Hotel hotel) {<!-- --> logger.info("----- into service -----"); //Modify DB data hotelMapper. updateById(hotel); // Execute after the transaction is submitted TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {<!-- --> @Override public void afterCommit() {<!-- --> // After successfully modifying the DB data, send a message to Rabbitmq rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString()); logger.info("----- rabbitmq send message -----"); } }); logger.info("--------- out service ----------"); //simulate exception int n = 0/0; return false; }
Note: The above code will be executed after the transaction is committed. If it is in a non-transactional context, java.lang.IllegalStateException: Transaction synchronization is not active will be thrown.
So the @Transactional annotation must be added to the service method.
2. Use TransactionSynchronizationAdapter to control transactions
Configuration class
import com.sun.istack.internal.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; @Component("afterCommitExecutor") public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {<!-- --> private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>(); private ThreadPoolExecutor threadPool; private Logger logger = LoggerFactory. getLogger(AfterCommitExecutor. class); @PostConstruct public void init() {<!-- --> logger.debug("Initialize thread pool..."); int availableProcessors = Runtime.getRuntime().availableProcessors(); if (0 >= availableProcessors) {<!-- --> availableProcessors = 1; } int maxPoolSize = (availableProcessors > 5) ? availableProcessors * 2 : 5; logger.debug("CPU Processors :%s MaxPoolSize:%s", availableProcessors, maxPoolSize); threadPool = new ThreadPoolExecutor( availableProcessors, maxPoolSize, 60, TimeUnit. SECONDS, new LinkedBlockingQueue<Runnable>(maxPoolSize * 2), Executors. defaultThreadFactory(), new RejectedExecutionHandler() {<!-- --> @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {<!-- --> logger.debug("Task:%s rejected", r.toString()); if (!executor.isShutdown()) {<!-- --> executor.getQueue().poll(); executor. execute(r); } } } ); } @PreDestroy public void destroy() {<!-- --> logger.debug("Destroy thread pool..."); if (null != threadPool & amp; & amp; !threadPool.isShutdown()) {<!-- --> threadPool. shutdown(); } } @Override public void execute(@NotNull Runnable runnable) {<!-- --> if (!TransactionSynchronizationManager.isSynchronizationActive()) {<!-- --> runnable. run(); return; } List<Runnable> threadRunnables = RUNNABLES. get(); if (threadRunnables == null) {<!-- --> threadRunnables = new ArrayList<Runnable>(); RUNNABLES.set(threadRunnables); TransactionSynchronizationManager. registerSynchronization(this); } threadRunnables. add(runnable); } @Override public void afterCommit() {<!-- --> logger.debug("Transaction submission completed processing ... "); List<Runnable> threadRunnables = RUNNABLES. get(); for (int i = 0; i <threadRunnables. size(); i ++ ) {<!-- --> Runnable runnable = threadRunnables. get(i); try {<!-- --> threadPool. execute(runnable); } catch (RuntimeException e) {<!-- --> logger. error("", e); } } } @Override public void afterCompletion(int status) {<!-- --> logger.debug("Transaction completed .... "); RUNNABLES. remove(); } }
service code
@Transactional public boolean updateById(Hotel hotel) {<!-- --> logger.info("----- into service -----"); //Modify DB data hotelMapper. updateById(hotel); // use AfterCommitExecutor afterCommitExecutor. execute(new Runnable() {<!-- --> @Override public void run() {<!-- --> rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString()); logger.info("----- rabbitmq send message -----"); } }); logger.info("--------- out service ----------"); int n = 0/0; return false; }