Use these 4 tricks to elegantly implement data transfer between Spring Boot asynchronous threads

Original snail programmer snail g 2023-02-18 12:03 Published in Jiangsu

Click on “Programmer Snail g” above and select “Set as Star”

Together with Brother Snail, make a little progress every day

programmer snail g

A programmer from a big factory, together with the snail, make a little progress every day

8 original content

No public

Hi, I’m a snail!

In actual development, some data needs to be passed between parent and child threads. For example, user login information is stored in ThreadLocal to ensure thread isolation. The code is as follows:

/**
 * @author public account: programmer snail g
 * @description user context information
 */
public class UserUtils {
    private static final ThreadLocal<String> userLocal=new ThreadLocal<>();

    public static String getUserId(){
        return userLocal. get();
    }
    public static void setUserId(String userId){
        userLocal.set(userId);
    }

    public static void clear(){
        userLocal. remove();
    }}

So how does the child thread want to get this userId?

1. Manual setting

code show as below:

public void handlerAsync() {
    //1. Get the userId of the parent thread
        String userId = UserUtils. getUserId();
        CompletableFuture. runAsync(()->{
            //2. Set the value of the child thread, reuse
            UserUtils.setUserId(userId);
            log.info("The value of the child thread: {}", UserUtils.getUserId());
        });</code><code> }

In this way, every time you open an asynchronous thread, you need to manually set it up. There are too many repeated codes, which makes you a headache!

2. Thread pool setting TaskDecorator

TaskDecorator is a decorator that executes callback methods, mainly used to pass context, or provide task monitoring/statistics.

So how do we use it? code show as below:


/**
* @author public account: programmer snail g
* @description context decorator
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
String robotId = UserUtils. getUserId();
System.out.println(robotId);
return () -> {
try {
// Set the request information of the main thread to the child thread
UserUtils.setUserId(robotId);
// Execute the child thread, don't forget this step
runnable. run();
} finally {
// The thread ends, clear the information, otherwise it may cause a memory leak
UserUtils. clear();
}
}; }
}

TaskDecorator needs to be used in conjunction with the thread pool. In actual development, it is recommended to use the thread pool for asynchronous threads. You only need to configure it in the corresponding thread pool. The code is as follows:



@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
    log.info("start asyncServiceExecutor----------------");
    //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //Use the thread pool to visualize the running status
    ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    // Configure the number of core threads
    executor.setCorePoolSize(corePoolSize);
    // Configure the maximum number of threads
    executor.setMaxPoolSize(maxPoolSize);
    //Configure the queue size
    executor.setQueueCapacity(queueCapacity);
    //Configure the thread name prefix in the thread pool
    executor.setThreadNamePrefix(namePrefix);

    // rejection-policy: How to handle new tasks when the pool has reached max size
    // CALLER_RUNS: The task is not executed in a new thread, but executed by the thread where the caller is located
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    //Add thread pool modifier class
    executor.setTaskDecorator(new CustomTaskDecorator());
    //Increase the thread pool modification class of MDC
    //executor.setTaskDecorator(new MDCTaskDecorator());
    //Perform initialization
    executor. initialize();
    log.info("end asyncServiceExecutor------------");
    return executor;
}

At this time, the business code does not need to set the value of the sub-thread, it can be used directly, the code is as follows:


public void handlerAsync() {
log.info("User information of parent thread: {}", UserUtils.getUserId());
//Executing asynchronous tasks requires a specified thread pool
CompletableFuture.runAsync(()->
 log.info("User information of sub-thread: {}", UserUtils.getUserId()),
 taskExecutor);
}

Take a look at the result, as shown below:

Here, CompletableFuture is used to execute asynchronous tasks, and it is also possible to use the annotation @Async.

Note: No matter which method is used, the thread pool needs to be specified

3. InheritableThreadLocal

Although InheritableThreadLocal can achieve multiplexing between parent and child threads, there will be multiplexing problems when used in the thread pool

This solution is also very simple to use, just replace ThreadLocal with InheritableThreadLocal directly, the code is as follows:


/**
* @author public account: programmer snail g
* @description user context information
*/

public class UserUtils {
    private static final InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

    public static String get(){
        return threadLocal. get();
    }
    public static void set(String userId){
        threadLocal.set(userId);
    }
    public static void clear(){
        threadLocal. remove();
    }
}


4. TransmittableThreadLocal

TransmittableThreadLocal is a thread variable transfer toolkit developed by Ali, which solves the problem that InheritableThreadLocal can only transfer local variables when new Thread is created and cannot be applied to thread pools. It can be used for link tracking, passing variables, etc. Let’s understand the principle below.

It is also very simple to use, add dependencies as follows:

<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>transmittable-thread-local</artifactId>
 <version>2.14.2</version>
</dependency>

The UserUtils transformation code is as follows:



/**
 * @author public account: programmer snail g
 * @description user context information
 */
