The nature of communication, communication methods, the principles and multiple characteristics of anonymous pipes (access control, pipe_buf, atomicity, half-duplex), pipe() + simulation implementation code, communication between multiple processes (anonymous pipes, simulation implementation code )

Table of Contents

communication

introduce

Why should there be communication?

the nature of communication

How to communicate

pipeline

system V

posix standard

Signal

pipeline

introduce

anonymous pipe

Principle

introduce

process

Implement — pipe()

function prototype

parameter

return value

mock code

Features

Used for communication between parent and child processes

Provide access control

When the buffer is full

Write regulations

pipe_buf

atomicity

Provide byte stream-oriented communication services

The life cycle of the pipeline depends on the process

Add exit information to the code

One-way communication — a special case of half-duplex

half duplex

full duplex

Communication between multiple processes

introduce

code

head File

Main program — create child process + clean up resources

Main program — menu interface + parent process dispatching tasks

Results of the


Communication

Introduction

Process communication refers to the mechanism and method for information exchange and data sharing betweendifferent processes

Includes completion:

Why is there communication

  • In the operating system, a process is an independently running program entity, and each process has its own memory space and resources.
  • But sometimes we need to use multiple processes to complete a certain function
  • Therefore we need the concept of process communication

Process communication allows collaboration and data exchange between different processes to achieve a common goal

Therefore, the purpose of communication is to achieve multi-process collaboration

The Essence of Communication

  • Prerequisite for communication: Let different processes see the same piece of “memory” (organized in a specific structure)
  • Only in this way can data be transmitted between them to achieve various functions.
  • And this “memory” does not belong to any process
  • If it belongs to a certain process, how can other processes get it? (The process is independent)
  • Therefore, it is provided by a module in os

How to communicate

Pipeline

  • File system based concepts
  • Divided into anonymous pipes and named pipes, both are essentially files

system V

  • The System V Unix operating system provides a set of inter-process communication mechanisms – System V IPC (Inter-Process Communication), which is used to achieve data transfer and synchronization between different processes.
  • System V IPC provides three main communication mechanisms – message queues, semaphores, and shared memory

posix standard

The POSIX standard contains specifications on multi-threading and network communication, allowing developers to use a unified interface for multi-thread programming and network communication on systems that comply with the POSIX standard.

signal

A signal is an asynchronous communication method used to notify a process that a certain event has occurred

Pipeline

Introduction

  • Real-life pipelines (natural gas pipelines, oil pipelines, etc.) are all built to transport certain resources, and have an outlet and an entrance for one-way transportation.
  • In the computer field, resources refer to data
  • In order to achieve communication, some designers designed a one-way communication method
  • This method is similar to the pipeline function in reality, so it is named pipeline.

Anonymous Pipe

Principle

Introduction
  • Pipe communication — Communication between processes through pipes
  • And pipes are connected to us using file descriptors
process

When a process A opens a pipe in a certain way, it actually creates two files

One file serves as the read end of the pipe, and the other file serves as the write end of the pipe

These two file descriptors are connected to a buffer in the kernel, which is actually a memory area

How to let another process B also see this pipe?

  • See the pipe, that is, open the file
  • As shown in the figure, as long as we get the file structure in process A, we can operate the pipeline through the fd of the pipeline-related file.
  • In other words, as long as we can get the pcb of the process, we can see the pipe
  • However, the process is independent, how to get the pcb?
  • Don’t forget that child process can break through independence because it can inherit the pcb and file descriptor table of the parent process (that is, make a copy):
  • In this way, the child process can communicate with the parent process (because it sees the same pipe)

Implementation — pipe()

Function prototype
#include <unistd.h>

int pipe(int pipefd[2]);
  • Used to create an anonymous pipe
parameter
  • The parameter pipefd is an integer array containing two file descriptors
  • pipefd[0] is the read-side fd, pipefd[1] is the write-side fd
return value
  • Returns 0, indicating that the pipeline is created successfully
  • Returns -1, indicating an error occurred

simulation code

#include <iostream>
#include <unistd.h>
#include <cstring>
#include <assert.h>
#include <cstdio>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>

using namespace std;

