Concurrency control of Promise – from ordinary concurrency pool to dynamic concurrency pool

1. Scene

You are given an array of 200 URLs. To send requests through these URLs, the number of concurrent requests cannot exceed five.

This is a very common interview question. Next, let us learn about Promise concurrency control

2. Implementation of ordinary concurrent pool

The main idea is to determine whether the current queue is full, wait if it is full, and fill up if there is room.

Using the Promise.race method, you can determine “who completes first” in a Promise array, so that the waiting function can start running.

/**Promise concurrency pool. When there are a large number of concurrent promises, you can use this to limit the number of concurrencies.
 * @param taskList task list
 * @param max maximum number of concurrencies
 * @param oneFinishCallback Each completed callback, the parameters are the current number of completions and the execution result, which can be used to create a progress bar
 * @retrun returns the results of each promise in the same order as the task list. Currently, both success and failure will be put into this result.
 */
export const promisePool = <T>(taskList: task<T>[], limit: number) => {
    return new Promise<T[]>(async (resolve, reject) => {
        try {
            const length = taskList.length
            /**Current concurrent pool */
            const pool: Promise<T>[] = []
            /**result array*/
            const res = new Array<T>(length)
            /**Quantity completed */
            let count = 0

            for (let i = 0; i < length; i + + ) {
                const task = taskList[i]();
                //Promise end callback
                const handler = (info: T) => {
                    pool.splice(pool.indexOf(task), 1) //Delete after task execution
                    res[i] = info //res.push cannot be used, otherwise the result order cannot be guaranteed
                    count++
                    if (count === length) {
                        resolve(res)
                    }
                }
                task.then((data) => {
                    handler(data)
                    console.log(`The ${i}th task is completed, the result is`, data);
                }, (err) => {
                    handler(err)
                    console.log(`The ${i}th task failed with reason `, err);
                })


                pool.push(task)

                //If the concurrency limit is reached, wait until any one of the pools ends
                if (pool.length >= limit) {
                    await Promise.race(pool)
                }
            }
        } catch (error) {
            console.error('Concurrency pool error', error);
            reject(error)
        }
    })
}

Test case:

        /**Create a Promise that will get the result after 1s */
    const getTask = () => {
        return async () => {
            await new Promise((resolve) => setTimeout(resolve, 1000))
            return new Date()
        }
    }

//Test case:
const testIt = async () => {
    const list = new Array(20).fill(0).map(() => getTask())
    const res = await promisePool(list, 5)
    console.log('res', res);
}
testIt()

Print result: (Observe the console, you can find that five of them appear)

3. Make the concurrency pool interruptible

Okay, now there is a new requirement. After the user clicks the cancel button, you need to interrupt and continue adding tasks to the concurrency pool. (Common scenario: When uploading in parts, the user clicks the Cancel Upload button)

The key core of the problem is how to terminate the internal cycle from the outside. In fact, it is very simple. Set a variable, initially false, and when the user clicks the cancel button, the variable changes to true. Check the value of this variable in the for loop, exit the loop if it is true.

But wecannot set this variable as a global variable! Otherwise, if multiple places need to use this concurrency pool and one is interrupted, all will suffer. Here, we can use object-oriented thinking to treat this variable as the value inside the object, and each instance is independent. “What does it have to do with me if you terminate yours?”

/**Promise concurrent pool - terminateable - creates an instance each time to avoid the cancellation of another pool causing the cancellation of this pool */
export class PromisePoolStatic<T, Err>{
    /**Whether to cancel. If this becomes true during the loop, it will be interrupted */
    private isStop = false
    /**Run a static Promise concurrency pool. When there are a large number of concurrent promises, you can use this to limit the number of concurrencies.
     * @param taskList task list
     * @param max maximum number of concurrencies
     * @retrun returns the results of each promise in the same order as the task list. Currently, both success and failure will be put into this result.
     */
    run = async (taskList: task<T>[], max: number) => {
        return new Promise<Array<T | Err>>(async (resolve, reject) => {
            type resType = T | Err
            try {
                this.isStop = false //Set to false at the beginning
                const length = taskList.length
                const pool: Promise<resType>[] = []//Concurrency pool
                let count = 0//How many have ended currently?
                const res = new Array<resType>(length)
                for (let i = 0; i < length; i + + ) {
                    let task = taskList[i]();
                    if (this.isStop) return reject('Concurrency pool terminated')
                    //Function to be executed on both success and failure
                    const handler = (_res: resType) => {
                        pool.splice(pool.indexOf(task), 1) //Whenever the concurrent pool finishes running a task, delete a task from the concurrent pool
                        res[i] = _res //Put into the result array
                        count++
                        if (count === length) {
                            return resolve(res)
                        }
                    }

                    task.then((data) => {
                        handler(data)
                        console.log(`The ${i}th task is completed, the result is`, data);
                    }, (err) => {
                        handler(err)
                        console.log(`The ${i}th task failed with reason `, err);
                    })

                    pool.push(task);

                    if (pool.length === max) {
                        //Use the Promise.race method to obtain a signal that a task in the concurrency pool is completed. When a task is completed, let the program continue execution and let the loop fill up the concurrency pool.
                        await Promise.race(pool)
                    }
                }

            } catch (error) {
                console.error('promise concurrent pool error', error);
                reject(error)
            }
        })
    }
    /**Stop the concurrent pool operation */
    stop = () => {
        this.isStop = true
    }
}

