Running data operations asynchronously with CompletableFuture while using DefaultTransactionDefinition to keep the per-thread transactions consistent

package *.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Helper for coordinating manually managed transactions that run on several threads.
 *
 * <p>Each participating thread obtains its own {@link TransactionStatus}, performs its work and
 * then calls {@link #threadBlocking} to wait on a shared latch before committing or rolling
 * back. A thread that fails calls {@link #errorRollback} instead. The shared
 * {@code AtomicBoolean} records whether any participant has failed.
 *
 * <p>This is a best-effort scheme: every thread still commits its own transaction
 * independently, so a failure raised while one thread is committing cannot undo a commit that
 * another thread has already finished.
 */
@Slf4j
public class ThreadTransationUtil {

    private final PlatformTransactionManager transactionManager;

    /**
     * @param transactionManager manager used to commit or roll back the supplied transactions
     */
    public ThreadTransationUtil(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    /**
     * Signals that the calling thread has finished its work, waits until the latch reaches zero
     * and then finishes the transaction: rollback if the failure flag is set, commit otherwise.
     *
     * @param latch       latch shared by all participating threads; decremented once here
     * @param status      the calling thread's transaction
     * @param isException shared flag, {@code true} once any participant has failed
     * @throws InterruptedException if the thread is interrupted while waiting on the latch; the
     *                              transaction has been neither committed nor rolled back then
     */
    public void threadBlocking(CountDownLatch latch, TransactionStatus status, AtomicBoolean isException) throws InterruptedException {
        latch.countDown();
        log.info("Start hovering, remaining count: {}", latch.getCount());
        // Blocks without a timeout until every participant has counted down.
        latch.await();

        if (isException.get()) {
            log.info("Start rollback");
            transactionManager.rollback(status);
        } else {
            log.info("Start submission:{}", status);
            transactionManager.commit(status);
        }
    }

    /**
     * Marks the shared failure flag, releases this thread's latch slot and rolls back the
     * calling thread's transaction if it is still open.
     *
     * <p>The latch is decremented unconditionally, so a caller that gets here after
     * {@link #threadBlocking} has already counted down decrements it a second time.
     *
     * @param latch       latch shared by all participating threads
     * @param status      the calling thread's transaction
     * @param isException shared flag that is set to {@code true}
     */
    public void errorRollback(CountDownLatch latch, TransactionStatus status, AtomicBoolean isException) {
        isException.set(true);
        latch.countDown();
        log.info("Rollback started, program exception, counter decremented, remaining quantity: {}", latch.getCount());

        // threadBlocking may already have committed or rolled back before the caller's catch
        // block ran (e.g. the commit itself failed). Rolling back a completed transaction
        // raises IllegalTransactionStateException and would hide the original error.
        if (status.isCompleted()) {
            log.warn("Transaction already completed, skipping rollback: {}", status);
            return;
        }
        // This thread rolls back
        transactionManager.rollback(status);
    }
}

package *.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;


/**
 * Spring configuration that declares the {@code threadPoolTaskExecutor} bean.
 */
@Configuration
public class ThreadPoolConfigExecutor {

    /**
     * Creates the executor bean, sized from the number of available processors.
     *
     * <ul>
     *   <li>core pool size: number of available processors</li>
     *   <li>maximum pool size: four times the number of available processors</li>
     *   <li>queue capacity: 500 tasks</li>
     *   <li>keep-alive: 60 seconds</li>
     *   <li>rejection policy: {@code CallerRunsPolicy}, i.e. a rejected task runs on the
     *       submitting thread</li>
     * </ul>
     *
     * @return the configured executor
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        final int cpuCount = Runtime.getRuntime().availableProcessors();

        // Sizing
        pool.setCorePoolSize(cpuCount);
        pool.setMaxPoolSize(cpuCount * 4);

        // Queueing, idle-thread lifetime and overflow behaviour
        pool.setKeepAliveSeconds(60);
        pool.setQueueCapacity(500);
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return pool;
    }

}
package com.zszc.etb.open;

import com.zszc.etb.open.domain.TpOpenDecFile;
import com.zszc.etb.open.service.config.ThreadPoolConfigExecutor;
import com.zszc.etb.openeval.utils.ThreadTransationUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;


@Slf4j
@SpringBootTest
public class MyasyncTest {

    @Resource
    private PlatformTransactionManager transactionManager;
    @Resource
    private ThreadPoolConfigExecutor threadPoolConfigExecutor;

    @Test
    public void asyncTest(){
        TpOpenDecFile file =new TpOpenDecFile();
        file.setId(9822L);
        file.setSectionId(12L);
        file.setIfHide(1);
        try {
            this.saveUpateTpOpenDecFile(file);
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    // @Transactional(rollbackFor = Exception.class)

    public Boolean saveUpateTpOpenDecFile(TpOpenDecFile file) throws ExecutionException, InterruptedException {
        //Define the number of open threads
        CountDownLatch latch = new CountDownLatch(3);
        // transaction definition
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        // Are there any exceptions in all threads that start transactions?
        AtomicBoolean isException = new AtomicBoolean(Boolean.FALSE);

        CompletableFuture save1=this.save(latch,definition,isException,file);
        CompletableFuture save2=this.save2(latch,definition,isException);
        CompletableFuture save3=this.save3(latch,definition,isException);

        String s= CompletableFuture.allOf(save1,save2,save3).get().toString();
        Long id=file.getId();

        return false;
    }


    private CompletableFuture<Void> save(CountDownLatch latch,DefaultTransactionDefinition definition, AtomicBoolean isException,TpOpenDecFile file){
        return CompletableFuture.runAsync(()->{
            // Get transaction status
            TransactionStatus status = transactionManager.getTransaction(definition);
            log.info("sav1===;{}",status);
            ThreadTransationUtil util=new ThreadTransationUtil(transactionManager);
            try {
               /**Do not use @Transactional*/ in the code inside the business code
                //tpOpenBidderOpService.saveTpOpenBidderOp(op);

                file.setId(90l);
                util.threadBlocking(latch,status,isException);
            } catch (Exception e) {
                util.errorRollback(latch,status,isException);
            }

        },threadPoolConfigExecutor.threadPoolTaskExecutor());
    }

    private CompletableFuture<String> save2(CountDownLatch latch,DefaultTransactionDefinition definition, AtomicBoolean isException){
        return CompletableFuture.supplyAsync(()->{
            // Get transaction status
            TransactionStatus status = transactionManager.getTransaction(definition);
            log.info("sav2===;{}",status);
            ThreadTransationUtil util=new ThreadTransationUtil(transactionManager);
            try {
                /**Do not use @Transactional*/ in the code inside the business code

                util.threadBlocking(latch,status,isException);
            } catch (Exception e) {
                util.errorRollback(latch,status,isException);
            }
            return "dsss";
        },threadPoolConfigExecutor.threadPoolTaskExecutor());
    }

    private CompletableFuture<String> save3(CountDownLatch latch,DefaultTransactionDefinition definition, AtomicBoolean isException){
        return CompletableFuture.supplyAsync(()->{
            // Get transaction status
            TransactionStatus status = transactionManager.getTransaction(definition);
            log.info("sav3===;{}",status);
            ThreadTransationUtil util=new ThreadTransationUtil(transactionManager);
            try {
                /**Do not use @Transactional*/ in the code inside the business code

                util.threadBlocking(latch,status,isException);
            } catch (Exception e) {
                util.errorRollback(latch,status,isException);
            }
            return "9202";
        },threadPoolConfigExecutor.threadPoolTaskExecutor());
    }




}

The topics covered in this article correspond to the official knowledge base, where you can study related material further: Java skill tree > Java asynchronous tasks > Future and CompletableFuture. 138,723 people are studying this track systematically.