int main()
{
    int pipefd[2] = {0}; // Two files will be opened (for reading and writing)
    int ret = pipe(pipefd);
    assert(ret != -1);
    (void)ret; // assert will not be displayed under release. If you do not use this ret, there will be a warning.

#ifdef DEBUG
    cout << pipefd[0] << endl;
    cout << pipefd[1] << endl;
#endif

    pid_t fd = fork(); // Create a child process and let it communicate with the parent process
    assert(fd!=-1);
    if(fd==0)
    {
        // Child process -- read
        close(pipefd[1]);//Turn off the write end
        char buffer[1024];//used to store data
        memset(buffer, 0, sizeof(buffer));
        while (true){
            ssize_t size = read(pipefd[0], buffer, sizeof(buffer) - 1); //In order to make room for \0
            if (size > 0)
            {
                buffer[size] = 0; //After reading the content, remember to add \0
                cout << "im child "
                     << "message: " << buffer << endl; //Print out the content read
            }
            else if (size == 0) //If it is 0, it means the end of the file has been read
            {
                cout << "read end" << endl;
                break;
            }
        }
        exit(0);
    }

    // Parent process -- write
    close(pipefd[0]);//Turn off the reading end
    string message = "im parent, im writing";
    char buffer[1024]; //Storage data
    int count = 0;
    memset(buffer, 0, sizeof(buffer));
    while(true)
    {
        snprintf(buffer, sizeof(buffer) - 1, "pid:%d,%s,%d", getpid(), message.c_str(), count + + ); //First format the content to be written
        ssize_t size = write(pipefd[1], buffer, strlen(buffer));
        if (size < 0) //If it is less than 0, an error has occurred
        {
            cout << "write fail" << endl;
        }
        sleep(1); //After successful writing, rest for 1s before continuing writing
    }
    pid_t flag = waitpid(fd,nullptr, 0); //Wait for the child process to exit
    if (flag <= 0)
    {
        cout << "wait fail" << endl;
    }
    assert(flag > 0);
    (void)flag;

    cout << "wait success" << endl;
    return 0;
}

Features

  • The principle of anonymous pipes only applies to communication between parent and child processes
  • Because one of the processes must get the file descriptor table of the other process, only the parent-child process can achieve this.
  • Each process opens two files, as reading end and writing end
  • But pipeline is one-way communication, so each process only needs one end.
  • Therefore, you need to determine the division of labor before communicating, and then close unnecessary files
Provide access control

In the above code, we let the parent process stop for 1 second after each successful write.

You can see that the output information is very orderly:

Description -> The reading end will wait for the writing end to write instead of reading randomly by itself, resulting in reading some junk data or something.

When the buffer is full

What will happen if the parent process keeps writing and writing until the buffer we defined is full?

(Let the child process sleep for 20s and not let it read data)

// Parent process -- write
    close(pipefd[0]);
    string message = "im parent, im writing";
    char buffer[1024];
    int count = 0;
    memset(buffer, 0, sizeof(buffer));
    while(true)
    {
        snprintf(buffer, sizeof(buffer) - 1, "pid:%d,%s,%d", getpid(), message.c_str(), count + + );
        ssize_t size = write(pipefd[1], buffer, strlen(buffer));
        cout << count << endl; //Identifies the number of times it is written
        if (size < 0)
        {
            cout << "write fail" << endl;
        }
        // sleep(1);
    }

You will find that the write count of the parent process is stuck here:

It means that the buffer is full at this time, but it does not continue to write.

Note -> When the writing end is full, it will wait for the reading end to read out and will not keep writing

Write Requirements

pipe_buf

Specifies the kernel’s pipe buffer size

4096 bytes under linux

Atomicity

Atomicity means that an operation is either completely executed or not executed at all, with no intermediate state (used in concurrent programming)

The above two examples illustrate that the pipeline will control access to these two processes

(Without access control, when the parent and child processes write to the monitor at the same time, they will interfere with each other’s printed information)

Provide byte stream-oriented communication services

If the child process stops for a few seconds before each read, a bunch of information will be output at once:

Description -> The number of writes has nothing to do with the number of reads -> Indicates that the data exists in the form of a byte stream, which is a streaming-oriented communication service

The life cycle of the pipe is with the process
  • Because pipes are also files, and the life cycle of files depends on the process
  • Once no process has the file open, the file will be destroyed

  • When the fd on the writing end is closed, the read on the reading end will return 0, indicating that the end of the file has been read.
