Crashes caused by calling onError/onSuccess on an RxJava task that is already in the DISPOSED state

An onError handler is written for the RxJava chain, but the application still crashes.

      • 1. Problem background
        • 1.1 Crash stack
        • 1.2 Write demo code to reproduce the same logic
      • 2. Reproducing the problem
        • 2.1 Code location: io.reactivex.internal.operators.single.SingleCreate.Emitter#onError
      • 3. Repair method
        • 3.1 Solution 1: Set a global errorHandler as a last line of defense, but don't abuse it
        • 3.2 Solution 2: Check whether the task is in the DISPOSED state while the asynchronous task is executing
      • 4. Reflections on the use of RxJava
        • 4.1 The task has been disposed, so why is onError/onSuccess still called?
        • 4.2 Why does calling onError/onSuccess on a disposed task throw an exception directly? What is the reasoning behind this design?

1. Problem background

Recently, the crash-reporting backend showed some crashes in business code. Tracing the stack back to the call site led to an asynchronous task written with RxJava, even though the chain was subscribed with subscribe(onSuccess, onError), so an error handler was in place. The code creates and starts a time-consuming task in Activity#onResume and, in onPause, disposes the Disposable that was created in onResume.

1.1 Crash stack

[Figure: RxJava crash stack]

1.2 Write demo code to reproduce the same logic

// Roughly what the code looks like; this is test code, so ignore the specific business logic.
getLifecycle().addObserver(new LifecycleEventObserver() {
    @Override
    public void onStateChanged(@NonNull LifecycleOwner source, @NonNull Lifecycle.Event event) {
        // Start the time-consuming data request in onResume,
        // and dispose the task in onPause.
        if (event == Lifecycle.Event.ON_RESUME) {
            mDisposable = Single.create(new SingleOnSubscribe<String>() {
                        @Override
                        public void subscribe(SingleEmitter<String> emitter) throws Exception {
                            if (mDisposable != null && mDisposable.isDisposed()) {
                                Log.d(TAG, "subscribe: disposable=" + mDisposable + ", is disposed!");
                                return;
                            }
                            boolean isSuccessful = mApi.loadData() != null;
                            if (isSuccessful) {
                                emitter.onSuccess("120");
                            } else {
                                // pass the exception downstream
                                emitter.onError(new IllegalStateException("process data error!"));
                            }
                        }
                    })
                    // only for test!!
                    .flatMap(new Function<String, SingleSource<Integer>>() {
                        @Override
                        public SingleSource<Integer> apply(String s) throws Exception {
                            return Single.just(Integer.parseInt(s));
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    // onError is handled here.
                    // Note: adding doOnError() does not help either; the crash still happens.
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "doOnSuccess accept: " + integer);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.d(TAG, "error accept: " + throwable);
                        }
                    });

        } else if (event == Lifecycle.Event.ON_PAUSE) {
            if (mDisposable != null && !mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }
});

I wrote the above prototype code according to the actual business logic scenario, and switched between onResume and onPause many times, but the crash did not reproduce.

2. Reproducing the problem

Tracing through the RxJava source shows that a call to SingleEmitter#onError inside SingleOnSubscribe#subscribe reaches the downstream onError consumer only under one condition: the emitter must not yet be in the DISPOSED state. The check looks like this:

2.1 Code location: io.reactivex.internal.operators.single.SingleCreate.Emitter#onError

@Override
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}

@Override
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (get() != DisposableHelper.DISPOSED) {
        Disposable d = getAndSet(DisposableHelper.DISPOSED);
        // Only when the asynchronous task is in a non-DISPOSED state is the error passed to the onError() callback added to the Single
        if (d != DisposableHelper.DISPOSED) {
            try {
                downstream.onError(t);
            } finally {
                if (d != null) {
                    d.dispose();
                }
            }
            return true;
        }
    }
    return false;
}

After adding a 2000 ms delay to the asynchronous task mApi.loadData() and switching the Activity between onResume and onPause repeatedly, the crash reproduced.
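
The ordering that triggers the crash can be illustrated without RxJava. The following is a minimal sketch in plain Java, under the assumption that a single boolean flag is enough to model the emitter's state. ModelEmitter and DisposeRaceDemo are hypothetical names for this illustration only, and the undeliverable list merely stands in for the call to RxJavaPlugins.onError().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for SingleCreate.Emitter; this is not RxJava's real class. */
class ModelEmitter {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    /** Errors that reached the downstream onError consumer. */
    final List<Throwable> delivered = new ArrayList<>();
    /** Errors that fell through to the (modelled) RxJavaPlugins.onError. */
    final List<Throwable> undeliverable = new ArrayList<>();

    void dispose() {
        disposed.set(true);
    }

    /** Delivers downstream only if this call is the one that moves the emitter to disposed. */
    boolean tryOnError(Throwable t) {
        if (disposed.compareAndSet(false, true)) {
            delivered.add(t);
            return true;
        }
        return false;
    }

    void onError(Throwable t) {
        if (!tryOnError(t)) {
            undeliverable.add(t); // the real code calls RxJavaPlugins.onError(t) here
        }
    }
}

class DisposeRaceDemo {
    /** disposeFirst models onPause arriving before the slow task reports its failure. */
    static ModelEmitter run(boolean disposeFirst) {
        ModelEmitter emitter = new ModelEmitter();
        if (disposeFirst) {
            emitter.dispose();
        }
        emitter.onError(new IllegalStateException("process data error!"));
        return emitter;
    }

    public static void main(String[] args) {
        System.out.println("delivered without dispose: " + run(false).delivered.size());
        System.out.println("undeliverable after dispose: " + run(true).undeliverable.size());
    }
}
```

With a fast task the error almost always arrives before onPause, which would explain why the first demo did not crash. The 2000 ms delay simply makes the "dispose first" ordering likely.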

  • Because the Single task is already DISPOSED, tryOnError() returns false and control passes to RxJavaPlugins.onError().
    Here is the implementation of RxJavaPlugins.onError():
// io.reactivex.plugins.RxJavaPlugins#onError
public static void onError(@NonNull Throwable error) {
    // (1)
    Consumer<? super Throwable> f = errorHandler;

    if (error == null) {
        error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    } else {
        // (2)
        if (!isBug(error)) {
            error = new UndeliverableException(error);
        }
    }

    if (f != null) {
        try {
            // (3)
            f.accept(error);
            return;
        } catch (Throwable e) {
            // (4) The handling here is debatable.
            // Exceptions.throwIfFatal(e); TODO decide
            e.printStackTrace(); // NOPMD
            uncaught(e);
        }
    }
    // (5) Reached when f is null (our case), and also after f.accept() has thrown
    error.printStackTrace(); // NOPMD
    uncaught(error);
}
  • First look at the implementation logic of the uncaught() method:
static void uncaught(@NonNull Throwable error) {
    Thread currentThread = Thread.currentThread();
    UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();
    handler.uncaughtException(currentThread, error);
}

Surprisingly, uncaught() fetches the current thread's UncaughtExceptionHandler and calls uncaughtException() on it directly. This goes through the application's crash-reporting path, so even an error that the business code deliberately passed to onError ends up as a crash report. (A typical custom UncaughtExceptionHandler in an application shows a crash page, asks the user whether to upload the crash log, and finally kills the process.)
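
To see that nothing has to be thrown for the crash handler to run, here is a small sketch in plain Java. The uncaught() method repeats the three statements from the listing above. UncaughtDemo and reportAndCapture are hypothetical names, and the recording lambda stands in for an application's crash reporter.

```java
import java.util.concurrent.atomic.AtomicReference;

class UncaughtDemo {
    /** Same three steps as the uncaught() method shown above. */
    static void uncaught(Throwable error) {
        Thread currentThread = Thread.currentThread();
        Thread.UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();
        handler.uncaughtException(currentThread, error);
    }

    /** Runs uncaught() on a worker thread whose handler only records what it receives. */
    static Throwable reportAndCapture(Throwable error) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> uncaught(error)); // no throw statement anywhere
        worker.setUncaughtExceptionHandler((thread, e) -> seen.set(e));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", ie);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable seen = reportAndCapture(new IllegalStateException("process data error!"));
        System.out.println("handler received: " + seen);
    }
}
```

The worker thread finishes normally, yet its handler has received the error. On Android, the handler in that position is usually the one that reports the crash and kills the process.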

  • (1) errorHandler is the handler installed through RxJavaPlugins.setErrorHandler(). When this crash occurred, no global handler had been installed.
  • (2) If the error is not one of the types that isBug() treats as a bug, it is wrapped in an UndeliverableException.
    isBug() itself deserves attention: it classifies six exception types as bugs, and common ones such as IOException and FileNotFoundException are not among them.
static boolean isBug(Throwable error) {
    // user forgot to add the onError handler in subscribe
    if (error instanceof OnErrorNotImplementedException) {
        return true;
    }
    // the sender didn't honor the request amount
    // it's either due to an operator bug or concurrent onNext
    if (error instanceof MissingBackpressureException) {
        return true;
    }
    // general protocol violations
    // it's either due to an operator bug or concurrent onNext
    if (error instanceof IllegalStateException) {
        return true;
    }
    // nulls are generally not allowed
    // likely an operator bug or missing null-check
    if (error instanceof NullPointerException) {
        return true;
    }
    // bad arguments, likely invalid user input
    if (error instanceof IllegalArgumentException) {
        return true;
    }
    // Crash while handling an exception
    if (error instanceof CompositeException) {
        return true;
    }
    // everything else is probably due to lifecycle limits
    return false;
}
  • (3) f is the errorHandler object. If f is not null and f.accept() does not throw, the error is consumed there and the DISPOSED state of the Single task no longer causes a crash.
  • (4) If f.accept() throws:
    The catch block passes the thrown exception to uncaught(), and because the catch block does not return, execution continues to (5), where the original error is passed to uncaught() as well. One failure is therefore reported twice, and the numbers in the crash backend double. In production this can distort your stability (crash rate) metrics.
    The implementation of f.accept() therefore needs care. If you really must rethrow, wrap the error in a different exception type. Better still, rethrow only in development builds, so that the crash exposes the bug early and lowers the risk of crashes after release.

3. Repair method

3.1 Solution 1: Set a global errorHandler as a last line of defense, but don't abuse it

RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        // In a debug build, rethrow (wrapped in a different type) to expose the problem as early as possible.
        if (BuildConfig.DEBUG) {
            throw new RuntimeException(throwable);
        }
        // Be careful with printing stack traces here: doing it too often costs performance.
        // Do not rely too much on this global handler;
        // handle errors properly in your own business code wherever possible.
        Log.d(TAG, "RxJava error handler accept: " + Log.getStackTraceString(throwable));
    }
});

3.2 Solution 2: Check whether the task is in the DISPOSED state while the asynchronous task is executing
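
One possible shape for this check is sketched below. To keep the example runnable without the RxJava dependency, EmitterLike is a hypothetical stand-in that mirrors three methods of SingleEmitter (isDisposed(), onSuccess() and the tryOnError() shown in 2.1). GuardedEmit and FakeEmitter are likewise illustrative names, not part of any library. In real code the same calls would be made on the SingleEmitter passed to subscribe().

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for the three SingleEmitter methods used below. */
interface EmitterLike<T> {
    boolean isDisposed();
    void onSuccess(T value);
    boolean tryOnError(Throwable error);
}

class GuardedEmit {
    /**
     * Runs the slow task first, then emits only if the subscriber is still interested.
     * Returns true if a value or an error was delivered downstream.
     */
    static <T> boolean runAndEmit(EmitterLike<T> emitter, Callable<T> slowTask) {
        T value;
        try {
            value = slowTask.call();
        } catch (Exception e) {
            // tryOnError returns false on a disposed emitter instead of
            // routing the error to the global handler
            return emitter.tryOnError(e);
        }
        if (emitter.isDisposed()) {
            return false; // disposed while the task was running: drop the result
        }
        emitter.onSuccess(value);
        return true;
    }
}

/** Test double used only to exercise runAndEmit. */
class FakeEmitter implements EmitterLike<String> {
    boolean disposed;
    String success;
    Throwable error;

    @Override public boolean isDisposed() { return disposed; }

    @Override public void onSuccess(String value) {
        success = value;
        disposed = true;
    }

    @Override public boolean tryOnError(Throwable e) {
        if (disposed) {
            return false;
        }
        error = e;
        disposed = true;
        return true;
    }
}
```

The check is placed after the slow call on purpose: a check made only at the start of subscribe() cannot see a dispose() that happens while the task is still running. For the error path, tryOnError() avoids the gap between calling isDisposed() and calling onError().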

4. Reflections on the use of RxJava

4.1 The task has been disposed, so why is onError/onSuccess still called?

4.2 Why does calling onError/onSuccess on a disposed task throw an exception directly? What is the reasoning behind this design?