[Inter-process communication] Pipe communication {The purpose of inter-process communication; Common methods of inter-process communication; Anonymous pipe: Implementation principle, pipe symbol |, system call pipe, process pool; Named pipe: mkfifo instruction, system call mkfifo}

1. The purpose of inter-process communication

Most of the code we have written so far runs as a single process. A single process cannot take advantage of process-level concurrency, nor can it let several processes cooperate on a task.

To write multi-process programs we need inter-process communication (IPC), which serves the following purposes:

  • Data transfer: One process needs to send its data to another process.
  • Resource sharing: Multiple processes share the same resources.
  • Event notification: A process needs to tell another process or a group of processes that a certain event has occurred (for example, notifying the parent process when a child terminates).
  • Process control: Some processes need to fully control the execution of another process (a debugger, for example). The controlling process intercepts all traps and exceptions of the other process and learns of its state changes promptly.

The essence of inter-process communication is to allow different processes to share the same resource, that is, read and write the same memory space. This memory space cannot belong to any process, but must be provided by the operating system, otherwise it will violate the independence of the process.

2. Common methods of inter-process communication

  1. Pipes: The oldest form of inter-process communication in Linux, natively supported by Linux file systems.

    • anonymous pipe

    • named pipe

  2. System V IPC: more focused on local inter-process communication

    • System V message queue

    • System V shared memory

    • System V semaphore

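  3. POSIX IPC: the POSIX-standardized counterparts of the System V mechanisms, listed here together with the POSIX threads synchronization primitives. Like System V IPC, they work between processes on the same host; communication across a network is done with sockets instead.

    • message queue

    • Shared memory

    • semaphore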

    • mutex

    • condition variable

    • read-write lock

What is a pipe?

  • Pipes are the oldest form of inter-process communication in Unix and Linux, and the kernel exposes them through the same file interface (file descriptors, read, write) as ordinary files.
  • We call a flow of data from one process to another a "pipe".
  • A pipe can only transfer data in one direction.
  • A pipe is accessed like a file, but it is a file that exists only in kernel memory.

3. Anonymous pipe

3.1 Implementation Principle

  1. The parent process creates a pipe: the same pipe file is opened twice, once for reading and once for writing, so the parent holds two file descriptors.

  2. Call fork() to create a child process.
    When creating a child process, the operating system copies the parent's file descriptor table into the child. The child therefore inherits every file descriptor the parent has open, including files, pipes, and sockets.

    Note: Process management and file management are two independent modules. On fork, the process-related kernel data structures (task_struct, mm_struct, files_struct) are duplicated for the child, while the page tables are set up so that memory pages are shared copy-on-write. The file-related kernel data structure (struct file) is not copied: the descriptors in parent and child refer to the same open file, which is why both see the same pipe.

  3. Build a one-way communication channel: the parent and child processes each close unnecessary reading and writing ends; the sender closes the reading end, and the receiver closes the writing end.
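The note in step 2 says the file structure is shared rather than copied. One way to observe this is the file offset, which lives in that shared structure. The following is a minimal sketch under stated assumptions: OffsetAfterChildRead is a hypothetical helper name, and /tmp is assumed to be writable. The child reads through the inherited descriptor and the parent then asks where its own descriptor points.

```cpp
#include <cassert>
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: returns the parent's file offset after a child has
// read n bytes through the inherited descriptor, or -1 on error.
off_t OffsetAfterChildRead(size_t n) {
    char path[] = "/tmp/fd_share_XXXXXX";   // assumed scratch location
    int fd = mkstemp(path);
    if (fd == -1) return -1;
    unlink(path);                           // the file lives until fd is closed

    const char data[] = "0123456789";
    const ssize_t len = sizeof(data) - 1;
    if (write(fd, data, len) != len || lseek(fd, 0, SEEK_SET) == -1) {
        close(fd);
        return -1;
    }

    pid_t id = fork();
    if (id == -1) { close(fd); return -1; }
    if (id == 0) {
        // Child: read through the descriptor inherited from the parent.
        char buf[16];
        ssize_t got = read(fd, buf, n < sizeof(buf) ? n : sizeof(buf));
        _exit(got == -1 ? 1 : 0);
    }

    waitpid(id, nullptr, 0);
    off_t pos = lseek(fd, 0, SEEK_CUR);     // parent never read, yet the offset moved
    close(fd);
    return pos;
}
```

If the child reads 4 bytes, the parent's offset is 4 even though the parent itself never called read. The same sharing is what lets parent and child reach one pipe buffer through their separate descriptor tables.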

During communication, does the data in the pipe file need to be written to disk (flushed and persisted)?

  • No. A pipe is not a disk file, and persisting the data would mean nothing to either communicating party; disk writes are also slow and would seriously reduce the communication speed.
  • A pipe is a memory-level file and has no disk file entity. Inter-process communication takes place in memory, specifically in the kernel buffer of the pipe file.

Summary:

  • A pipe is a memory-level file, which is kernel data provided by the operating system. It does not belong to any process, ensuring the independence of the process.
  • By inheriting the parent's file descriptors, the parent and child processes share the same resource and can communicate. Pipes are therefore used between processes that have a common ancestor (related processes), including parent and child, siblings, or even grandparent and grandchild.

3.2 Creation and opening methods

Method 1: Use anonymous pipes to build inter-process communication through commands. The command symbol of anonymous pipes is “|”

Method 2: Anonymous pipes can also be created by processes. The system call to create anonymous pipes is pipe.

  • Prototype: int pipe(int fd[2]);
  • Function: Create and open an anonymous pipe (unnamed file) in both reading and writing modes.
  • Header file: <unistd.h>
  • Parameters: fd is an output parameter, representing the file descriptor array of the read and write ends, where fd[0] indicates the read end and fd[1] indicates the write end.
  • Return value: Returns 0 on success, -1 on failure and sets errno.
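Method 1 and Method 2 are closely related: a shell can build "left | right" from exactly this system call. The following is a rough sketch of that wiring, not the code of any particular shell. RunPipeline is a hypothetical helper name; it connects the standard output of one command to the standard input of another with pipe, fork, dup2, and execvp, and returns the exit status of the right-hand command.

```cpp
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: runs "left | right" and returns the exit status of
// `right`, or -1 on error. Both argument vectors must end with nullptr.
int RunPipeline(char* const left[], char* const right[]) {
    int fd[2];
    if (pipe(fd) == -1) return -1;

    pid_t lhs = fork();
    if (lhs == -1) { close(fd[0]); close(fd[1]); return -1; }
    if (lhs == 0) {
        dup2(fd[1], STDOUT_FILENO);   // stdout now refers to the write end
        close(fd[0]);
        close(fd[1]);
        execvp(left[0], left);
        _exit(127);                   // only reached if exec failed
    }

    pid_t rhs = fork();
    if (rhs == -1) {
        close(fd[0]);
        close(fd[1]);
        waitpid(lhs, nullptr, 0);
        return -1;
    }
    if (rhs == 0) {
        dup2(fd[0], STDIN_FILENO);    // stdin now refers to the read end
        close(fd[0]);
        close(fd[1]);
        execvp(right[0], right);
        _exit(127);
    }

    // The parent must close both ends, otherwise `right` never sees end of file.
    close(fd[0]);
    close(fd[1]);

    int status = 0;
    waitpid(lhs, nullptr, 0);
    if (waitpid(rhs, &status, 0) == -1) return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Called with the argument vectors for echo hello and grep -q hello, the helper behaves like echo hello | grep -q hello and returns grep's exit status. This assumes both programs are on the PATH.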

Use the system call pipe for parent-child inter-process communication:

Test code:

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
using namespace std;

int main(){
    //1. Create a pipe using the pipe system call
    int pipefd[2]={0};
    int ret = pipe(pipefd);
    assert(ret != -1); //In release mode (NDEBUG), the assert statement is removed and ret becomes an unused variable.
    (void)ret; //Casting to void suppresses the unused-variable warning.

#ifdef DEBUG //Conditional compilation: compiled only when DEBUG is defined
    //Define the macro on the command line: g++ -D DEBUG
    //To turn it off in a makefile, comment the flag out with #: g++ #-D DEBUG
    cout << "pipefd[0]: " << pipefd[0] << endl;
    cout << "pipefd[1]: " << pipefd[1] << endl;
#endif

    //2. Create a child process
    pid_t id = fork();
    assert(id != -1);
    if(id == 0)
    {
        //The child process is the receiver
        //3. Build a one-way communication channel
        close(pipefd[1]); //Close the write end
        //4. Receive information
        char r_buff[1024]; //Read buffer
        while(true)
        {
            //While the writer keeps its write end open: read returns data if the pipe has any, otherwise it blocks and waits.
            ssize_t sz = read(pipefd[0], r_buff, sizeof(r_buff)-1);

            if(sz>0)
            {
                r_buff[sz] = '\0';
                printf("I'm child process! pid:%d ppid:%d\n", getpid(), getppid());
                cout << "father# " << r_buff << endl;
            }
            else if(sz == 0) //The writer has closed its write end: read returns 0, meaning end of file.
            {
                cout << "writer quit(father), reader quit(child)!" << endl;
                break;
            }
            else //read failed: stop instead of looping forever
            {
                perror("read");
                break;
            }
        }
        close(pipefd[0]); //Close the read end before the child process exits
        exit(0);
    }

    //The parent process is the communication sender
    //3. Build a one-way communication channel
    close(pipefd[0]); //Close the reading end
    //4.Send information
    char w_buff[1024]; //Write buffer
    size_t cnt = 0;
    string message = "I am the parent process and am sending messages to the child process!";
    while(true)
    {
        snprintf(w_buff, sizeof(w_buff), "%s pid:%d cnt:%zu", message.c_str(), getpid(), ++cnt);
        write(pipefd[1], w_buff, strlen(w_buff)); //Note: write strlen bytes, not strlen + 1; the trailing \0 is not sent
        if(cnt == 10)
        {
            cout << "writer quit(father)!" << endl;
            break;
        }
        sleep(1);
    }
    close(pipefd[1]); //Close the write end before the parent process exits
    pid_t cpid = waitpid(id, nullptr, 0);
    assert(cpid != -1);
    cout << "child process quit! child_pid: " << cpid << endl;
    return 0;
}

Running results:

3.3 Characteristics of anonymous pipes

  1. It can only be used for communication between processes with a common ancestor (related processes); usually a process creates the pipe and then calls fork, after which the pipe connects the parent and child processes.
  2. Generally speaking, the life cycle of a pipe follows the processes that use it: once every process that has the pipe open closes it or exits, the operating system destroys the pipe and releases its buffer.
  3. To keep the two sides coordinated, a pipe provides access control (the pipe's own synchronization mechanism):
    • If writing is fast but reading is slow: when the pipe is full, the write call blocks and the writing process is suspended until a process reads some data.
    • If writing is slow but reading is fast: when the pipe is empty, the read call blocks and the reading process is suspended until a process writes data.
    • If every write-side fd is closed: read returns 0, indicating end of file.
    • If every read-side fd is closed: write raises SIGPIPE in the writing process, whose default action terminates it; if SIGPIPE is ignored or handled, write returns -1 with errno set to EPIPE.
  4. Pipe communication itself does not provide a mutual exclusion mechanism. Writes of at most PIPE_BUF bytes are atomic, but anything beyond that needs another synchronization mechanism.
  5. A pipe provides a byte-stream service: the data has no message boundaries, so one read does not necessarily correspond to one write.
  6. A pipe is half-duplex: data flows in one direction only. When both parties need to send, two pipes should be created.
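The access-control rules in item 3 can be checked inside a single process, without fork. The following is a small sketch under stated assumptions: the three function names are hypothetical, SIGPIPE is ignored so that the failing write reports an error code instead of terminating the process, and O_NONBLOCK is used so that a full pipe shows up as a failed write rather than a blocked one.

```cpp
#include <cassert>
#include <cerrno>
#include <csignal>
#include <fcntl.h>
#include <unistd.h>

// Returns what read() reports once the only write end is closed.
ssize_t ReadAfterWriterClosed() {
    int fd[2];
    if (pipe(fd) == -1) return -1;
    close(fd[1]);                       // no writer is left
    char c;
    ssize_t n = read(fd[0], &c, 1);     // does not block: end of file
    close(fd[0]);
    return n;
}

// Returns the errno of write() once the only read end is closed.
int WriteErrnoAfterReaderClosed() {
    signal(SIGPIPE, SIG_IGN);           // stays ignored for the rest of the process
    int fd[2];
    if (pipe(fd) == -1) return -1;
    close(fd[0]);                       // no reader is left
    ssize_t n = write(fd[1], "x", 1);
    int err = (n == -1) ? errno : 0;
    close(fd[1]);
    return err;
}

// Fills a pipe that nobody reads and returns how many bytes it accepted.
long PipeCapacity() {
    int fd[2];
    if (pipe(fd) == -1) return -1;
    int flags = fcntl(fd[1], F_GETFL);
    fcntl(fd[1], F_SETFL, flags | O_NONBLOCK);
    char chunk[1024] = {0};
    long total = 0;
    while (true) {
        ssize_t n = write(fd[1], chunk, sizeof(chunk));
        if (n == -1) break;             // EAGAIN: a blocking write would wait here
        total += n;
    }
    close(fd[0]);
    close(fd[1]);
    return total;
}
```

ReadAfterWriterClosed returns 0 and WriteErrnoAfterReaderClosed returns EPIPE. The value from PipeCapacity depends on the system, so the sketch does not assume a particular number.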

3.4 Build a process pool using anonymous pipes

  • Process Pool is a common process management technology that can improve program concurrency and efficiency.

  • A process pool usually consists of a set of pre-created sub-processes that can be dynamically allocated and managed by the main process. The main process assigns tasks to child processes, and the child processes execute the tasks and return the results to the main process. When the task is completed, the child process will not exit, but will continue to wait for the arrival of the next task.

  • The main advantage of the process pool is that it can avoid frequent creation and destruction of processes, thus improving the efficiency of the program. Since the child process has been created in advance, the overhead of each process creation, such as memory allocation, resource initialization, etc., can be avoided. In addition, the process pool can also avoid the problem of system resource exhaustion caused by too many processes, thus improving the stability of the program.

  • Process pools are usually used to handle a large number of short-term tasks, such as request processing in network servers, data processing, etc. Through the process pool, tasks can be assigned to multiple sub-processes for parallel processing, thereby improving program concurrency and efficiency.

Sample code:

//process_pool.cc: process pool implementation
#include "task.hpp"
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <utility>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
using namespace std;

const size_t PROCESS_NUM = 5; //The number of child processes to create

uint32_t WaitCommand(int rfd){
    uint32_t comid = 0; //Fixed-width 32-bit type so the command has the same size on every platform.
    ssize_t sz = read(rfd, &comid, sizeof(comid)); //Blocks until the parent sends a command.
    if(sz == 0) //The write end is closed: read returns 0 and the child process should exit.
    {
        return static_cast<uint32_t>(-1); //UINT32_MAX is used as the quit signal
    }
    assert(sz == sizeof(uint32_t));
    return comid;
}

void SendAndWakeup(pid_t cpid, int wfd, uint32_t cmdid){
    write(wfd, &cmdid, sizeof(cmdid));
    printf("main process: call %d execute %s through %d\n", cpid, desc[cmdid].c_str(), wfd);
}

int main(){
    srand(time(nullptr));
    LoadHandler();
    //1. Create the process channel table: (child pid, write end of its pipe)
    vector<pair<pid_t, int>> slots;

    //2. Create the child processes in a loop, set up a pipe for each, and fill in the table
    for(size_t i = 0; i < PROCESS_NUM; ++i)
    {
        //2.1 Create the pipe
        int pipefd[2] = {0};
        int ret = pipe(pipefd);
        assert(ret!=-1);
        (void)ret;

        //2.2 Create the child process
        pid_t id = fork();
        assert(id!=-1);
        if(id == 0)
        {
            //3. The child process receives and executes commands
            //Note: the child also inherits the write ends of the pipes created for earlier children.
            close(pipefd[1]); //The child process closes the write end
            while(true)
            {
                uint32_t comid = WaitCommand(pipefd[0]);
                if(comid == static_cast<uint32_t>(-1)) //The write end is closed: the child process exits
                {
                    break;
                }
                else if(comid < cmdset.size()) //comid is unsigned, so only the upper bound needs checking
                {
                    cmdset[comid]();
                }
                else
                {
                    cout << "Illegal command ID!" << endl;
                }
            }
            close(pipefd[0]); //Close the read end before the child process exits
            exit(0);
        }

        //2.3 Fill in the process channel table
        close(pipefd[0]); //The parent process closes the read end
        slots.emplace_back(id, pipefd[1]);
    }

    //The parent process picks a task and a process, then sends the command
    while(true)
    {
        //Randomly select a task
        uint32_t cmdid = rand() % cmdset.size();
        //Randomly select a process: a simple form of load balancing
        int choice = rand() % slots.size();
        //Dispatch the task to the selected process
        SendAndWakeup(slots[choice].first, slots[choice].second, cmdid);
        sleep(1);

        //Manually assign tasks (alternative to the random dispatch above)
        // int select;
        // uint32_t cmdid;
        // cout << "##########################################" << endl;
        // cout << "# 1. show functions    2. send command   #" << endl;
        // cout << "##########################################" << endl;
        // cout << "Please Select> ";
        // cin >> select;
        // if (select == 1)
        //     ShowHandler();
        // else if (select == 2)
        // {
        //     cout << "Enter Your Command> ";
        //     cin >> cmdid; // Select task
        //     int choice = rand() % slots.size(); // Select process
        //     SendAndWakeup(slots[choice].first, slots[choice].second, cmdid);
        // }
        // else
        // {
        //     break;
        // }
    }

    //Close the write ends of all pipes before the parent exits; each child then reads end of file and exits.
    //With the random dispatch loop above, this point is reached only if the loop is given an exit condition.
    for(const auto& slot : slots)
    {
        close(slot.second);
    }

    //Reap all child processes
    for(const auto& slot : slots)
    {
        pid_t cpid = waitpid(slot.first, nullptr, 0);
        assert(cpid != -1);
        cout << "child process[" << cpid << "] exit!" << endl;
    }
    return 0;
}

//task.hpp: task handler functions
#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>
#include <unistd.h>
using namespace std;
typedef function<void()> func;

vector<func> cmdset; //Command set: used to call the function for a given command ID
unordered_map<uint32_t, string> desc; //Command descriptions: used to print a description of a given command

void ReadMySQL(){
    printf("sub process[%d]: Execute database access task\n\n", getpid());
}

void AnalyzeURL(){
    printf("sub process[%d]: Perform URL parsing task\n\n", getpid());
}

void Encrypt(){
    printf("sub process[%d]: Perform encryption task\n\n", getpid());
}

void Save(){
    printf("sub process[%d]: Execute data saving task\n\n", getpid());
}

//Load commands into the command set
void LoadHandler(){
    uint32_t cmdid = 0;
    cmdset.push_back(ReadMySQL);
    desc.emplace(cmdid++, "ReadMySQL: Read database");
    cmdset.push_back(AnalyzeURL);
    desc.emplace(cmdid++, "AnalyzeURL: URL analysis");
    cmdset.push_back(Encrypt);
    desc.emplace(cmdid++, "Encrypt: Encryption calculation");
    cmdset.push_back(Save);
    desc.emplace(cmdid++, "Save: Data Save");
}

//Print all command descriptions
void ShowHandler(){
    for(const auto& iter : desc)
    {
        printf("cmdid: %u cmddesc: %s\n", iter.first, iter.second.c_str());
    }
}

Running results (executing random tasks):

4. Named Pipes

One limitation of anonymous pipes is that they can only be used between processes that have a common ancestor (related processes).

If we want to exchange data between unrelated processes, we can use FIFO files, which are usually called named pipes. A named pipe is a special type of file.

Named pipes are different from anonymous pipes:

  • An anonymous pipe is a pure memory-level file. There is no disk file image, so there is no concept of file name, path, etc.
  • Named pipes have disk file images, and each named pipe has a file name and a unique absolute path.
  • Therefore, unrelated processes can open the same pipe file for data exchange through the path and file name of the named pipe.

Named pipes are different from ordinary disk files:

  • A named pipe is a special-purpose file used only for inter-process communication; its data is never written to disk. Therefore, the size of a named pipe is always 0.
  • Whether anonymous or named pipes, inter-process communication occurs in memory, specifically in the kernel buffer of the pipe file.

Tip: Named pipes and anonymous pipes have different file existence forms and different creation and opening methods. But their ideas and principles for building inter-process communication are the same.

4.1 Creation and opening methods

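Method 1: Named pipes can be created from the command line with the mkfifo command, for example: mkfifo fifo.ipc

Method 2: Named pipes can also be created by processes. The system call to create a named pipe is mkfifo, declared in <sys/stat.h>: int mkfifo(const char *pathname, mode_t mode); it returns 0 on success, or -1 on failure and sets errno.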

Note: The above two methods can only be used to create FIFO files. You need to call the open function to open the FIFO file for reading or writing.
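To make the creation step concrete, here is a minimal sketch that creates a FIFO, confirms with stat that the new file really is a FIFO, and removes it again. CreateAndCheckFifo is a hypothetical helper name and the path passed in is assumed to be a location the process may write to.

```cpp
#include <cassert>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: creates a FIFO at `path`, checks its file type,
// and deletes it again. Returns true if the file was created as a FIFO.
bool CreateAndCheckFifo(const std::string& path) {
    if (mkfifo(path.c_str(), 0666) == -1) return false;   // fails if path already exists

    struct stat st;
    bool ok = stat(path.c_str(), &st) == 0 && S_ISFIFO(st.st_mode);

    unlink(path.c_str());                                  // remove the FIFO file
    return ok;
}
```

Note that mkfifo only creates the file. No process has the FIFO open at this point, which is why a separate call to open is still needed before any data can flow.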

Opening rules for named pipes:

  • If the current process opens the FIFO for reading, it blocks and waits (blocks in the open function) until a corresponding process opens the FIFO for writing.
  • If the current process opens the FIFO for writing, it blocks and waits (blocks in the open function) until a corresponding process opens the FIFO for reading.
  • Both the read and write ends of the named pipe must be open for read and write operations.
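The opening rules can be seen in a short round trip. This is a sketch under stated assumptions rather than a reference implementation: FifoRoundTrip is a hypothetical helper, and it uses fork only for convenience so that the reader and the writer fit in one example. Each side blocks in open until the other side has opened the FIFO as well.

```cpp
#include <cassert>
#include <csignal>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: a child opens the FIFO for writing and sends `msg`;
// the parent opens it for reading and returns everything it receives.
std::string FifoRoundTrip(const std::string& path, const std::string& msg) {
    if (mkfifo(path.c_str(), 0666) == -1) return "";

    pid_t id = fork();
    if (id == -1) { unlink(path.c_str()); return ""; }
    if (id == 0) {
        int wfd = open(path.c_str(), O_WRONLY);   // blocks until a reader opens
        if (wfd == -1) _exit(1);
        ssize_t n = write(wfd, msg.data(), msg.size());
        close(wfd);
        _exit(n == -1 ? 1 : 0);
    }

    std::string got;
    int rfd = open(path.c_str(), O_RDONLY);       // blocks until a writer opens
    if (rfd == -1) {
        kill(id, SIGKILL);                        // do not leave the child stuck in open
    } else {
        char buf[128];
        ssize_t n;
        while ((n = read(rfd, buf, sizeof(buf))) > 0) got.append(buf, n);
        close(rfd);                               // read returned 0: the writer closed
    }

    waitpid(id, nullptr, 0);
    unlink(path.c_str());
    return got;
}
```

The parent's read loop ends when read returns 0, which happens after the child closes its write end. This is the same end-of-file rule that applies to anonymous pipes.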

4.2 Use the mkfifo command to create a named pipe

  1. Use the mkfifo command to create a named pipe. In the output of ls -l, the first character of the file attributes identifies the file type: p means pipe file.

  2. If only the writing process is started, it blocks and cannot write.

  3. The write does not proceed until a reading process opens the named pipe; the reading process then receives the data.

    Tip: If only the reading process is started, it blocks in the same way. (Opening rules for named pipes)

  4. Write a script that continuously writes data to the pipe while another process reads it.

4.3 Implement server & client communication using named pipes

common.h & log.hpp:

//common.h: Common header file of server.cxx and client.cxx
#ifndef _COMMON_H_
#define _COMMON_H_

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
using namespace std;

#define MODE 0666 //File permission bits
#define BUFF_SIZE 128 //The size of the buffer

//server and client open the same named pipe
const char* ipcPath = "./fifo.ipc"; //The path of the named pipe

#endif

//log.hpp: code for printing log information
#ifndef _LOG_HPP_
#define _LOG_HPP_

#include <cstdio>
#include <ctime>
using namespace std;

enum MsgTypeID{
    DEBUG,
    NOTICE,
    WARNING,
    ERROR
};

const char* MsgTypeName[] = {"DEBUG", "NOTICE", "WARNING", "ERROR"};

void PrintLog(const char* msg, MsgTypeID id){
    printf("%u | %s: %s\n", (unsigned)time(nullptr), MsgTypeName[id], msg);
}

#endif

server.cxx

//server.cxx
#include "common.h"
#include "log.hpp"

void GetMessage(int fd)
{
    //Same read loop as in test 1
    //...
}

int main()
{
    //1. Create a named pipe
    int ret = mkfifo(ipcPath, MODE);
    if(ret == -1) return errno;
    PrintLog("Created named pipe successfully!", DEBUG);

    //2. Open the pipe file
    int fd = open(ipcPath, O_RDONLY);
    if(fd == -1) return errno;
    PrintLog("Opened pipe file successfully!", DEBUG);

    //3. Inter-process communication
    //Test 1: The server directly processes client messages
    char rbuff[BUFF_SIZE];
    while(true)
    {
        ssize_t sz = read(fd, rbuff, sizeof(rbuff)-1);
        if(sz>0)
        {
            //Data was read
            rbuff[sz] = 0;
            printf("[%d] client say> %s\n", getpid(), rbuff);
        }
        else if(sz==0)
        {
            //End of file
            printf("[%d] read end of file, client quit, server quit too!\n", getpid());
            break;
        }
        else
        {
            //Read error
            exit(errno);
        }
    }
    //Test 2: The server creates multiple child processes to receive and process client messages
    // int process_num = 3;
    // for(int i = 0; i < process_num; ++i)
    // {
    //     pid_t id = fork();
    //     if(id == -1) exit(errno);
    //     if(id == 0)
    //     {
    //         GetMessage(fd);
    //         exit(0);
    //     }
    // }
    // //Wait for the child processes to exit
    // for(int i = 0; i < process_num; ++i)
    // {
    //     pid_t cpid = waitpid(-1, nullptr, 0);
    // }

    //4. Close the file
    close(fd);
    PrintLog("Close pipe file!", DEBUG);

    //5. Delete the named pipe
    unlink(ipcPath);
    PrintLog("Delete pipe file!", DEBUG);

    return 0;
}

client.cxx

//client.cxx
#include "common.h"

int main()
{
    //1. Open the pipe file
    int fd = open(ipcPath, O_WRONLY);
    if(fd == -1) return errno;

    //2. Inter-process communication
    //char wbuff[BUFF_SIZE];
    string wbuff(BUFF_SIZE, 0);
    while(true)
    {
        cout << "please enter message line:" << endl;
        if(!getline(cin, wbuff)) break; //Stop at end of input (Ctrl-D) so that the pipe is closed below
        write(fd, wbuff.c_str(), wbuff.size());
    }

    //3. Close the file
    close(fd);
    return 0;
}

Test 1: The server directly processes client information

  1. If only the server (reading side) is started, the process blocks while opening the pipe file.

  2. Once the client (writing side) is started, the server's open call succeeds. (Opening rules for named pipes)

  3. The server receives the messages sent by the client, which gives a local version of server & client communication.

  4. When the client (writing side) exits, the server (reading side) reaches the end of the pipe file, read returns 0, and the server exits as well. (Access control)

    Tip: Like anonymous pipes, named pipes also provide access control in order to ensure process communication coordination. Please refer to anonymous pipes for specific rules.

Test 2: The server creates multiple sub-processes to receive and process client information. (All child processes inherit the pipe files opened by the main process)

The server's child processes compete for the messages sent by the client (the pid of the child that prints each message varies from one message to the next).

Tip: A pipe can have multiple readers at the same time, and all of its readers compete for the data.
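The competition between readers can be sketched with an anonymous pipe as well, since the rule is the same for both kinds of pipe. In this hypothetical example (CompeteForMessages is not part of the server code above), several child processes read fixed-size records from one pipe and each reports through its exit status how many records it received. Which child gets which record is not predictable, but every record is consumed exactly once.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: `readers` children compete for `messages` records on
// one pipe; returns the total number of records the children consumed.
// Assumes messages <= 255 because the counts travel in the exit status.
int CompeteForMessages(int readers, int messages) {
    int fd[2];
    if (pipe(fd) == -1) return -1;

    std::vector<pid_t> kids;
    for (int i = 0; i < readers; ++i) {
        pid_t id = fork();
        if (id == -1) break;
        if (id == 0) {
            close(fd[1]);   // otherwise the child would never see end of file
            uint32_t record;
            int count = 0;
            while (read(fd[0], &record, sizeof(record)) == (ssize_t)sizeof(record))
                ++count;
            _exit(count);
        }
        kids.push_back(id);
    }
    close(fd[0]);
    if (kids.empty()) { close(fd[1]); return -1; }

    for (uint32_t m = 0; m < (uint32_t)messages; ++m) {
        if (write(fd[1], &m, sizeof(m)) != (ssize_t)sizeof(m)) break;
    }
    close(fd[1]);           // all readers now drain the pipe and then read EOF

    int total = 0;
    for (pid_t id : kids) {
        int status = 0;
        if (waitpid(id, &status, 0) == id && WIFEXITED(status))
            total += WEXITSTATUS(status);
    }
    return total;
}
```

The per-child counts differ from run to run, while the total always equals the number of records written. That matches the varying pids seen in Test 2.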