C++ multi-threading to achieve decoding and encoding synchronization

The first format conversion process was to input and decode a frame, then encode the frame and then output it. It was found that the input decoding and encoding output parts do not interfere with each other, so we can design two threads to implement decoding and encoding. Improve efficiency at the same time.

Principle

The principle I think is very simple:

①Decoding: Continuously decode and put into the queue. End – If the queue is not empty, the loop waits for encoding to complete and then sets the global end flag.

②Encoding: The circular queue is either empty or encoded. End – end with global end flag

(It is found here that decoding is much faster than encoding. If the decoding thread ends early, the content in the queue will be set to 0, so the decoding and encoding threads end together)

By maintaining a queue and global variables in the middle for communication, multi-threading can be implemented very simply.

C++ Multithreading

Not many things are needed to do simple multithreading

1. Create a thread

Just wrap it in a function. The first parameter is the function name, and the following parameters are the parameters you want to pass into this function. Then you can implement your own decoding thread in the decodeThread1 function.

#include <thread>
std::thread decodeThr(decodeThread1, inputFile);

2. Mutex

For variables that both threads need to access, problems will occur if they happen to be accessed at the same time, so use a mutex to lock them before accessing them.

#include <mutex>
std::mutex mtx;

When accessing a variable that may be accessed by both threads, lock the mutex and then access it.

mtx.lock();
//Variables to be accessed
mtx.unlock();

3. Communication between threads

There are many ways to transfer information between threads. We can use the simplest global variables to achieve communication.

4. How the thread ends

①Join method, when the main thread encounters this statement, it will stop and wait for the thread to end.

decodeThr.join();

②Detach mode, the main thread continues to run regardless of it. If the main thread ends, it will ignore it and end the entire program.

decodeThr.detach();

5.Conditional variables

#include <condition_variable>
std::condition_variable cv; 

A thing that implements thread waiting. For example, the encoding thread will not start until the decoding thread has finished decoding a frame. It will have to wait for a while in the previous period. I tried it and found that using the waiting mechanism in my simple example was a bit redundant, so I didn’t add it later.

Overall code

/*
* @Time: 2023.11.1
* @Author: Wu Liu
* @File: T_format9 + thread.cpp
* @Function: format conversion
* @Multi-threaded test
*/
extern "C" {
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libswscale/swscale.h>
}
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include "SCOPEG.h"
#include <queue>
#include <time.h>
/* @T_format6.cpp
*Basic process:
* 1. Parse the input file, obtain the stream information, and determine the audio and video decoder parameters and context.
* 2. Configure audio and video encoder parameters according to output requirements
* 3. Loop each frame to decode, re-encode and output
* 4. Memory cleaning
*/
/*Multithreading:
* 1. A thread for reading and decoding audio and video packets: decoding in a loop until a complete frame is transferred to the queue
* 2. Audio and video encoding and output in one thread: take out a frame from the queue to determine whether it is audio or video, and then encode it and send it out.
* 3. Lock the frame data queue during decoding and unlock it after decoding the complete frame.
* 4. When encoding obtains frame data, lock the queue and complete the release.
* 5. Loop through all data and then proceed to the main process.
*/
std::mutex mtx; // Mutex lock, ensuring mutual exclusion of thread access
std::condition_variable cv; // Condition variable, used for communication between threads
int ret = 0;
queue<AVFrame*> FrameQueue;
bool isFramedecode = false;
bool End_all=false;