Add exit information to the code
 if (fd == 0)
    {
        // Child process -- read
        close(pipefd[1]);
        char buffer[1024];
        memset(buffer, 0, sizeof(buffer));
        while(true)
        {
            ssize_t size = read(pipefd[0], buffer, sizeof(buffer) - 1);
            if (size > 0)
            {
                buffer[size] = 0;
                cout << "im child ,"
                     << "message: " << buffer;
            }
            else if (size == 0) //Exit when the data is read
            {
                cout << "read end , im quit" << endl;
                break;
            }
        }
        exit(0);
    }
    // Parent process -- write
    close(pipefd[0]);
    string message = "im parent, im writing";
    char buffer[1024];
    int count = 0;
    memset(buffer, 0, sizeof(buffer));
    while(true)
    {
        snprintf(buffer, sizeof(buffer) - 1, "pid:%d,%s,%d\
", getpid(), message.c_str(), count + + );
        ssize_t size = write(pipefd[1], buffer, strlen(buffer));
        //cout << count << endl;
        if (size < 0)
        {
            cout << "write fail" << endl;
        }
        if(count==5){ //Close the writing end after writing five times
            close(pipefd[1]);
            cout<<"parent quit success"<<endl;
            break;
        }
        sleep(1);
    }
    pid_t flag = waitpid(fd, nullptr, 0); //Wait for the child process to exit
    if (flag <= 0)
    {
        cout << "wait fail" << endl;
    }
    assert(flag > 0);
    (void)flag;

    cout << "wait success" << endl;
    return 0;
}

Although it is not clear from this code whether the pipe has been destroyed, you can probably tell by looking at it.

One-way communication — the special case of half-duplex
Half-duplex
  • Half-duplex is a communication method, which means that data can only be transmitted in one direction between the two ends of the communication.
  • And the data transmission directioncan only be switched between sending and receiving, and cannot be sent and received at the same time.
  • In half-duplex communication, the communicating partiestransmit data alternately
  • When one party sends data, the other party must be in the receiving state to receive the sender’s data
  • Once the sender completes the data transfer, the receiver can switch to the sending state and send data to the sender
full duplex
  • Full-duplex is a communication method that means that both ends of the communication can perform bidirectional data transmission at the same time, that is, data can be sent and received at the same time.
  • In full-duplex communication, both communicating parties can send and receive at the same time without alternating.
  • Both processes have independent sending and receiving channels, which can carry out two-way data transmission at the same time, thus realizing parallel data exchange

Communication between multiple processes

Introduction

If we want to communicate between multiple processes, we can create multiple child processes to complete

The parent process determines which process to send the task to by writing commands in the pipe, and then leaves it to the child process to complete.

code
Header file
#include <iostream>

#include <unistd.h>
#include <cstring>
#include <assert.h>
#include <cstdio>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <time.h>

#include <string>
#include <functional>
#include <vector>
#include <unordered_map>

#define num 5

using namespace std;

using func = function<void()>;

vector<func> callbacks; //storage task method
unordered_map<int, string> describe; //Associate the number with the task name

//Four kinds of tasks
void read_mysql()
{
    cout << "process: " << getpid() << "Execute the task of accessing the database" << endl;
}
void execute_url()
{
    cout << "process: " << getpid() << "Perform url parsing" << endl;
}
void cal()
{
    cout << "process: " << getpid() << "Perform encryption task" << endl;
}
void save()
{
    cout << "process: " << getpid() << " Execute the task of accessing data persistence" << endl;
}

void load() //Initialize the task into two containers
{
    describe.insert({callbacks.size(), "read_mysql"});
    callbacks.push_back(read_mysql);
    for (const auto & amp;i : describe)
    {
        cout << i.first << " : " << i.second << endl;
    }

    describe.insert({callbacks.size(), "execule_url"});
    callbacks.push_back(execule_url);

    describe.insert({callbacks.size(), "cal"});
    callbacks.push_back(cal);

    describe.insert({callbacks.size(), "save"});
    callbacks.push_back(save);
}

void show_hander() //Show what tasks are being executed now
{
    //cout<<"im show "<<endl;
    for (const auto & amp;i : describe)
    {
        cout << i.first << " : " << i.second << endl;
    }
}

