Implement multipart upload, resumable upload, and instant upload (JS + NodeJS) (TypeScript)

1. Introduction and demo

Uploading files is a very common operation, but when a file is large the upload takes a long time and its outcome is uncertain: if the connection drops unexpectedly, the whole file has to be uploaded again, wasting time and network resources.

Therefore, for large files, multipart upload and resumable upload are necessary. Split the file into multiple slices, upload them one by one, and merge them on the server at the end. If the network is interrupted, the slices that were already uploaded do not need to be sent again, which is exactly resumable upload.

Demo: photo.lhbbb.top/video/slice.mp4 (the slice size is set to 1*1024B to make the process easy to observe)

2. Idea

1. Unique file identification

Because we need to upload multiple slices, resume interrupted uploads, and so on, we need a unique identifier for the file, so that the backend knows which file a slice belongs to, can resume an upload from a breakpoint, and can determine whether the file has been uploaded before.

Method 1: Compute the MD5 of the file and use it as the unique identifier. (If the file is modified, or for two different files, the MD5 will differ.)

Advantages: uniqueness of the identifier is guaranteed, and there is a ready-made library for computing MD5.

Disadvantages: computing the MD5 of a file is very time-consuming; the larger the file, the longer it takes. JS is single-threaded, so nothing else can run on the main thread while the MD5 is being computed.

Method 2: Since MD5 is too slow for large files, the next best thing is to use file name + last modification time + file size as the unique identifier.

Advantages: fast to compute, because all of this data is already available on the File object.

Disadvantages: there is a very small probability that the identifier collides.

Combined method: if the file size is below a certain threshold, use method 1; when the file is too large, fall back to method 2, as sketched below.
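
A minimal sketch of this combined strategy (getFileId and the 100MB threshold are illustrative assumptions; getMd5 is the function implemented in the front-end section below):

/**A minimal sketch of the combined strategy. getFileId and the 100MB
 * threshold are illustrative assumptions; getMd5 is implemented below. */
const getFileId = async (file: File) => {
  const THRESHOLD = 100 * 1024 * 1024 //assumed threshold: 100MB
  if (file.size <= THRESHOLD) {
    return getMd5(file) //method 1: accurate, but slow for big files
  }
  //method 2: fast, built from fields already on the File object
  return `${file.name}-${file.lastModified}-${file.size}`
}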

2. Front end

1. Calculate the unique identifier: after getting the File object from an input element, we can compute its unique identifier.

2. Instant upload: after computing the unique identifier, send it to a backend interface to check whether the file has been uploaded before. If it has, display "Upload successful" immediately; this is instant upload.

3. Resumable upload: if the file was never fully uploaded before, slice it. With the slice information and the unique identifier, ask the backend whether the file is "partially uploaded", and if so, resume from the breakpoint. (My approach is to send the total number of slices to the backend; the backend checks how many slices exist in its folder and returns the sequence numbers of the missing ones. You could also compute a unique identifier per slice, which is safer but more time-consuming.)

4. Multipart upload: if the file has never been uploaded before, start the normal multipart upload flow. Traverse all slices and send each one to the backend. (Note: browsers generally allow about 6 concurrent requests per origin; if all slice requests are fired at once, the later ones have to wait. We can use a Promise concurrency pool to control concurrency and ease the HTTP pressure.)

5. Integrity check: after all slices are uploaded, run an integrity check to catch any slice that failed and was never stored. Send a request to the backend; the backend checks whether the number of slices is correct and, if not, the missing slices are re-uploaded.

6. Send the merge request: once the integrity check passes, send a merge request; the backend merges the slices and returns the file's access path.

Summary: compute the unique identifier -> already uploaded? instant upload -> slice the file -> partially uploaded? resume from the breakpoint -> normal multipart upload -> integrity check -> merge request

3. Backend

1. Determine whether the file has been fully uploaded: based on the file's MD5, check whether the file exists in the corresponding directory.

2. Determine whether the file has been partially uploaded: based on the file's MD5, check which slices are missing from the corresponding directory.

3. Receive the uploaded slices: store the slices in a temporary folder named after the unique identifier, so they are easy to find later. Name each slice after its sequence number, so missing slices are easy to detect.

4. Integrity check: based on the MD5 and the total number of slices, check which sequence numbers are missing from the corresponding temporary folder, and return the array of missing numbers.

5. Merge the files: write the slices into a new file in sequence-number order, then return the access path of the new file.

3. Front-end implementation

A note up front: instant upload and resumable upload really come down to the order in which the functions below are executed.

Some of the ts types used are as follows:

/**Type of the progress callback. The parameter is the progress as a decimal between 0 and 1; multiply by 100 for a percentage progress bar */
type onProgress = (progress: number) => any

/**Type of a slice */
interface sliceType {
  /**Sequence number of the slice */
  flag: number;
  /**Binary data of the slice */
  blob: Blob;
}

1. Calculate the unique identifier of the file

Here a library is used to compute the MD5; install it via npm: npm i spark-md5

import SparkMD5 from 'spark-md5'

/**Get the md5 of the file and use it as the unique identifier - when the file is too large this takes a long time; consider using file name + last modification time + file size as a "unique" identifier instead */
const getMd5 = (file: File) => {
  return new Promise<string>((resolve, reject) => {
    try {
      const reader = new FileReader();
      reader.readAsArrayBuffer(file);
      //When the file has been read, compute its MD5
      reader.onloadend = function (e) {
        if (!e.target?.result) {
          return reject('File reading failed')
        }
        const spark = new SparkMD5.ArrayBuffer()
        spark.append(e.target.result as ArrayBuffer)
        const md5 = spark.end()
        resolve(md5)
      }
    } catch (error) {
      reject(error)
    }
  })
}
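
If MD5 is still wanted for larger files, spark-md5 can also hash incrementally, so the file is read chunk by chunk instead of all at once. A hedged sketch (getMd5Incremental and the 2MB read size are assumptions, not part of the original code):

/**Incremental variant (a sketch; the name and the 2MB read size are assumptions):
 * feed the file to SparkMD5 chunk by chunk so it is never fully in memory */
const getMd5Incremental = (file: File, readSize = 2 * 1024 * 1024) => {
  return new Promise<string>((resolve, reject) => {
    const spark = new SparkMD5.ArrayBuffer()
    const reader = new FileReader()
    let offset = 0
    const readNext = () => reader.readAsArrayBuffer(file.slice(offset, offset + readSize))
    reader.onload = (e) => {
      spark.append(e.target!.result as ArrayBuffer) //append this chunk to the hash
      offset += readSize
      if (offset < file.size) readNext()
      else resolve(spark.end()) //all chunks read - finalize the md5
    }
    reader.onerror = () => reject('File reading failed')
    readNext()
  })
}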

2. Fragment the file

The File object's prototype (Blob) has a slice method, which is what we use to cut the file.

Pass in the File object and the desired size of a single slice in bytes (for example, 5*1024*1024 means 5MB).

You get back an array of slices; each element contains the slice's sequence number and the corresponding Blob.

/**File slicing*/
const fileSlice = (file: File, chunkSize: number) => {
  const result: sliceType[] = []
  let index = 0
  for (let nowSize = 0; nowSize < file.size; nowSize += chunkSize) {
    result.push({
      flag: index,
      blob: file.slice(nowSize, nowSize + chunkSize),
    })
    index++
  }
  return result
}

3. Upload the parts

/**Generate promise function for uploading slices */
const getSliceUploadPromise = (slice: sliceType, md5: string) => {
  const formData = new FormData();
  formData.append(`file`, slice.blob);
  formData.append(`index`, String(slice.flag));
  formData.append(`md5`, md5);
  return () => request.postByForm('/sliceUpload/upload', formData) //Fill in your own encapsulated request function here
}

/**Upload all slices */
const sliceUpload = async (sliceList: sliceType[], md5: string, onProgress: onProgress) => {
  const taskList: (() => Promise<string>)[] = []
  const length = sliceList.length
  for (let i = 0; i < length; i++) {
    taskList.push(getSliceUploadPromise(sliceList[i], md5))
  }
  //Use concurrent pool optimization to avoid blocking
  const res = await promisePool<string, string>(taskList, 5, (count) => onProgress(count / length))
  return res
}

To avoid request congestion, a Promise concurrency pool is used for optimization, with a maximum concurrency of 5 (instead of firing all requests at once).

/**Promise concurrency pool. When there is a large number of promises to run, use this to limit how many run at once.
 * @param taskList task list
 * @param max maximum concurrency
 * @param oneFinishCallback called after each task settles, with the current finished count and that task's result - useful for a progress bar
 * @returns the result of each promise, in the same order as the task list. Both successes and failures are put into this result.
 * @template T filled in automatically; the result type of a successful promise
 * @template Err the type of a rejected promise's reason (since both successes and failures end up in res, this generic makes TS narrowing easier)
 */
const promisePool = async <T, Err>(taskList: (() => Promise<T>)[], max: number, oneFinishCallback?: (count: number, res: T | Err) => any) => {
    return new Promise<Array<T | Err>>(async (resolve, reject) => {
        type resType = T | Err
        try {
            const length = taskList.length
            const pool: Promise<void>[] = [] //concurrency pool; holds the settled wrappers defined below
            let count = 0 //how many tasks have finished so far
            const res = new Array<resType>(length)
            for (let i = 0; i < length; i++) {
                const task = taskList[i]();

                //Runs on both success and failure (only called after wrapped is assigned)
                const handler = (_res: resType) => {
                    pool.splice(pool.indexOf(wrapped), 1) //whenever a task finishes, remove it from the pool
                    res[i] = _res //put the result into the result array
                    count++
                    oneFinishCallback?.(count, _res)
                    if (count === length) {
                        return resolve(res)
                    }
                }

                //Push this settled wrapper into the pool rather than the raw task:
                //if a raw task rejected inside Promise.race, the whole pool would abort
                const wrapped = task.then((data) => {
                    handler(data)
                    console.log(`Task ${i} completed, result:`, data);
                }, (err) => {
                    handler(err)
                    console.log(`Task ${i} failed, reason:`, err);
                })

                pool.push(wrapped);

                if (pool.length === max) {
                    //Promise.race settles as soon as the fastest wrapper does;
                    //with await, once one task finishes, the for loop continues and refills the pool
                    await Promise.race(pool)
                }
            }

        } catch (error) {
            console.error('promise concurrency pool error', error);
            reject(error)
        }
    })

}
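
A minimal standalone usage example (fakeTask is an illustrative stand-in for a real request, not part of the original code):

//Minimal usage example - fakeTask is an illustrative stand-in for a real request
const fakeTask = (ms: number) => () => new Promise<number>((res) => setTimeout(() => res(ms), ms))
const tasks = [fakeTask(300), fakeTask(100), fakeTask(200)]
promisePool<number, Error>(tasks, 2, (count) => console.log(`${count}/${tasks.length} done`))
    .then((results) => console.log(results)) //[300, 100, 200] - same order as the task list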

4. Integrity check (which also covers resumable upload)

After the slices are uploaded, an integrity check is performed to confirm that no slice failed along the way.

getArr function: sends a request to the backend, which returns the array of missing slice sequence numbers (an empty array means the slices are complete).
Integrity check: re-upload the missing slices according to the returned sequence numbers (this is also what makes resumable upload work). The number of rounds is limited: if the upload is still incomplete after 5 rounds, it is treated as failed.

/**Request the backend for the array of missing slice sequence numbers */
const getArr = async (count: number, md5: string) => (await request.post('/sliceUpload/integrityCheck', { count, md5 })).missingArr


/**Integrity check; upload whatever is missing (this is also resumable upload) */
const integrityCheck = async (sliceList: sliceType[], md5: string, onProgress: onProgress) => {
  let maxTest = 5 //maximum number of rounds, to avoid retrying forever

  /**Array of missing sequence numbers */
  let missingArr: number[] = await getArr(sliceList.length, md5)
  /**Total number of slices */
  const sliceListLength = sliceList.length

  onProgress((sliceListLength - missingArr.length) / sliceListLength) //update the progress bar

  while (missingArr.length) {

    const tasks: (() => Promise<string>)[] = []
    for (let i = 0; i < missingArr.length; i++) {
      tasks.push(getSliceUploadPromise(sliceList[missingArr[i]], md5))
    }
    //Use the concurrency pool to optimize the requests
    await promisePool<string, string>(tasks, 5, (count) => onProgress((sliceListLength - (missingArr.length - count)) / sliceListLength)) //count here is how many of the missing slices have finished this round, used for the progress bar

    missingArr = await getArr(sliceList.length, md5) //after this round of uploads, run the integrity check again
    maxTest--
    if (maxTest === 0 && missingArr.length) {
      return Promise.reject('Upload still incomplete after five attempts')
    }
  }
}

5. Send merge request

After the integrity check passes, we come to the last step: merging the slices (this is mainly backend work, so there is not much to explain on the front end).

/**Get the file suffix, e.g. ".mp4" (a minimal helper - getSuffix is not defined elsewhere in this article, so it is sketched here) */
const getSuffix = (name: string) => name.slice(name.lastIndexOf('.'))

/**Merge the slices and get the path */
const merge = async (file: File, md5: string) => {
  const suffix = getSuffix(file.name) //get the suffix
  const path = await request.post('/sliceUpload/merge', { md5, suffix })
  return path
}

6. Instant upload

The instant-upload check sits between step 1 and step 2: once the unique identifier is known, we can test for instant upload. (This is mainly backend work, so there is not much to explain on the front end.)

/**Instant upload - determine whether the file has already been uploaded. If so, the backend returns the file path directly and no slices are uploaded at all */
const isUploaded = async (file: File, md5: string) => {
  const suffix = getSuffix(file.name) //reuse the suffix helper from the merge step
  const res: isUploadedRes = await request.post('/sliceUpload/isUploaded', { md5, suffix })
  return res
}

// Note: Regarding the return value of the /sliceUpload/isUploaded interface:

type isUploadedRes = {
  /**Whether the file has been fully uploaded */
  flag: boolean
  /**If it has been uploaded, its path (empty if not) */
  path: string
  /**(only meaningful when flag is false) Partially uploaded? If true, resume the upload from the breakpoint (i.e. call the integrity check interface) */
  noComplete: boolean
}

7. Complete process

Read this together with the functions above. The overall flow needs adjusting to whatever your backend interfaces return, but the overall idea stays the same.

/**Multipart upload - only supports a single file
 * @param file the file
 * @param chunkSize the size of one slice
 * @param setTip can be used for text prompts
 * @param onProgress callback for the upload progress; the parameter is the progress
 * @returns URL of the uploaded file
 */
export async function uploadBySlice(file: File, chunkSize: number, setTip: (tip: string) => any, onProgress: onProgress) {
  setTip("Calculating md5");
  const md5 = await getMd5(file)
  const isUploadedFlag = await isUploaded(file, md5)
  if (isUploadedFlag.flag) { //if it has already been uploaded, return the path directly - instant upload
    onProgress(1)
    setTip('File uploaded successfully')
    return isUploadedFlag.path
  } else {
    setTip('Slicing in progress')
    const sliceList = fileSlice(file, chunkSize)
    console.log('slice', sliceList);

    if (!isUploadedFlag.noComplete) { //only files with no partial upload go through the full upload pass; partially uploaded files skip straight to the integrity check below
      setTip('File upload in progress')
      await sliceUpload(sliceList, md5, onProgress)
    } else {
      setTip('Resuming upload in progress')
    }
    await integrityCheck(sliceList, md5, onProgress) //shared by both paths: a normal upload still has its integrity verified, and a resumed upload continues uploading based on the integrity result
    setTip("Merge files")
    const path = await merge(file, md5)
    setTip("File uploaded successfully!")
    return path
  }
}
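
For completeness, a hedged sketch of calling uploadBySlice from a plain file input (the element selector and the 5MB chunk size are assumptions):

//Hedged sketch of wiring uploadBySlice to a plain file input.
//The #fileInput selector and the 5MB chunk size are assumptions.
const input = document.querySelector<HTMLInputElement>('#fileInput')!
input.addEventListener('change', async () => {
  const file = input.files?.[0]
  if (!file) return
  try {
    const url = await uploadBySlice(
      file,
      5 * 1024 * 1024, //5MB per slice
      (tip) => console.log(tip),
      (progress) => console.log(`${Math.round(progress * 100)}%`),
    )
    console.log('File URL:', url)
  } catch (err) {
    console.error('Upload failed', err)
  }
})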

4. Backend implementation

NextJS (NodeJS) is used as the backend here. Since every framework is used differently, only the key steps are shown.

Some of the Node.js file-operation helpers used in the code below:

//This file performs some I/O operations
import fs from 'fs'
import path from 'path'
import 'server-only' //marks this module as server-only

/**Write a Buffer to the specified path. Missing directories are created automatically */
export const writeFile = (filePath: string, buffer: Buffer) => {
    return new Promise<string>(async (resolve, reject) => {
        try {
            const directory = path.dirname(filePath);
            fs.mkdir(directory, { recursive: true }, (err) => {
                if (err) {
                    reject(err);
                } else {
                    fs.writeFile(filePath, buffer, (err) => {
                        if (err) {
                            reject(err);
                        } else {
                            resolve(filePath);
                        }
                    });
                }
            });
        } catch (error) {
            reject(error)
        }
    })
}
/**Delete the file at the specified path */
export const deleteFile = (path: string) => {
    return new Promise<void>(async (resolve, reject) => {
        try {
            fs.unlink(path, (err) => {
                if (err) reject(err)
                else resolve()
            })
        } catch (error) {
            reject(error)
        }
    })

}
/**Delete the specified folder and all its files. */
export const deleteFolderRecursive = (folderPath: string) => {
    if (fs.existsSync(folderPath)) {
        fs.readdirSync(folderPath).forEach((file) => {
            const currentPath = `${folderPath}/${file}`;

            if (fs.lstatSync(currentPath).isDirectory()) {
                //Recursively delete subfolders
                deleteFolderRecursive(currentPath);
            } else {
                // Delete Files
                fs.unlinkSync(currentPath);
            }
        });
        // Delete empty folders
        fs.rmdirSync(folderPath);
    }
};
/**Append to the end of the file; missing directories are created automatically */
export const appendToFile = (text: string | Buffer, filePath: string, errFn?: (err: NodeJS.ErrnoException | null) => void) => {
    return new Promise<void>(async (resolve, reject) => {
        try {
            const directory = path.dirname(filePath);
            fs.mkdir(directory, { recursive: true }, (err) => {
                if (err) {
                    reject(err);
                    return;
                }
                fs.appendFile(filePath, text, (err) => {
                    if (err) {
                        errFn?.(err);
                        reject(err);
                    } else {
                        resolve();
                    }
                });
            });
        } catch (error) {
            reject(error)
        }
    })

}
/**Get all files in the path folder */
export const getDir = (directoryPath: string) => {
    return new Promise<fs.Dirent[]>(async (resolve, reject) => {
        try {
            fs.readdir(directoryPath, { withFileTypes: true }, (err, files) => {
                if (err) {
                    reject(err);
                    return;
                }
                resolve(files)
            })
        } catch (error) {
            reject(error)
        }
    })
}
/** Determine whether the file (folder) exists. */
export const fileExists = (filePath: string): boolean => {
    return fs.existsSync(filePath);
};

1. Determine whether the file has been uploaded before

/sliceUpload/isUploaded interface

const { md5, suffix } = await xxxxx() //Get body or query parameters and perform parameter verification
 
const realPath = `https://xxx.com/xxxxxx/${md5}${suffix}` //Path for external access
const targetFilePath = `/aaaa/${md5}${suffix}`; //This is the path where the file is stored (if it has been completely uploaded)
/**Whether it has been completely uploaded*/
const flag = fileExists(targetFilePath)
/**Does a folder named after the md5 exist under the temp folder? If so, the upload can be resumed from a breakpoint */
const noComplete = fileExists(getAbsPath(`/temp/${md5}`))
return resFn.success<isUploadedRes>({
    flag: flag,
    path: flag ? realPath : "",
    noComplete
});

2. Receive files uploaded in parts

/sliceUpload/upload interface

const [files, otherData] = await getFormData(request) //get the files and other fields from the FormData (encapsulate this according to your framework)
if (!files[0]) throw 'File does not exist'
await writeFile(`/temp/${otherData.md5}/${otherData.index}`, files[0]) //Stored in the /temp/{md5}/ directory
return resFn.success('Operation successful');
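
For context, a hedged sketch of what this interface could look like as a Next.js App Router route handler; getFormData is replaced by the standard request.formData(), and the route path and import path are assumptions:

//A sketch of this interface as a Next.js App Router route handler
//(app/sliceUpload/upload/route.ts - the path is an assumption)
import { writeFile } from '@/utils/file' //the writeFile helper from above; adjust the import path

export async function POST(request: Request) {
    const formData = await request.formData()
    const blob = formData.get('file') as Blob | null
    const md5 = formData.get('md5') as string
    const index = formData.get('index') as string
    if (!blob) return Response.json({ error: 'File does not exist' }, { status: 400 })
    const buffer = Buffer.from(await blob.arrayBuffer()) //turn the slice into a Buffer
    await writeFile(`/temp/${md5}/${index}`, buffer) //stored in the /temp/{md5}/ directory
    return Response.json({ message: 'Operation successful' })
}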

3. Integrity verification interface

/sliceUpload/integrityCheck

const { count, md5 } = await xxxx(request) //Get the data passed by the front end
const files = await getDir(`/temp/${md5}/`) //Get all files in this temporary folder
const judgeSet = new Set(Array.from({ length: count }, (_, i) => i)) //a set of all expected sequence numbers (if the total is 10, the set holds 0,1,2,...,9)

//Remove from the set every sequence number that exists in the folder
files.forEach((file) => {
    judgeSet.delete(parseInt(file.name))
})

//Return the missing sequence numbers; an empty array means the integrity check passed
return resFn.success({
    missingArr: [...judgeSet]
});

4. Merge file interface

/sliceUpload/merge

const { md5, suffix } = await xxx() //Get the parameters passed from the front end

const tempDirName = `/temp/${md5}/` //The path of the temporary folder


const files = await getDir(tempDirName)//Get all files in the temporary folder

files.sort((a, b) => parseInt(a.name) - parseInt(b.name)); // Sort the file name array in numerical order
 

//Append the slices to the new file in slice order
for (let i = 0; i < files.length; i++) {
    const file = files[i];
    const content = fs.readFileSync(`/temp/${md5}/${file.name}`); //location of the slice
    await appendToFile(content, `/xxxx/${md5}${suffix}`) //the path where the merged file is stored on the server
}

deleteFolderRecursive(tempDirName) //delete the temporary slice folder


const realPath = `http://xxxx.com/xxxxx/${md5}${suffix}`//Path for external access

return resFn.success(realPath);
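
If the merged file is very large, reading each slice with readFileSync and appending works, but streaming uses less memory. A hedged sketch of an alternative merge using Node's fs streams (mergeWithStream and its parameters are illustrative, not part of the interfaces above):

//Alternative: merge the slices through a single write stream instead of
//appendToFile in a loop. A sketch - mergeWithStream and its parameters are
//illustrative, not part of the interfaces above.
import { createReadStream, createWriteStream } from 'fs'

const mergeWithStream = (sortedNames: string[], tempDir: string, targetPath: string) => {
    return new Promise<void>((resolve, reject) => {
        const out = createWriteStream(targetPath)
        out.on('error', reject)
        const writeNext = (i: number) => {
            if (i === sortedNames.length) return out.end(() => resolve())
            const rs = createReadStream(`${tempDir}/${sortedNames[i]}`)
            rs.on('error', reject)
            rs.pipe(out, { end: false }) //keep the target stream open between slices
            rs.on('end', () => writeNext(i + 1))
        }
        writeNext(0)
    })
}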

5. Summary

The main thing is to have a clear idea; once the idea is clear, the code is easy to write. The ideas above are my own, so there may be imperfections; if you find any mistakes, please point them out.