void decodeThread1(const std::string & amp; inputFile) {
    AVFormatContext* inputFormatContext = nullptr;
    AVCodecContext* videoCodecContext = nullptr;
    AVCodecContext* audioCodecContext = nullptr;
    AVStream* videoStream = nullptr;
    AVStream* audioStream = nullptr;
    // allocate frame object
    AVFrame* videoFrame = av_frame_alloc();
    AVFrame* audioFrame = av_frame_alloc();
    AVPacket* inputPacket = av_packet_alloc();
    ON_SCOPE_EXIT{ av_frame_free( & amp;videoFrame); };
    ON_SCOPE_EXIT{ av_frame_free( & amp;audioFrame); };
    ON_SCOPE_EXIT{ av_packet_free( & amp;inputPacket); };
    if (!videoFrame || !audioFrame || !inputPacket) {
        std::cout << "Failed to allocate frame object" << std::endl;
        return ;
    }
    //Open input file
    if (avformat_open_input( & amp;inputFormatContext, inputFile.c_str(), nullptr, nullptr) != 0) {
        std::cout << "Unable to open input file" << std::endl;
        return ;
    }
    ON_SCOPE_EXIT{ avformat_close_input( & amp;inputFormatContext); };
    // Get stream information
    if (avformat_find_stream_info(inputFormatContext, nullptr) < 0) {
        std::cout << "Unable to obtain input file stream information" << std::endl;
        return ;
    }

    //Find video stream and audio stream index
    int videoStreamIndex = -1;
    int audioStreamIndex = -1;
    for (int i = 0; i < inputFormatContext->nb_streams; i + + ) {
        if (inputFormatContext->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
            videoStreamIndex = i;
        }
        else if (inputFormatContext->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            audioStreamIndex = i;
        }
    }

    if (videoStreamIndex == -1 || audioStreamIndex == -1) {
        std::cout << "Video stream not found" << std::endl;
        return ;
    }

    // Get video and audio streams
    videoStream = inputFormatContext->streams[videoStreamIndex];
    audioStream = inputFormatContext->streams[audioStreamIndex];

    // Get the video decoder
    const AVCodec* videoCodec = avcodec_find_decoder(videoStream->codecpar->codec_id);
    if (!videoCodec) {
        std::cout << "Video decoder not found" << std::endl;
        return ;
    }

    //Create and open the video decoder context
    videoCodecContext = avcodec_alloc_context3(videoCodec);
    if (!videoCodecContext) {
        std::cout << "Failed to create video decoder context" << std::endl;
        return ;
    }
    ON_SCOPE_EXIT{ avcodec_free_context( & amp;videoCodecContext); };
    //Video stream parameters to fill in the context context
    avcodec_parameters_to_context(videoCodecContext, videoStream->codecpar);
    if (avcodec_open2(videoCodecContext, videoCodec, nullptr) < 0) {
        std::cout << "Failed to open video decoder" << std::endl;
        return ;
    }

    // Get the audio encoder
    const AVCodec* audioCodec = avcodec_find_decoder(audioStream->codecpar->codec_id);
    if (!audioCodec) {
        std::cout << "Failed to obtain audio encoder" << std::endl;
        return ;
    }
    //Create and open the audio decoder context
    audioCodecContext = avcodec_alloc_context3(audioCodec);
    if (!audioCodecContext) {
        std::cout << "Failed to create audio encoder context" << std::endl;
        return ;
    }
    ON_SCOPE_EXIT{ avcodec_free_context( & amp;audioCodecContext); };

    //Audio stream parameters fill context
    avcodec_parameters_to_context(audioCodecContext, audioStream->codecpar);
    if (avcodec_open2(audioCodecContext, audioCodec, nullptr) < 0) {
        std::cout << "Failed to open audio encoder" << std::endl;
        return ;
    }
    //Print input information
    av_dump_format(inputFormatContext, 0, inputFile.c_str(), 0);
    
    //decoding
    while (av_read_frame(inputFormatContext, inputPacket) >= 0) {
        if (inputPacket->stream_index == videoStreamIndex) {
            ret = avcodec_send_packet(videoCodecContext, inputPacket);
            if (ret < 0) {
                break;
            }
            while (ret >= 0) {
                ret = avcodec_receive_frame(videoCodecContext, videoFrame);
                if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
                    break;
                }
                else if (ret < 0) {
                    std::cout << "Video decoding ret exception" << std::endl;
                    return;
                }
                //Transmit the frame to the queue and create a new AVFrame variable
                videoFrame->quality = 1;
                AVFrame* videoFrame_ = av_frame_clone(videoFrame);
                mtx.lock();
                FrameQueue.push(videoFrame_);
                mtx.unlock();
                break;
            }
            av_packet_unref(inputPacket);
        }
        else if (inputPacket->stream_index == audioStreamIndex) {
            //Audio stream processing
            ret = avcodec_send_packet(audioCodecContext, inputPacket);
            if (ret < 0) {
                break;
            }
            while (ret >= 0) {
                ret = avcodec_receive_frame(audioCodecContext, audioFrame);
                if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
                    break;
                }
                else if (ret < 0) {
                    std::cout << "Audio decoding ret exception" << std::endl;
                    return ;
                }
                //Transmit the frame to the queue and create a new AVFrame variable
                AVFrame* audioFrame_ = av_frame_clone(audioFrame);
                mtx.lock();
                FrameQueue.push(audioFrame_);
                mtx.unlock();
                break;
            }
            av_packet_unref(inputPacket);
        }

    }

    bool FrameQueue_is_empty = false;
    while (!FrameQueue_is_empty)
    {
        mtx.lock();
        if (FrameQueue.empty())
            FrameQueue_is_empty = true;
        mtx.unlock();
    }

    mtx.lock();
    End_all = true;
    mtx.unlock();

}
void encodeThread1(const std::string & amp; outputFileName, const std::string & amp; Format, AVStream* audioStream) {
    AVFormatContext* outputFormatContext = nullptr;
    SwsContext* swsContext = nullptr;
    AVCodecID videoCodecId;
    AVCodecID audioCodecId;
    AVPacket* videoOutputPacket = av_packet_alloc();
    AVPacket* audioOutputPacket = av_packet_alloc();
    ON_SCOPE_EXIT{ av_packet_free( & amp;videoOutputPacket); };
    ON_SCOPE_EXIT{ av_packet_free( & amp;audioOutputPacket); };
    if ( !videoOutputPacket || !audioOutputPacket) {
        std::cout << "Failed to allocate frame object" << std::endl;
        return ;
    }
    { // Codec control
        if (Format == "avi")
        {
            videoCodecId = AV_CODEC_ID_MPEG2VIDEO;
            audioCodecId = AV_CODEC_ID_PCM_S16LE;
        }
        else if (Format == "mp4")
        {
            videoCodecId = AV_CODEC_ID_H264;
            audioCodecId = AV_CODEC_ID_AAC;
        }
        else if (Format == "wmv")
        {
            videoCodecId = AV_CODEC_ID_MSMPEG4V3;
            audioCodecId = AV_CODEC_ID_WMAV2;
        }
        else if (Format == "mkv")
        {
            videoCodecId = AV_CODEC_ID_H264;
            audioCodecId = AV_CODEC_ID_MP3;
        }
        else if (Format == "flv")
        {
            videoCodecId = AV_CODEC_ID_H264;
            audioCodecId = AV_CODEC_ID_AAC;
        }
        else {
            std::cout << "Conversion to this format is not supported" << std::endl;
            return ;
        }
    }
    //Create a context for the output file
    avformat_alloc_output_context2( & amp;outputFormatContext, nullptr, nullptr, outputFileName.c_str());
    if (!outputFormatContext) {
        std::cout << "Failed to create context for output file" << std::endl;
        return ;
    }
    ON_SCOPE_EXIT{ avformat_free_context(outputFormatContext); };

    //Add video stream to output context
    AVStream* outVideoStream = avformat_new_stream(outputFormatContext, nullptr);
    if (!outVideoStream) {
        std::cout << "Failed to add video stream to output file" << std::endl;
        return ;
    }
    outVideoStream->id = outputFormatContext->nb_streams - 1;
    //avcodec_parameters_copy(outVideoStream->codecpar, videoStream->codecpar);
    outVideoStream->codecpar->codec_tag = 0;

    //Set video encoder
    const AVCodec* outVideoCodec = avcodec_find_encoder(videoCodecId);
    if (!outVideoCodec) {
        std::cout << "Failed to set video encoder" << std::endl;
        return ;
    }
    AVCodecContext* outVideoCodecContext = avcodec_alloc_context3(outVideoCodec);
    if (!outVideoCodecContext) {
        std::cout << "Failed to set video encoder context" << std::endl;
        return ;
    }
    ON_SCOPE_EXIT{ avcodec_free_context( & amp;outVideoCodecContext); };
    //Video encoder parameter settings
        //avcodec_parameters_to_context(outVideoCodecContext, outVideoStream->codecpar);
        outVideoCodecContext->codec_id = videoCodecId;
        //outVideoCodecContext->time_base = videoStream->time_base;
        outVideoCodecContext->time_base.den = 25;
        outVideoCodecContext->time_base.num = 1;
        outVideoCodecContext->gop_size = 13;
        outVideoCodecContext->bit_rate = 8000000;
        outVideoCodecContext->refs = 0;
        outVideoCodecContext->max_b_frames =10;
        outVideoCodecContext->width = 1920;
        outVideoCodecContext->height = 1080;
        outVideoCodecContext->pix_fmt = AV_PIX_FMT_YUV420P;

    //Copy parameters from the output context to the output stream
    avcodec_parameters_from_context(outVideoStream->codecpar, outVideoCodecContext);


    //Open video encoder
    if (avcodec_open2(outVideoCodecContext, outVideoCodec, nullptr) < 0) {
        std::cout << "Unable to open video encoder" << std::endl;
        return ;
    }

    //Add audio stream to output file
    AVStream* outAudioStream = avformat_new_stream(outputFormatContext, nullptr);
    if (!outAudioStream) {
        std::cout << "Failed to add audio stream to output file" << std::endl;
        return ;
    }

    outAudioStream->id = outputFormatContext->nb_streams - 1;
    //Copy the output audio stream parameters
    avcodec_parameters_copy(outAudioStream->codecpar, audioStream->codecpar);
    /*outAudioStream->time_base.den = 11025;
    outAudioStream->time_base.num = 256;
    outAudioStream->codecpar->bit_rate = 320018;
    outAudioStream->codecpar->profile = 1;
    outAudioStream->codecpar->sample_rate = 44100;
    outAudioStream->codecpar->frame_size = 1024;
    av_channel_layout_default( & amp;outAudioStream->codecpar->ch_layout, 2);
    outAudioStream->codecpar->ch_layout.nb_channels = 3;*/
    outAudioStream->codecpar->codec_tag = 0;

    //Set audio encoder
    const AVCodec* outAudioCodec = avcodec_find_encoder(audioCodecId);
    if (!outAudioCodec) {
        std::cout << "Failed to set audio encoder" << std::endl;
        return ;
    }
    AVCodecContext* outAudioCodecContext = avcodec_alloc_context3(outAudioCodec);
    if (!outAudioCodecContext) {
        std::cout << "Failed to set audio encoder context" << std::endl;
        return ;
    }
    ON_SCOPE_EXIT{ avcodec_free_context( & amp;outAudioCodecContext); };
    //Audio encoder parameters
    avcodec_parameters_to_context(outAudioCodecContext, outAudioStream->codecpar);
    outAudioCodecContext->codec_id = audioCodecId;
    outAudioCodecContext->time_base = audioStream->time_base;
    //outAudioCodecContext->time_base.den = 51111100;
    //outAudioCodecContext->time_base.num = 1;
    //outAudioCodecContext->sample_rate = 43110;
    outAudioCodecContext->sample_fmt = AV_SAMPLE_FMT_S16;
    //av_channel_layout_default( & amp;outAudioCodecContext->ch_layout, 2);
    avcodec_parameters_from_context(outAudioStream->codecpar, outAudioCodecContext);
    if (Format == "flv")
    {
        outAudioCodecContext->sample_fmt = AV_SAMPLE_FMT_FLTP;
        //av_channel_layout_default( & amp;outAudioCodecContext->ch_layout, audioCodecContext->ch_layout.nb_channels);

    }
    //Open the audio encoder
    if (avcodec_open2(outAudioCodecContext, outAudioCodec, nullptr) < 0) {
        std::cout << "Unable to open audio encoder" << std::endl;
        return ;
    }

    //Open output file
    if (!(outputFormatContext->oformat->flags & amp; AVFMT_NOFILE)) {
        if (avio_open( & amp;outputFormatContext->pb, outputFileName.c_str(), AVIO_FLAG_WRITE) < 0) {
            std::cout << "Unable to open output file" << std::endl;
            return ;
        }
    }
   
    //Write output file header
    if (avformat_write_header(outputFormatContext, nullptr) < 0) {
        std::cout << "Unable to write output file header" << std::endl;
        return ;
    }
    //Print out relevant information
    av_dump_format(outputFormatContext, 0, outputFileName.c_str(), 1);
    int nVideoCount = 0;
    int nAudioCount = 0;
    bool FrameQueue_is_ready = false;
    //std::this_thread::sleep_for(1s);
    while (1) {
        AVFrame* Frame=nullptr;
        mtx.lock();
        if (!FrameQueue.empty()) {
            FrameQueue_is_ready = true;
            Frame = FrameQueue.front();
            FrameQueue.pop();
        }
        else FrameQueue_is_ready = false;
        mtx.unlock();
        if (FrameQueue_is_ready)
        {
            if (Frame->quality == 1) {
                // Encode video frames
                Frame->pts = (int64_t)(40 * (nVideoCount) / av_q2d(outVideoCodecContext->time_base) / 1000.0);//Time
                nVideoCount + + ;
                ret = avcodec_send_frame(outVideoCodecContext, Frame);
                if (ret < 0) {
                    std::cout << "Frame exception received by video encoding" << std::endl;
                    break;
                }

                while (ret >= 0) {
                    ret = avcodec_receive_packet(outVideoCodecContext, videoOutputPacket);
                    if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
                        break;
                    }
                    else if (ret < 0) {
                        std::cout << "Video encoding ret exception" << std::endl;
                        return;
                    }

                    av_packet_rescale_ts(videoOutputPacket, outVideoCodecContext->time_base, outVideoStream->time_base);
                    videoOutputPacket->stream_index = outVideoStream->index;

                    //Write video frames to output file
                    ret = av_interleaved_write_frame(outputFormatContext, videoOutputPacket);
                    if (ret < 0) {
                        break;
                    }
                }
            }
            else {
                // Encode audio frames
                //Frame->pts = (int64_t)( (nAudioCount) / av_q2d(outAudioCodecContext->time_base) / 44100.0);//Time
                Frame->pts = nAudioCount * 1024;
                nAudioCount + + ;
                ret = avcodec_send_frame(outAudioCodecContext, Frame);
                if (ret < 0) {
                    break;
                }

                while (ret >= 0) {
                    ret = avcodec_receive_packet(outAudioCodecContext, audioOutputPacket);
                    if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
                        break;
                    }
                    else if (ret < 0) {
                        std::cout << "Audio encoding ret exception" << std::endl;
                        return;
                    }

                    av_packet_rescale_ts(audioOutputPacket, outAudioCodecContext->time_base, outAudioStream->time_base);
                    audioOutputPacket->stream_index = outAudioStream->index;

                    //Write audio frames to the output file
                    ret = av_interleaved_write_frame(outputFormatContext, audioOutputPacket);
                    if (ret < 0) {
                        break;
                    }
                }
            }
            //Release each new AVFrame
            av_frame_free( & amp;Frame);
        }
        if (End_all) { break; }

    }
    //Write to the end of the output file
    av_write_trailer(outputFormatContext);
}