int task_size(){ //Return the number of tasks
    return callbacks.size();
}

int wait_command(int fd, bool & amp;quit) //Read the tasks dispatched by the parent process and return
{
    uint32_t command = 0;
    ssize_t size = read(fd, & amp;command, sizeof(uint32_t));
    if(size==0)
    {
        quit = true;
        return -1;
    }
    assert(size == sizeof(uint32_t));
    return command;
}

void send_wakeup(pid_t pid, int fd, uint32_t command)//Write the task number to the pipe and let the child process read it
{
    // cout << "cin command: " << command << endl;
    // cout << "describe : " << describe[command] << endl;
    write(fd, & amp;command, sizeof(command));
    cout << "call process : " << pid << ", execute : " << descripe[command] << ", though : " << fd << endl;
}
Main program — Create child process + Clean up resources
int main()
{
    vector<pair<pid_t, int>> mapping_table; //Associate the sub-process and task number of the task execution
    load(); //Initialization

    //Create multiple processes
    for (int i = 0; i < num; i + + )
    {
        int pipefd[2] = {0};
        pipe(pipefd); // Create a pipe
        pid_t id = fork();
        assert(id != -1);
        (void)id;

        if (id == 0)
        {
            // child
            close(pipefd[1]); //Close the write end
            while (true) // Wait for command + execute task
            {
                bool quit = false;
                int command = wait_command(pipefd[0], quit);//wait for command
                if (quit) //Determine whether to exit
                    break;
                if (command >= 0 & amp; & amp; command < task_size())
                {
                    callbacks[command]();//Execute the dispatched tasks
                    cout << "success" << endl;
                }
                else
                {
                    cout << "Illegal command" << endl;
                }
            }
            cout << "process exit : " << getpid() << endl;
            exit(0);
        }

        // parent
        close(pipefd[0]);//Close the reading end
        //Save [through which fd to send the command to the child process]:
        mapping_table.push_back(pair<pid_t, int>(id, pipefd[1]));

    }

    //Dispatch tasks

    // Close fd + process
    for (const auto & amp;i : mapping_table)
    {
        close(i.second); // Close the write end of the communication between each process, so that the child process reads 0 and exits
    }
    for (const auto & amp;i : mapping_table)
    {
        waitpid(i.first, nullptr, 0); // Wait for each exiting child process and recycle resources
    }
    return 0;
}
 // To dispatch tasks, each process needs to be used -- the load balancing of the stand-alone version, so random numbers are used
    srand((unsigned int)time(nullptr) ^ getpid() ^ 1243225324);

    cout << "****************************************" << endl;
    cout << "* 1. show functions 2. send command *" << endl;
    cout << "****************************************" << endl;
    cout << "please select : " << endl;

    while(true)
    {
        int flag = 0;
        int select = 0, command = 0;
        cin >> select;
        if (select == 1)
        {
            show_handler();
        }
        else if (select == 2)
        {
            cout << "please enter your command : " << endl;
            cin >> command; // task
            // cout << "command: " << command << endl;
            cout << endl;
            int i = rand() % mapping_table.size(); //Randomly assign a child process to perform the task
             //After dispatching the task, you need to wake up the corresponding child process
            send_wakeup(mapping_table[i].first, mapping_table[i].second, command);
        }
        else
        {
            cout << "select error" << endl;
        }

        sleep(1);
        while (true) //repeat
        {
            cout << endl
                 << "continue : y/n ?" << endl;
            char c = 0;
            cin >> c;
            getchar();
            if (c == 'y' || c == 'Y')
            {
                cout << "****************************************" << endl;
                cout << "* 1. show functions 2. send command *" << endl;
                cout << "****************************************" << endl;
                cout << "please select : " << endl;
                flag = 1;
                break;
            }
            else if (c == 'n' || c == 'N')
            {
                cout << "exit" << endl;
                flag = 2;
                break;
            }
            else
            {
                cout << "select error, please retry" << endl;
                continue;
            }
            getchar();
        }
        if (flag == 1)
        {
            continue;
        }
        else if (flag == 2)
        {
            break;
        }
    }
Execution Result

This is what it looks like when running, we can manually dispatch tasks

The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge. CS entry skill treeLinux practical commandsPipeline 37992 people are learning the system