Test case:

/**Terminable concurrency pool test case */
const promisePoolStaticTest = () => {
    const list = new Array(18).fill(0).map(() => getTask())
    const pool = new PromisePoolStatic()
    pool.run(list, 3).catch((err) => {
        console.log('Terminable concurrent pool test case error -- ', err)
    })
    //18 tasks, each takes 1s to complete, the number of concurrency is 3, it takes 6s to complete in total
    //We interrupt at the third second
    setTimeout(() => pool.stop(), 3000)
}
promisePoolStaticTest()

The result is as follows:

It can be seen that after the ninth task ends, the concurrency pool does not enter a new task. But Why is it terminated and the callback of Promise completion is printed? Because when the termination function is executed, there are still three functions running in the concurrency pool, and the running Promise cannot be terminated, so new tasks can only be prevented from entering the concurrency pool. (Although Promise cannot be terminated, you can terminate the operation after Promise is completed, which will not be explained here)

4. Dynamic concurrent pool

The operations completed previously are all done after the task list has been determined before concurrency control is carried out. If we need the effect of dynamically adding tasks, if the queue is not full, it will run, and if the queue is full, it will wait. What should we do? (Common scenario: Global axios request concurrency control)

Main idea: If the queue is not full, run it directly; if the queue is full, add it to the waiting queue. After the task is completed, check if there is a task in the waiting queue.

type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**Install the task and its resolve and reject functions */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }


/**Dynamic concurrent pool */
export class PromisePoolDynamic<T> {
    /**Maximum number of concurrencies */
    private limit: number;
    /**The number currently running */
    private runningCount: number;
    /**waiting queue*/
    private queue: Array<taskWithCallbacks<T>>;

    /**Dynamic concurrent pool - constructor
     * @param maxConcurrency maximum number of concurrencies
     */
    constructor(maxConcurrency: number) {
        this.limit = maxConcurrency;
        this.runningCount = 0;
        this.queue = [];
    }

    /**Add task
     * @param task task, () => Promise<T>
     * @returns results
     */
    addTask(task: task<T>) {
        //Return a new Promise instance, which will remain in the pending state until the task is completed.
        return new Promise<T>((resolve, reject) => {
            const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;
            if (this.runningCount <this.limit) {//Run if the concurrent number is not full
                console.log('Task added: current concurrency number', this.runningCount, 'Concurrency number is not full, run directly');
                this.runTask(taskWithCallbacks);
            } else {//When the number of concurrency is full, join the waiting queue
                console.log('Task added: current concurrency number', this.runningCount, 'Concurrency number is full, suspended and waiting');
                this.queue.push(taskWithCallbacks);
            }
        });
    }
    /**Run the task
     * @param taskWithCallback Task with resolve and reject
     */
    private runTask(taskWithCallback: taskWithCallbacks<T>) {
        this.runningCount + + ;//Current number of concurrency + +
        taskWithCallback.task()//Remove task execution from object
            .then(result => {
                this.runningCount--;
                taskWithCallback.resolve(result);
                console.log('Task completed', result, 'Current concurrency count', this.runningCount);
                this.checkQueue();
            })
            .catch(error => {
                this.runningCount--;
                taskWithCallback.reject(error);
                this.checkQueue();
            });
    }
    /**After the operation is completed, check the queue to see if there is anything waiting, and if so, take out the first one and run it */
    private checkQueue() {
        if (this.queue.length > 0 & amp; & amp; this.runningCount <this.limit) {
            const nextTask = this.queue.shift()!;
            console.log('There is a vacancy in the concurrency pool, take out the tasks waiting in the queue', nextTask);
            this.runTask(nextTask);
        }
    }
}

Test case:

/**Test cases for dynamic concurrency pool */
const promisePoolDynamicTest = () => {
    const promisePoolDynamic = new PromisePoolDynamic(3) //A dynamic concurrency pool with a maximum concurrency of 3
    //Maximum concurrency is 3, I added 7 tasks at one time
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()

Test Results:

5. Conclusion

That’s it for the concurrency pool. In addition to using Promise.race, you can also use recursion and other methods, but Promise.race is the simplest and easiest to understand.

If there is something wrong in the code, please point it out.