bool Format_conver(const std::string & amp; inputFile, const std::string & amp; outputFileName, const std::string & amp; Format)
{
    avformat_network_init(); // Initialize the network library
    AVStream* audioStream = nullptr;
    AVFormatContext* inputFormatContext = nullptr;
    //Open input file
    if (avformat_open_input( & amp;inputFormatContext, inputFile.c_str(), nullptr, nullptr) != 0) {
        std::cout << "Unable to open input file" << std::endl;
        return false;
    }
    ON_SCOPE_EXIT{ avformat_close_input( & amp;inputFormatContext); };
    // Get stream information
    if (avformat_find_stream_info(inputFormatContext, nullptr) < 0) {
        std::cout << "Unable to obtain input file stream information" << std::endl;
        return false;
    }
    //Find video stream and audio stream index
    int videoStreamIndex = -1;
    int audioStreamIndex = -1;
    for (int i = 0; i < inputFormatContext->nb_streams; i + + ) {
        if (inputFormatContext->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
            videoStreamIndex = i;
        }
        else if (inputFormatContext->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            audioStreamIndex = i;
        }
    }
    if (videoStreamIndex == -1 || audioStreamIndex == -1) {
        std::cout << "Video stream not found" << std::endl;
        return false;
    }
    // Get video and audio streams
    audioStream = inputFormatContext->streams[audioStreamIndex];
    //Decoding thread
    std::thread decodeThr(decodeThread1,inputFile);
    //encoding thread
    std::thread encodeThr(encodeThread1,outputFileName, Format,audioStream);
    decodeThr.join();
    encodeThr.join();
    return true;
}

int main() {
    //Input file name and output file name
    std::string inputFilename, outputFilename, Format;
    /*std::cout << "Please enter the input file name (with suffix):";
    std::cin >> inputFilename;
    std::cout << "Please enter the output format (avi, mp4, wmv, mkv, flv...):";
    std::cin >> Format;
    std::cout << "Please enter the output file name (with suffix):";
    std::cin >> outputFilename;*/
    clock_t start, end;
    start = clock();
    inputFilename = "cartoonTrim.mp4";
    Format = "avi";
    outputFilename = "Multithreading.avi";
    if (!Format_conver(inputFilename, outputFilename, Format)) {
        std::cout << "Failed to convert!" << std::endl;
        return -1;
    }
    std::cout << "Conversion complete!" << std::endl;
    end = clock();
    cout << "time = " << double(end - start) / CLOCKS_PER_SEC << "s" << endl;
    return 0;
}