ByteD2: A thread in the thread pool throws an exception, how to deal with it?

Everyone should be familiar with exception handling. But we need to run some transactions in the thread pool. How should this exception handling be implemented?

1. Simulate thread pool throwing exception

In actual development, we often use thread pools, but once a task is submitted to the thread pool, what should we do if an exception occurs? How to obtain exception information? Before understanding this problem, you can first take a look at the source code analysis of the thread pool. From the link, we know the submission method of the thread pool: the difference between submit and execute. Next, use them to execute tasks with exceptions! See what the result is!

Let’s first use pseudo code to simulate the scenario where the thread pool throws an exception:

public class ThreadPoolException {
    public static void main(String[] args) {

        //Create a thread pool
        ExecutorService executorService= Executors.newFixedThreadPool(1);

        //When the thread pool throws an exception, submit is silent and other threads continue to execute.
        executorService.submit(new task());

        //When the thread pool throws an exception, execute throws an exception and other threads continue to execute new tasks.
        executorService.execute(new task());
    }
}

//task class
class task implements Runnable{

    @Override
    public void run() {
        System.out.println("Entered task method!!!");
        int i=1/0;

    }
}

operation result:

affff68e023e66af02993f0b03e588f0.png

You can see: Submit does not print exception information, but execute does! . The submit method does not print exception information. Obviously, it is not feasible in production because we cannot guarantee that the tasks in the thread will never be abnormal. If an exception occurs using the submit method, we can write it directly as above. It will be impossible to obtain the exception information and make corresponding judgment and processing, so the next step is to know how to obtain the exception thrown by the thread pool!

If submit() wants to obtain exception information, it must use the get() method! !

//When the thread pool throws an exception, submit is silent and other threads continue to execute.
        Future<?> submit = executorService.submit(new task());
        submit.get();

2. How to obtain and handle exceptions

Option 1: Use try -catch
public class ThreadPoolException {
    public static void main(String[] args) {
        
        //Create a thread pool
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        //When the thread pool throws an exception, submit is silent and other threads continue to execute.
        executorService.submit(new task());

        //When the thread pool throws an exception, execute throws an exception and other threads continue to execute new tasks.
        executorService.execute(new task());
    }
}
//Task class
class task implements Runnable {
    @Override
    public void run() {
        try {
            System.out.println("Entered task method!!!");
            int i = 1 / 0;
        } catch (Exception e) {
            System.out.println("Use try -catch to catch exception" + e);
        }
    }
}

Print the result:

045474a8f4e6439d253b5448e47c7c2b.png

You can see that both submit and execute have captured exceptions clearly and understandably, and you can know that there is a problem with our task instead of disappearing without a trace.

Option 2: Use the Thread.setDefaultUncaughtExceptionHandler method to catch exceptions

In the first option, it is too troublesome to add a try-catch for each task, and the code is not good-looking. If you think about it this way, you can use the Thread.setDefaultUncaughtExceptionHandler method to catch exceptions

7f1a21515ecf53492c1b68918184207f.png

UncaughtExceptionHandler is an internal class of Thread class and a functional interface. The internal uncaughtException is a method for handling exceptions that occur within a thread. The parameters are the thread object t and the exception object e.

fdbd1eb666faa5fafa32055aa7c3d5ff.png

The application in the thread pool is as follows: rewrite its thread factory method, and assign the UncaughtExceptionHandler handler object when the thread factory creates a thread.

public class ThreadPoolException { public static void main(String[] args) throws InterruptedException {

//1. Implement your own thread pool factory
        ThreadFactory factory = (Runnable r) -> {
            //Create a thread
            Thread t = new Thread(r);
            //Set the UncaughtExceptionHandler object for the created thread to implement the default logic of the exception.
            t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
                System.out.println("exceptionHandler set by the thread factory" + e.getMessage());
            });
            return t;
        };

        //2. Create a self-defined thread pool and use a self-defined thread factory
        ExecutorService executorService = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue(10),
                factory);

        //submit without prompt
        executorService.submit(new task());

        Thread.sleep(1000);
        System.out.println("================== To verify the print results, execute the execute method after 1 second");

        // The execute method catches the exception by the UncaughtExceptionHandler of the thread factory.
        executorService.execute(new task());


    }


}

class task implements Runnable {
    @Override
    public void run() {
        System.out.println("Entered task method!!!");
        int i = 1 / 0;
    }
}

The print result is as follows:

4996d30a90779ebc0578651b6db30fab.png

According to the printing results, we can see that the execute method caught the exception by the UncaughtExceptionHandler set in the thread factory, but the submit method did not respond! It shows that UncaughtExceptionHandler is not called in submit. Why is this?

In daily use, we know that the biggest difference between execute and submit is that execute has no return value, while submit has a return value. Submit returns a future, through which the thread execution results or exception information can be obtained.

Future<?> submit = executorService.submit(new task());
 //Print abnormal results
    System.out.println(submit.get());

d997360deb8e31a4427f8cd0b26f454c.pngFrom the results, it can be seen that the submit is not lost Exception , there is still an exception printed when using future.get()! ! So why doesn’t the UncaughtExceptionHandler of the thread factory print an exception? The guess is that the exception has been caught inside the submit method, but it has not been printed out. Because the exception has been caught, the jvm will not call Thread’s UncaughtExceptionHandler to handle the exception.

Next, verify the conjecture:

First, take a look at the source code of submit and execute:

  • The source code of the execute method was written in detail in my last blog, so I won’t go into details here.

  • The submit source code still calls the execute method at the bottom, but it has an extra layer of Future encapsulation and returns this Future. This also explains why submit has a return value.