public class UserUtils {
    private static final TransmittableThreadLocal<String> threadLocal =new TransmittableThreadLocal<>();

    public static String get(){
        return threadLocal. get();
    }
    public static void set(String userId){
        threadLocal.set(loginVal);
    }
    public static void clear(){
        threadLocal. remove();
    }
}

TransmittableThreadLocal principle

TransmittableThreadLocal inherits from InheritableThreadLocal, so it can pass values to child threads when creating a thread, so how to ensure that it is also effective when using the thread pool? Let’s take a look at the source code

1. Construction method

 public TransmittableThreadLocal() {
    this(false);
 }

 public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
    // Whether to ignore the null value set, the default is false
    this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
 }

2. set method


/**
 * {@inheritDoc}
 */
@Override
public final void set(T value) {
    if (!disableIgnoreNullValueSemantics & amp; & amp; value == null) {
        // may set null to remove value
        remove();
    } else {
        super.set(value);
        addThisToHolder();
    }
}

First look at the addThisToHolder method

@SuppressWarnings("unchecked")
private void addThisToHolder() {
    if (!holder. get(). containsKey(this)) {
        holder.get().put((TransmittableThreadLocal<Object>) this, null);
 // WeakHashMap supports null value.
    }
}

What is the attribute holder?





1. Variables modified by final static will only exist in one copy

2. WeakHashMap is used, weak references are used to facilitate garbage collection

3. The key is the TransmittableThreadLocal object

remove method

/**
 * {@inheritDoc}
 */
@Override
public final void remove() {
    removeThisFromHolder();
    super. remove();
}

4. get method

/**
 * {@inheritDoc}
 */
@Override
public final T get() {
    T value = super. get();
    if (disableIgnoreNullValueSemantics || value != null) addThisToHolder();
    return value;
}

5. When we use the thread pool, we need to use TtlRunnable.get(runnable) to wrap the runnable, or use TtlExecutors.getTtlExecutor(executor) to wrap the executor, so that the variable transfer of the thread pool can work, then we will continue Look at the execution flow of the source code

TtlExecutors. getTtlExecutor(executor)

public static Executor getTtlExecutor(@Nullable Executor executor) {
    if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) {
        return executor;
    }
    // Wrap the executor
    return new ExecutorTtlWrapper(executor, true);
}
ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
    this.executor = executor;
    this. idempotent = idempotent;
}
public void execute(@NonNull Runnable command) {
    // In fact, the original runnable is also packaged by TtlRunnable
    executor.execute(TtlRunnable.get(command, false, idempotent));
}

As you can see, the principles of the two methods are the same, let’s directly look at TtlRunnable.get()

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {

    if (null == runnable) return null;
    if (runnable instanceof TtlEnhanced) {
        if (idempotent) return (TtlRunnable) runnable;
        else throw new IllegalStateException("Already TtlRunnable!");
    }
    // return TtlRunnable
    return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}

Build TtlRunnable

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
      // atomic reference
      this. capturedRef = new AtomicReference<Object>(capture());
      this.runnable = runnable;
      this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
  }

capture captures the ttl of the parent thread

public static Object capture() {
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static
HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value =
 new HashMap<TransmittableThreadLocal<Object>, Object>();
    // traverse all holders
 for (TransmittableThreadLocal<Object> threadLocal : holder. get(). keySet()) {
  // copyValue actually calls the get method of TransmittableThreadLocal to obtain the variable value stored by the thread
    ttl2Value.put(threadLocal, threadLocal.copyValue());
 }
  return ttl2Value;
}

private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value =
new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry :
 threadLocalHolder. entrySet()) {
        final ThreadLocal<Object> threadLocal = entry. getKey();
        final TtlCopier<Object> copier = entry. getValue();
        threadLocal2Value. put(threadLocal, copier. copy(threadLocal. get()));
    }
    return threadLocal2Value;
}

Look at the run method of TtlRunnable

public void run() {
    // Get the Snapshot object, which stores the value of the parent thread
    final Object captured = capturedRef. get();
    if (captured == null || releaseTtlValueReferenceAfterRun & amp; & amp; !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }
    // Pass in the ttl captured by the capture method, and then replay it in the child thread, that is, call the set method of ttl,
    // This will set the value to the current thread, and finally return the ttl that existed before the child thread
    final Object backup = replay(captured);
    try {
        // Call the run of the original runnable
        runnable. run();
    } finally {
        //
        restore(backup);
    }
}

Summarize

The above lists 4 schemes. Snail recommends scheme 2 and scheme 4 here. The shortcomings of the two schemes are very obvious, and scheme 2 or scheme 4 is also adopted in actual development.


With reference to more than 50 interviews and recruitment requirements of large factories, I summarized this resume, gold, silver and four are useful!

Reply keywords in the background Notes Get the actual combat notes organized by Brother Snail Continuously update

If this article is helpful to you, welcome to like & amp; watch & amp; share , this is very important for me to continue sharing & amp; creating high-quality articles! Thank you so much!