Be wary of invisible retry mechanisms: why you must consider idempotence when using RPC

0 Article Overview

In RPC scenarios, duplicate data caused by retries, or by the lack of an idempotence mechanism, deserves close attention. It can lead to problems such as several orders being created for a single purchase, or the same notification being sent multiple times. This is a problem that engineers must face and solve.

Some may ask: if the business code does not explicitly retry when a call fails, why does duplicate data still appear? The reason is that even without an explicit retry, the RPC framework may retry automatically as part of its cluster fault-tolerance mechanism.

In this article, we use the DUBBO framework as an example to analyze three questions: why retry, how retries are implemented, and how to achieve idempotence.

[Figure: Invisible retry mechanism]

1 Why retry

If we simply classify an RPC interaction process, we can divide it into three categories: successful response, failed response, and no response.

[Figure: the three categories of RPC interaction]

Consumers can easily handle successful and failed responses: because the response is unambiguous, they simply continue with the success or failure logic. The no-response scenario is harder to handle, because it may cover any of the following situations:

(1) The producer did not receive the request at all
(2) The producer received the request and processed it successfully, but the consumer did not receive the response.
(3) The producer received the request and failed to process it, but the consumer did not receive the response.
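The ambiguity of these three situations can be illustrated with a minimal sketch. The SketchProducer class and its boolean flags are hypothetical and exist only for this illustration; they are not part of DUBBO. The point is that the consumer observes the same thing in all three cases, while the producer's state differs.

```java
import java.util.Optional;

// Hypothetical in-memory producer, used only to illustrate the three no-response cases.
class SketchProducer {
    boolean received = false;   // did the request reach the producer?
    boolean processed = false;  // did the business logic succeed?

    // Returns what the consumer sees; Optional.empty() stands for "no response".
    Optional<String> call(boolean requestLost, boolean processingFails, boolean responseLost) {
        if (requestLost) {
            return Optional.empty();                 // case (1): request never arrived
        }
        received = true;
        processed = !processingFails;
        if (responseLost) {
            return Optional.empty();                 // case (2) or (3): response never arrived
        }
        return Optional.of(processed ? "SUCCESS" : "FAILURE");
    }
}

class NoResponseDemo {
    public static void main(String[] args) {
        SketchProducer case1 = new SketchProducer();
        SketchProducer case2 = new SketchProducer();
        SketchProducer case3 = new SketchProducer();
        // The consumer observes "no response" in all three cases.
        System.out.println(case1.call(true, false, false).isPresent());
        System.out.println(case2.call(false, false, true).isPresent());
        System.out.println(case3.call(false, true, true).isPresent());
        // Only the producer side differs: in case (2) the work has already been done.
        System.out.println(case2.processed);
    }
}
```

Since the consumer cannot tell case (2) from the other two, a blind retry is only safe when repeating the operation does no harm.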

Suppose you are designing an RPC framework: should it retry or abandon the call? The right choice depends on the business characteristics. Some operations are naturally idempotent, while others must not be retried because a retry would create duplicate data.

So who knows the business characteristics best? The consumer. As the caller, the consumer understands its own business, so the RPC framework only needs to offer a set of strategies for the consumer to choose from.

2 How to retry

2.1 Cluster Fault Tolerance Strategy

As an excellent RPC framework, DUBBO provides the following cluster fault-tolerance strategies for consumers to choose from:

Failover: failover (retry on another node)
Failfast: fail fast
Failsafe: fail safe
Failback: asynchronous retry
Forking: parallel calls
Broadcast: broadcast calls

(1) Failover

Failover strategy. This is the default strategy: when a call fails, another producer node is selected through the load-balancing strategy and called again, until the maximum number of retries is reached.

(2) Failfast

Fail-fast strategy. The consumer calls the service only once and throws the exception immediately if one occurs.

(3) Failsafe

Fail-safe strategy. The consumer calls the service only once. If the call fails, an empty result is returned and no exception is thrown.

(4) Failback

Asynchronous retry strategy. When a call fails, an empty result is returned and the failed request is retried asynchronously. If the retries still fail after the maximum number of attempts, the request is abandoned and no exception is thrown.

(5) Forking

Parallel call strategy. The consumer calls multiple producers concurrently through a thread pool, and the call succeeds as soon as one of them succeeds.

(6) Broadcast

Broadcast call strategy. The consumer calls every producer node in turn and throws an exception if any of the calls failed.

2.2 Source Code Analysis

2.2.1 Failover

Failover is the default strategy. When a call fails, another producer node is selected through load balancing and called again, until the maximum number of retries is reached. Even if the business code does not explicitly retry, the consumer logic may run several times and produce duplicate data:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // All producer Invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);

        // Total number of attempts = configured retries + 1 (the initial call)
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;

        //Producer that has been called
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);

        //Retry until the maximum number of times is reached
        for (int i = 0; i < len; i++) {
            if (i > 0) {

                // Throws an exception if the current instance is destroyed
                checkWhetherDestroyed();

                //Select available producer Invokers based on routing strategy
                copyInvokers = list(invocation);

                // check again
                checkInvokers(copyInvokers, invocation);
            }

            // Load balancing selects a producer Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Service consumption initiates remote calls
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                //Return if there is a result
                return result;
            } catch (RpcException e) {
                //Business exceptions are thrown directly
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // Wrap any other exception in an RpcException and continue retrying
                le = new RpcException(e.getMessage(), e);
            } finally {
                //Save the producers that have been visited
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

When a consumer calls producer node A and an RpcException occurs (for example, a timeout), the consumer selects another producer node through load balancing and calls again, as long as the maximum number of retries has not been reached. Now suppose node A actually processed the request successfully but failed to return the result to the consumer in time: the retry then causes duplicate data.
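The scenario can be reproduced with a minimal sketch that does not use DUBBO at all. SketchOrderStore, SketchNode, and invokeWithFailover are hypothetical names introduced only for illustration, and the loop is a simplified stand-in for the failover logic above (it walks the node list in order instead of using load balancing). The retry count of 2 matches DUBBO's default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical order store shared by all producer nodes (stands in for a database).
class SketchOrderStore {
    final List<String> orders = new ArrayList<>();
}

// Hypothetical producer node; loseResponse simulates a timeout after successful processing.
class SketchNode {
    private final SketchOrderStore store;
    private final boolean loseResponse;

    SketchNode(SketchOrderStore store, boolean loseResponse) {
        this.store = store;
        this.loseResponse = loseResponse;
    }

    String createOrder(String purchaseId) {
        store.orders.add("order-for-" + purchaseId);   // the write succeeds
        if (loseResponse) {
            throw new IllegalStateException("timeout: response never reached the consumer");
        }
        return "OK";
    }
}

class FailoverDuplicateDemo {
    // Simplified failover loop: one initial call plus up to "retries" retries on the next nodes.
    static String invokeWithFailover(List<SketchNode> nodes, String purchaseId, int retries) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no producer nodes available");
        }
        RuntimeException last = null;
        for (int i = 0; i <= retries && i < nodes.size(); i++) {
            try {
                return nodes.get(i).createOrder(purchaseId);
            } catch (RuntimeException e) {
                last = e;   // remember the failure and try the next node
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        SketchOrderStore store = new SketchOrderStore();
        List<SketchNode> nodes = Arrays.asList(
                new SketchNode(store, true),    // node A: processes, then "times out"
                new SketchNode(store, false));  // node B: processes and responds
        System.out.println(invokeWithFailover(nodes, "p1", 2));
        System.out.println(store.orders.size() + " orders were created for one purchase");
    }
}
```

The consumer sees a single successful call, yet the store ends up with two orders for the same purchase.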

2.2.2 Failfast

Fail-fast strategy. The consumer calls the service only once. If an exception occurs, it is thrown immediately without retrying:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // Check that the producer Invokers are valid
        checkInvokers(invokers, invocation);

        // Load balancing selects a producer Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // Service consumption initiates remote calls
            return invoker.invoke(invocation);
        } catch (Throwable e) {

            // If service consumption fails, an exception will be thrown directly without retrying.
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                    + " select from all providers " + invokers + " for service " + getInterface().getName()
                                    + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                    + " use dubbo version " + Version.getVersion()
                                    + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                   e.getCause() != null ? e.getCause() : e);
        }
    }
}

2.2.3 Failsafe

Fail-safe strategy. The consumer calls the service only once. If the call fails, an empty result is returned, no exception is thrown, and no retry is performed:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {

            // Check that the producer Invokers are valid
            checkInvokers(invokers, invocation);

            // Load balancing selects a producer Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

            // Service consumption initiates remote calls
            return invoker.invoke(invocation);

        } catch (Throwable e) {
            // Consumption failure is packaged as an empty result object
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}

2.2.4 Failback

Asynchronous retry strategy. When a call fails, an empty result is returned and the failed request is retried asynchronously. If the retries still fail after the maximum number of attempts, the request is abandoned and no exception is thrown:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // Create timer
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        //Construct a scheduled task
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            //Put the scheduled task into the timer and wait for execution
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {

            // Check that the producer Invokers are valid
            checkInvokers(invokers, invocation);

            // Load balancing selects a producer Invoker
            invoker = select(loadbalance, invocation, invokers, null);

            // The consumer service initiates a remote call
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);

            //If service consumption fails, record the failed request
            addFailed(loadbalance, invocation, invokers, invoker);

            //return empty result
            return new RpcResult();
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }

    /**
     *RetryTimerTask
     */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;

        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }

        @Override
        public void run(Timeout timeout) {
            try {
                // Load balancing selects a producer Invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;

                // Service consumption initiates remote calls
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);

                // Once the maximum number of retries is reached, log the failure and give up without throwing
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // Otherwise, put the task back into the timer for another attempt
                    rePut(timeout);
                }
            }
        }

        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }

            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}

2.2.5 Forking

Parallel call strategy. The consumer calls multiple producers concurrently through a thread pool, and the call succeeds as soon as one of them succeeds:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;

            // Get configuration parameters
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // Get the list of Invokers executed in parallel
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // Select producer
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // Prevent repeated addition of Invoker
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {

                //Concurrent execution in thread pool
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //Execute consumption logic
                            Result result = invoker.invoke(invocation);
                            // Store the result in the queue
                            ref.offer(result);
                        } catch (Throwable e) {
                            // If the number of failures reaches the number of selected Invokers, every call has failed, so the exception is put into the queue.
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // Get results from queue
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // A Throwable in the queue means that all calls failed, so throw an exception
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}
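The "first success wins" idea behind Forking can also be expressed with the JDK's ExecutorService.invokeAny, which returns the result of one task that completed successfully. This is a swapped-in technique for illustration, not what DUBBO does (the source above uses a blocking queue and a failure counter), and ForkingSketch and firstSuccess are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ForkingSketch {
    // Submits the calls to a thread pool and returns the result of one that succeeded.
    static String firstSuccess(List<Callable<String>> calls) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return pool.invokeAny(calls);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        } catch (ExecutionException e) {
            // invokeAny throws ExecutionException only when no task succeeded
            throw new IllegalStateException("all forked calls failed", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> calls = new ArrayList<>();
        calls.add(() -> { throw new IllegalStateException("node A unavailable"); });
        calls.add(() -> "result from node B");
        System.out.println(firstSuccess(calls));
    }
}
```

Because more than one producer may execute the same request, this style of call is best reserved for operations that are safe to run several times.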

2.2.6 Broadcast

Broadcast call strategy. The consumer calls every producer node in turn and, once all nodes have been called, throws an exception if any of the calls failed:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        // Traverse and call all producer nodes
        for (Invoker<T> invoker : invokers) {
            try {
                //Execute consumption logic
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // If any call failed, throw the last recorded exception
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}

3 How to achieve idempotence

After the analysis above, we know that the retry mechanism of an RPC framework can cause duplicate data, so idempotence must be considered when using one. Idempotence means that performing an operation multiple times produces the same result as performing it once, so repeated execution causes no inconsistency. Common idempotence solutions include cancelling retries, idempotent tables, state machines, and database locks.

3.1 Cancel retry

There are two ways to cancel retries. The first is to set the number of retries to zero, and the second is to choose a cluster fault-tolerance strategy that does not retry.

<!-- Set the number of retries to zero -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />

<!-- Select a cluster fault tolerance solution -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />

3.2 Idempotent table

Suppose that after a user pays successfully, the payment system sends a payment-success message to a message queue. The logistics system subscribes to this message and creates a logistics order for the paid order.

However, the message queue may deliver the same message more than once, so the logistics system may receive it several times. The effect we want is that no matter how many duplicate messages arrive, only one logistics order is created.

The solution is an idempotent table. Create a new table that exists solely to guarantee idempotence and has no other business meaning. It has a field named key with a unique index, and this field serves as the idempotence key.

After the logistics system receives the message, it first tries to insert a row into the idempotent table, using the order number as the key field. If the insert succeeds, it goes on to create the logistics order. If the order number already exists, the insert violates the unique index and fails, which indicates that the message has already been processed, so it is discarded.
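The insert-first flow can be sketched in memory as follows. A concurrent set stands in for the idempotent table, and Set.add plays the role of the INSERT against the unique index (it returns false when the key already exists). LogisticsConsumerSketch and onPaymentSuccess are hypothetical names, not part of any real system.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class LogisticsConsumerSketch {
    // Stands in for the idempotent table with a unique index on the key column.
    private final Set<String> idempotentKeys = ConcurrentHashMap.newKeySet();

    // Stands in for the logistics order table.
    final List<String> logisticsOrders = new CopyOnWriteArrayList<>();

    // Returns true if a logistics order was created, false if the message was a duplicate.
    boolean onPaymentSuccess(String orderNo) {
        if (!idempotentKeys.add(orderNo)) {
            return false;   // key already present: message was processed before, discard it
        }
        logisticsOrders.add("logistics-" + orderNo);
        return true;
    }

    public static void main(String[] args) {
        LogisticsConsumerSketch consumer = new LogisticsConsumerSketch();
        // The same payment-success message is delivered three times.
        for (int i = 0; i < 3; i++) {
            System.out.println(consumer.onPaymentSuccess("order-1001"));
        }
        System.out.println(consumer.logisticsOrders);
    }
}
```

With a real database, the insert into the idempotent table and the creation of the logistics order would presumably share one transaction, so that a failed creation does not leave a key behind and block later redeliveries.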

This table can grow quite large, so we can archive its data with a scheduled task. For example, keep only the last 7 days of data in the table and move older rows to an archive table.

The idempotent table can also be generalized: Redis can take the place of the database. Before creating a logistics order, we check whether the order number already exists in Redis, and we can set a 7-day expiration time on these keys.
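One detail worth noting is that the existence check and the write should happen as a single atomic step; otherwise two concurrent duplicates can both pass the check. In Redis this is commonly done with SET using the NX and EX options. The sketch below imitates that behavior in memory so that it runs without a Redis client; ExpiringKeyStoreSketch and setIfAbsent are hypothetical names, and the clock is passed in as a parameter to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a key-value store with per-key expiry.
class ExpiringKeyStoreSketch {
    // Maps each key to the time (in milliseconds) at which it expires.
    private final Map<String, Long> expiresAt = new ConcurrentHashMap<>();

    // Atomically claims the key unless an unexpired entry exists; returns true if claimed.
    boolean setIfAbsent(String key, long ttlMillis, long nowMillis) {
        boolean[] claimed = {false};
        // ConcurrentHashMap.compute runs the check and the write as one atomic step per key.
        expiresAt.compute(key, (k, oldExpiry) -> {
            if (oldExpiry == null || oldExpiry <= nowMillis) {
                claimed[0] = true;
                return nowMillis + ttlMillis;
            }
            return oldExpiry;
        });
        return claimed[0];
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        ExpiringKeyStoreSketch store = new ExpiringKeyStoreSketch();
        System.out.println(store.setIfAbsent("order-1001", 7 * day, 0));        // first message
        System.out.println(store.setIfAbsent("order-1001", 7 * day, day));      // duplicate a day later
        System.out.println(store.setIfAbsent("order-1001", 7 * day, 8 * day));  // after the key expired
    }
}
```

The last call shows the trade-off of an expiring key: a duplicate that arrives after the retention window is treated as new, so the window should be longer than the longest plausible redelivery delay.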

3.3 State Machine

After the logistics order is created successfully, a message is sent. When the order system receives this message, it updates the order status, say from status 0 to status 1. The order system may also receive the message more than once, so a logistics-order-created message may still arrive after the status has already been updated to 1.

The solution is a state machine. First draw the state diagram and analyze the allowed state transitions. In this example, the analysis shows that status 1 is already the final state, so if another logistics-order-created message arrives, it is not processed again and is simply discarded.
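A minimal sketch of this guard, assuming only the two statuses from the example: the names CREATED and COMPLETED are hypothetical labels for status 0 and status 1, and OrderSketch is an in-memory stand-in for the order record.

```java
// Hypothetical names for status 0 and status 1 from the example.
enum OrderStatusSketch { CREATED, COMPLETED }

class OrderSketch {
    private OrderStatusSketch status = OrderStatusSketch.CREATED;

    // Applies the 0 -> 1 transition; returns false when the message must be discarded.
    synchronized boolean onLogisticsOrderCreated() {
        if (status != OrderStatusSketch.CREATED) {
            return false;   // already in the final state: duplicate message, ignore it
        }
        status = OrderStatusSketch.COMPLETED;
        return true;
    }

    synchronized OrderStatusSketch status() {
        return status;
    }

    public static void main(String[] args) {
        OrderSketch order = new OrderSketch();
        System.out.println(order.onLogisticsOrderCreated());  // first message: transition applied
        System.out.println(order.onLogisticsOrderCreated());  // duplicate: discarded
        System.out.println(order.status());
    }
}
```

In a database-backed system the same check is often folded into the update statement itself, by adding the expected current status to the where clause so that a duplicate message updates zero rows.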

3.4 Database lock

Database locks can be divided into two types: pessimistic locks and optimistic locks. A pessimistic lock locks the rows when the data is read:

select * from table where col='xxx' for update

An optimistic lock checks for conflicts at update time. The first step is to read the data, which includes a version field. The second step is to perform the update. If the record has been modified in the meantime, the version field has changed and the update matches no rows:

update table set xxx,
version = #{version} + 1
where id = #{id}
and version = #{version}
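The version check can be imitated in memory to show how a stale update is rejected. VersionedRowSketch is a hypothetical stand-in for a table row, and its update method mirrors the conditional update above by returning the number of affected rows (1 or 0).

```java
// Hypothetical in-memory row with a version column.
class VersionedRowSketch {
    private String value;
    private int version = 0;

    VersionedRowSketch(String value) {
        this.value = value;
    }

    synchronized int readVersion() {
        return version;
    }

    synchronized String readValue() {
        return value;
    }

    // Mirrors "update ... set value = ?, version = version + 1 where version = ?".
    // Returns the number of affected rows: 1 on success, 0 if the version has changed.
    synchronized int update(String newValue, int expectedVersion) {
        if (version != expectedVersion) {
            return 0;
        }
        value = newValue;
        version++;
        return 1;
    }

    public static void main(String[] args) {
        VersionedRowSketch row = new VersionedRowSketch("initial");
        int seenByA = row.readVersion();   // two callers read the same version
        int seenByB = row.readVersion();
        System.out.println(row.update("written by A", seenByA));  // first update wins
        System.out.println(row.update("written by B", seenByB));  // stale version: rejected
        System.out.println(row.readValue());
    }
}
```

The caller must check the affected-row count: a result of 0 means another request got there first, and the operation should be treated as a duplicate or re-read and retried deliberately.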

4 Article Summary

This article first analyzed why retries are needed: retrying is an important option when an RPC interaction gets no response. It then analyzed the six cluster fault-tolerance strategies provided by DUBBO. Failover, the default strategy, includes a retry mechanism, so multiple calls may be made even when the business code does not explicitly retry, and this deserves attention. Finally, it covered several commonly used idempotence solutions. We hope this article is helpful.