//submit() method
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        
        //execute internally executes the internal logic of this object, and then sets the result or exception into this ftask.
        RunnableFuture<T> ftask = newTaskFor(task);
        //Execute the execute method
        execute(ftask);
        //Return this ftask
        return ftask;
    }

You can see that submit is also called execute. In the execute method, our task is submitted to addWorker(command, true), and then a Worker is created for each task to process this thread. This Worker is also a thread and is called when executing the task. is the run method of Worker! The runworker method is called inside the run method! As follows:

public void run() {
            runWorker(this);
     }
     
  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
         //Here is the reason why threads can be reused, loop + conditional judgment, constantly fetching tasks from the queue
         //Another question is how to solve the timeout deletion of non-core threads
         //Mainly the getTask method() see below ③
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() & amp; & amp;
                      runStateAtLeast(ctl.get(), STOP))) & amp; & amp;
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                     //Execution thread
                        task.run();
                        //Exception handling
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                     //Execute method can override this method to handle exceptions
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks + + ;
                    w.unlock();
                }
            }
            //completeAbruptly will not be modified to false when an exception occurs
            completedAbruptly = false;
        } finally {
         //If the completedAbruptly value is true and an exception occurs, add a new Worker to process the subsequent thread.
            processWorkerExit(w, completedAbruptly);
        }
    }

The core is in the task.run(); method. If an exception occurs during this period, it will be thrown.

  • If a task is submitted using execute, it will be encapsulated into a runable task, and then encapsulated into a worker. Finally, the runWoker method is called in the worker’s run method. The task is executed in the runWoker method. If an exception occurs in the task, use try -catch catches the exception and throws it outside. We use try-catch in the outermost layer to catch the exception thrown in the runWoker method. So we saw the exception information of our task in execute.

  • So why does submit have no exception information? Because submit encapsulates the task into a futureTask, and then the futureTask is encapsulated into a worker. In the waker’s run method, the run method of the futureTask is ultimately called. It is guessed that the exception is swallowed directly and no exception is thrown. Therefore, the exception cannot be caught in the worker’s runWorker method.

Let’s take a look at the run method of futureTask. As expected, the exception is swallowed in try-catch and placed in setException(ex);

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null & amp; & amp; state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //Exception information is set in this method
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
            //Omit the following
   . . . . . . 

The setException(ex) method is as follows: assign the exception object to outcome

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
         //Assign the exception object to outcome and remember this outcome.
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

What is the use of assigning the exception object to outcome? What is this outcome? When we use submit to return a Future object and use Future.get(), the internal report method will be called!

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //Pay attention to this method
        return report(s);
    }

The report actually returns outcome, and the previous exception was set into this outcome.

private V report(int s) throws ExecutionException {
     //Set `outcome`
        Object x = outcome;
        if (s == NORMAL)
         //Return `outcome`
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

Therefore, when submitting with submit, the runable object is encapsulated into a future. When the run method in the future handles exceptions, try-catch all the exceptions and set them to the variable outcome through the setException(ex); method. You can pass future.get gets the outcome.

Therefore, when submitting, if an exception occurs, no information will be thrown. The exception thrown by submit can be obtained through future.get()! In submit, there is no other method except getting the exception from the return result. Therefore, when there is no need to return a result, it is best to use execute, so that even if try-catch is not written and exception capture is omitted, the exception information will not be lost.

Option 3: Rewrite afterExecute for exception handling

Through the above source code analysis, in the exclude method, exception handling can be performed by rewriting afterExecute, but pay attention! This only applies to excute submission (the submit method is more troublesome, as discussed below), because the exception is swallowed in task.run of submit, and no exception will come out at all, so no exception will enter afterExecute.

In runWorker, after calling task.run, the afterExecute(task, thrown) method of the thread pool will be called.

final void runWorker(Worker w) {
//Current thread
        Thread wt = Thread.currentThread();
        //Our submitted task
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() & amp; & amp;
                      runStateAtLeast(ctl.get(), STOP))) & amp; & amp;
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //Directly call the run method of the task
                        task.run(); //If it is the run of futuretask, the exception is swallowed and no exception will be thrown.
                       // Therefore Throwable thrown = null; will not enter the catch
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    //Call the afterExecute method of the thread pool and pass in the task and exception
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks + + ;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

Execution results: We can handle exceptions inside the afterExecute method

316b3afd5a8ea85db73de35b8900d096.png

If you want to use this afterExecute to handle the exception submitted by submit, additional processing is required. Determine whether the Throwable is a FutureTask. If it is an exception submitted by submit, the code is as follows:

public class ThreadPoolException3 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {


        //1. Create a self-defined thread pool
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                3,
                0,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue(10)
        ) {
            //Rewrite the afterExecute method
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                //This is when excute is submitted
                if (t != null) {
                    System.out.println("Get the exception information submitted by excute in afterExecute and handle the exception" + t.getMessage());
                }
                //If the actual type of r is FutureTask, it is submitted by submit, so you can get the exception in it.
                if (r instanceof FutureTask) {
                    try {
                        Future<?> future = (Future<?>) r;
                        //get gets exception
                        future.get();

                    } catch (Exception e) {
                        System.out.println("obtain the exception information submitted by submit in afterExecute and handle the exception" + e);
                    }
                }
            }
        };
        //Execute when the thread pool throws an exception
        executorService.execute(new task());
        
        //When the thread pool throws an exception, submit
        executorService.submit(new task());
    }
}

class task3 implements Runnable {
    @Override
    public void run() {
        System.out.println("Entered task method!!!");
        int i = 1 / 0;
    }
}

The processing results are as follows:

39f3a229f09eb4da3b2384934ee655c7.png

You can see that by rewriting afterExecute, you can handle both the exceptions thrown by execute and the exceptions thrown by submit

If you find it helpful, you can click “Looking” or forward it

Let more friends see it ~ ?( ′?` )