[Rewriting Skynet's Bottom Layer in C++ 03] Skynet in Practice: pushing to and popping from the global message queue, sending messages the way Skynet does, and passing and processing messages between services

Directory of series articles

[Rewriting Skynet's Bottom Layer in C++ 01] Skynet in Practice: the basic Sunnet framework, methods to create and start threads and wait for them to exit, and a Skynet-style message class
[Rewriting Skynet's Bottom Layer in C++ 02] Skynet in Practice: a Skynet-style service class, object management under multi-threading, writing spin-lock and mutex code, managing objects with a hash table, and a summary of the program's running steps
[Rewriting Skynet's Bottom Layer in C++ 03] Skynet in Practice: pushing to and popping from the global message queue, sending messages the way Skynet does, and passing and processing messages between services
[Rewriting Skynet's Bottom Layer in C++ 04] Skynet in Practice: the PingPong demo program, making worker threads wait and wake up, and an improved scheduling function

Article directory

  • Directory of series articles
  • Foreword
  • 1. Global message queue
  • 2. Insertion and pop-up of global queue
    • 2.1 Pop-up of global queue
    • 2.2 Insertion into global queue
    • 2.3 Flag bit
  • 3. Imitate skynet to send messages
  • 4. Scheduling of worker threads
  • Summary

Foreword

Building on the previous articles, this chapter implements pushing to and popping from the global message queue so that worker threads can schedule the services. It then implements Skynet-style message sending and the scheduling code of the worker threads.

1. Global message queue

To pass and process messages between services, a global queue is needed to schedule the services. A worker thread must know which services have messages waiting to be processed and which can be ignored for now. Services with pending messages are therefore stored in the global queue, and worker threads take services out of it for processing.
The following code shows the new variables and methods added to Sunnet.h:
globalQueue is the global queue. It holds smart pointers to the services that have pending messages. Since several threads may read and write the global queue at the same time, the spin lock globalLock is defined to protect it.

//Sunnet.h
#pragma once
#include <cstdint>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include <pthread.h>
#include "Worker.h"
#include "Service.h"
using namespace std;
class Worker;

class Sunnet {
public:
    //Singleton
    static Sunnet* inst;
public:
    //Constructor
    Sunnet();
    //Initialize and start
    void Start();
    //Wait for the worker threads to exit
    void Wait();

    //Add and delete services
    uint32_t NewService(shared_ptr<string> type);
    void KillService(uint32_t id); //Only called by the service itself

    //New in this chapter
    //Send a message
    void Send(uint32_t toId, shared_ptr<BaseMsg> msg);
    //Global queue operations
    shared_ptr<Service> PopGlobalQueue();
    void PushGlobalQueue(shared_ptr<Service> srv);

private:
    //Worker threads
    int WORKER_NUM = 3; //Number of worker threads (configuration)
    vector<Worker*> workers; //Worker objects
    vector<thread*> workerThreads; //Threads

    //Service list
    unordered_map<uint32_t, shared_ptr<Service>> services;
    uint32_t maxId = 0; //Maximum ID
    pthread_rwlock_t servicesLock; //Read-write lock

    //New in this chapter
    //Global queue
    queue<shared_ptr<Service>> globalQueue;
    int globalLen = 0; //Queue length
    pthread_spinlock_t globalLock; //Spin lock

private:
    //Start the worker threads
    void StartWorker();
    //Get a service
    shared_ptr<Service> GetService(uint32_t id);
};

Now that the lock is defined, it must be initialized in the startup function. At this point the Sunnet class initializes two locks:

//Start the system
void Sunnet::Start() {
    cout << "Hello Sunnet" << endl;
    //Initialize the locks
    pthread_rwlock_init(&servicesLock, NULL);
    pthread_spin_init(&globalLock, PTHREAD_PROCESS_PRIVATE);
    //Start the workers
    StartWorker();
}

Each time a worker thread takes a service, it processes a limited number of its messages and then decides whether to put the service back into the global queue, depending on whether the service still has pending messages (implemented later).

2. Insertion and pop-up of global queue

2.1 Pop-up of global queue

Because several threads are involved, both inserting into and popping from the global queue must be done under the lock. The following code shows the pop method:

//Sunnet.cpp
//Pop a service from the global queue
shared_ptr<Service> Sunnet::PopGlobalQueue(){
    shared_ptr<Service> srv = nullptr; //Stays null if the queue is empty
    pthread_spin_lock(&globalLock);
    {
        if (!globalQueue.empty()) {
            srv = globalQueue.front();
            globalQueue.pop();
            globalLen--;
        }
    }
    pthread_spin_unlock(&globalLock);
    return srv;
}

2.2 Insertion into global queue

Insertion is very similar to the pop operation:

//Sunnet.cpp
//Insert a service into the global queue
void Sunnet::PushGlobalQueue(shared_ptr<Service> srv){
    pthread_spin_lock(&globalLock);
    {
        globalQueue.push(srv);
        globalLen++;
    }
    pthread_spin_unlock(&globalLock);
}
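
Both methods follow the same lock, modify, unlock pattern. As a minimal sketch that is not part of the Sunnet code, the pattern could be wrapped in a small RAII guard so that the unlock cannot be forgotten on an early return. SpinGuard, GuardedQueue, and CountAfterConcurrentPush are hypothetical names used only for illustration, and int stands in for shared_ptr<Service>.

```cpp
#include <pthread.h>
#include <cassert>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical RAII wrapper: locks in the constructor, unlocks in the destructor.
class SpinGuard {
public:
    explicit SpinGuard(pthread_spinlock_t* lock) : lock_(lock) { pthread_spin_lock(lock_); }
    ~SpinGuard() { pthread_spin_unlock(lock_); }
    SpinGuard(const SpinGuard&) = delete;
    SpinGuard& operator=(const SpinGuard&) = delete;
private:
    pthread_spinlock_t* lock_;
};

// Simplified stand-in for the global queue; int replaces shared_ptr<Service>.
class GuardedQueue {
public:
    GuardedQueue() { pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE); }
    ~GuardedQueue() { pthread_spin_destroy(&lock_); }
    void Push(int v) {
        SpinGuard guard(&lock_);
        items_.push(v);
    }
    // Returns false when the queue is empty, much like PopGlobalQueue
    // returning a null pointer.
    bool Pop(int& out) {
        SpinGuard guard(&lock_);
        if (items_.empty()) return false;  // the guard also unlocks on this early return
        out = items_.front();
        items_.pop();
        return true;
    }
private:
    std::queue<int> items_;
    pthread_spinlock_t lock_;
};

// Pushes from several threads at once, then drains the queue and counts the items.
int CountAfterConcurrentPush(int threads, int perThread) {
    GuardedQueue q;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; t++) {
        pool.emplace_back([&q, perThread] {
            for (int i = 0; i < perThread; i++) q.Push(i);
        });
    }
    for (auto& th : pool) th.join();
    int count = 0;
    int value = 0;
    while (q.Pop(value)) count++;
    return count;
}
```

If the locking is correct, no push is lost: four threads pushing 1000 items each should leave exactly 4000 items in the queue.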

2.3 Flag bit

With the current design, a worker thread can easily obtain a pending service, but finding out whether a given service is already in the global queue would require traversing the queue, which is slow. Therefore, add a member variable inGlobal to the Service class to indicate whether the service is in the queue.
Since inGlobal may also be accessed by several threads at once, a spin lock inGlobalLock is defined to protect it:

//Service.h
#pragma once
#include <cstdint>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <pthread.h>
#include "Msg.h"

using namespace std;

class Service {
public:
    //Members are public for efficiency and flexibility

    //Unique id
    uint32_t id;
    //Type
    shared_ptr<string> type;
    //Whether the service is exiting
    bool isExiting = false;
    //Message queue and its lock
    queue<shared_ptr<BaseMsg>> msgQueue;
    pthread_spinlock_t queueLock;

    //New in this chapter
    //Whether the service is in the global queue. true: in the queue, or being processed
    bool inGlobal = false;
    pthread_spinlock_t inGlobalLock;
public:
    //Constructor and destructor
    Service();
    ~Service();
    //Callback functions (service logic goes here)
    void OnInit();
    void OnMsg(shared_ptr<BaseMsg> msg);
    void OnExit();
    //Insert a message
    void PushMsg(shared_ptr<BaseMsg> msg);
    //Process messages
    bool ProcessMsg();
    void ProcessMsgs(int max);
    //Global queue flag
    void SetInGlobal(bool isIn);
private:
    //Take one message out of the queue
    shared_ptr<BaseMsg> PopMsg();
};

Don't forget to initialize and destroy the lock:

//Service.cpp
//Constructor
Service::Service() {
    //Initialize the locks
    pthread_spin_init(&queueLock, PTHREAD_PROCESS_PRIVATE); //Note: compare the options for this parameter with how Skynet initializes its locks
    //New in this chapter
    pthread_spin_init(&inGlobalLock, PTHREAD_PROCESS_PRIVATE);
}

//Destructor
Service::~Service(){
    pthread_spin_destroy(&queueLock);
    //New in this chapter
    pthread_spin_destroy(&inGlobalLock);
}

Next, implement SetInGlobal. The method is very simple: it assigns inGlobal while holding the lock:

//Service.cpp
void Service::SetInGlobal(bool isIn) {
    pthread_spin_lock(&inGlobalLock);
    {
        inGlobal = isIn;
    }
    pthread_spin_unlock(&inGlobalLock);
}

3. Imitate skynet to send messages

The whole process of sending a message between services: service 1 sends a message to service 2 by inserting it into service 2's message queue. If service 2 is not in the global queue, it is inserted into the global queue so that a worker thread can process it.

The Send method proceeds as follows:

① GetService finds the target service. GetService takes the read-write lock, but only briefly.

② PushMsg of the target service inserts the message into its message queue. The message queue is protected by a spin lock during the insertion.

③ Since several services may send to the same target at the same time, Send first locks the target's inGlobal flag and checks whether the service is already in the global queue. If not, it calls PushGlobalQueue to add the service and sets inGlobal to true.

//Sunnet.cpp
//Send a message
void Sunnet::Send(uint32_t toId, shared_ptr<BaseMsg> msg){
    shared_ptr<Service> toSrv = GetService(toId);
    if(!toSrv){
        cout << "Send fail, toSrv not exist toId:" << toId << endl;
        return;
    }
    toSrv->PushMsg(msg);
    //Check the flag and put the service into the global queue
    //Locking here directly (instead of calling SetInGlobal) keeps the critical
    //section small and flexible, at the cost of breaking encapsulation
    bool hasPush = false; //Records whether this call queued the service
    pthread_spin_lock(&toSrv->inGlobalLock);
    {
        if(!toSrv->inGlobal) {
            PushGlobalQueue(toSrv);
            toSrv->inGlobal = true;
            hasPush = true;
        }
    }
    pthread_spin_unlock(&toSrv->inGlobalLock);
}

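The critical section deserves attention. Send first takes inGlobalLock and, while holding it, takes globalLock (inside PushGlobalQueue). Because inGlobal is checked and set inside one critical section, concurrent senders push the service into the global queue only once.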
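
The effect of that critical section can be checked with a small stand-alone model. The sketch below is a simplification under stated assumptions, not the Sunnet code: std::mutex is swapped in for the pthread spin locks, int replaces shared_ptr<BaseMsg>, the GetService lookup is skipped, and MiniService, MiniSunnet, SendResult, and RunConcurrentSends are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Simplified stand-in for Service (hypothetical).
struct MiniService {
    std::queue<int> msgQueue;   // int replaces shared_ptr<BaseMsg>
    std::mutex queueLock;
    bool inGlobal = false;
    std::mutex inGlobalLock;
};

// Simplified stand-in for Sunnet (hypothetical).
struct MiniSunnet {
    std::queue<std::shared_ptr<MiniService>> globalQueue;
    std::mutex globalLock;

    void PushGlobalQueue(std::shared_ptr<MiniService> srv) {
        std::lock_guard<std::mutex> g(globalLock);
        globalQueue.push(srv);
    }

    void Send(std::shared_ptr<MiniService> toSrv, int msg) {
        {   // Step 2: insert the message under the service's queue lock.
            std::lock_guard<std::mutex> g(toSrv->queueLock);
            toSrv->msgQueue.push(msg);
        }
        // Step 3: check and set inGlobal in one critical section, so that
        // only one sender can put the service into the global queue.
        std::lock_guard<std::mutex> g(toSrv->inGlobalLock);
        if (!toSrv->inGlobal) {
            PushGlobalQueue(toSrv);   // takes globalLock while inGlobalLock is held
            toSrv->inGlobal = true;
        }
    }
};

struct SendResult {
    std::size_t globalLen;  // how many times the service sits in the global queue
    std::size_t msgLen;     // how many messages reached the service
};

// Several threads send to the same service at the same time.
SendResult RunConcurrentSends(int senders, int perSender) {
    MiniSunnet net;
    auto srv = std::make_shared<MiniService>();
    std::vector<std::thread> pool;
    for (int t = 0; t < senders; t++) {
        pool.emplace_back([&net, srv, perSender] {
            for (int i = 0; i < perSender; i++) net.Send(srv, i);
        });
    }
    for (auto& th : pool) th.join();
    return { net.globalQueue.size(), srv->msgQueue.size() };
}
```

With no worker draining the queue, every message should arrive while the service appears in the global queue exactly once, no matter how many threads send.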

4. Scheduling of worker threads

The last step is to implement the scheduling of the worker threads, so that they read services from the global queue and process their messages.
The following code rewrites the thread function of the Worker class:

//Worker.cpp
#include <iostream>
#include <unistd.h> //usleep
#include "Worker.h"
#include "Service.h"
//Thread function
void Worker::operator()() {
    while(true) {
        shared_ptr<Service> srv = Sunnet::inst->PopGlobalQueue();
        if(!srv){
            usleep(100); //100 microseconds (0.1 ms)
            // Sunnet::inst->WorkerWait();
        }
        else{
            srv->ProcessMsgs(eachNum);
            CheckAndPutGlobal(srv);
        }
        //cout << "working id:" << id << endl;
    }
}

① The worker first takes a service from Sunnet's global queue and calls ProcessMsgs to process up to eachNum messages.
② After processing, it calls CheckAndPutGlobal (implemented later), which checks whether the service still has unprocessed messages. If so, the service is put back into the global queue to wait for its next turn.
③ If the global queue is empty, the thread sleeps for 100 microseconds (usleep takes its argument in microseconds) and then enters the next loop iteration.

When the worker threads are started, each thread's eachNum is assigned a value that grows exponentially with its index i. With the assignment below, 2 << i gives 2, 4, 8, and so on.
The purpose: taking a service out of the global queue and putting it back both use locks, which costs time. worker[0] pays that cost once for every two messages it processes, worker[1] once for every four, and worker[2] once for every eight, so worker[2] spends the smallest share of its time on locking.
But a larger eachNum is not always better. If eachNum is too large, a worker thread stays busy with a few services, and the services waiting in the global queue are not processed in time, which increases their latency.

 worker->eachNum = 2 << i;
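
The trade-off can be put into numbers. The sketch below is only illustrative arithmetic: EachNumFor and QueueVisits are hypothetical helpers, and the count assumes all messages are already queued and none arrive while the service is being drained.

```cpp
#include <cassert>

// eachNum for worker i, using the same expression as the assignment above.
int EachNumFor(int i) {
    return 2 << i;   // 2, 4, 8, ...
}

// How many times a service must be popped from the global queue (paying the
// locking cost each time) to drain `messages` messages when at most
// `eachNum` of them are handled per visit.
int QueueVisits(int messages, int eachNum) {
    assert(eachNum > 0);
    return (messages + eachNum - 1) / eachNum;   // ceiling division
}
```

For a service holding 8 messages, a worker with eachNum 2 needs 4 visits, one with eachNum 4 needs 2, and one with eachNum 8 needs a single visit, during which every other service in the queue has to wait.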

The following is the implementation of CheckAndPutGlobal:
After a worker thread finishes a batch of messages, CheckAndPutGlobal checks whether the service still has unprocessed messages and, if so, puts it back into the global queue.

First declare it in the .h file:

//Worker.h
#pragma once
#include <memory>
#include <thread>
#include "Sunnet.h"
#include "Service.h"
class Sunnet;
using namespace std;
class Worker {
public:
    int id; //Worker number
    int eachNum; //How many messages to process per turn
    void operator()(); //Thread function
private:
    //Helper function
    void CheckAndPutGlobal(shared_ptr<Service> srv);
};

Then implement it in Worker.cpp:

//Worker.cpp
//The calls into Sunnet could instead be handled by passing parameters
//State on entry: the service is not in the queue, but inGlobal == true
void Worker::CheckAndPutGlobal(shared_ptr<Service> srv) {
    //Exiting (a service can only be killed by itself, so isExiting causes no thread conflicts)
    if(srv->isExiting){
        return;
    }

    pthread_spin_lock(&srv->queueLock);
    {
        //Messages remain: put the service back into the global queue
        if(!srv->msgQueue.empty()) {
            //At this point srv->inGlobal must be true
            Sunnet::inst->PushGlobalQueue(srv);
        }
        //No messages left: the service leaves the queue, reset inGlobal
        else {
            srv->SetInGlobal(false);
        }
    }
    pthread_spin_unlock(&srv->queueLock);
}

Since Worker and Sunnet refer to each other, Sunnet.h needs a forward declaration of Worker (already present in the Sunnet.h listing above):

class Worker;

Two points are worth noting:
① If the service is exiting (isExiting is true), the worker thread does not put it back into the global queue. Once the reference count of its smart pointer drops to 0, the service is destroyed.
② Although inGlobal is named as if it only meant "the service is in the global queue", it covers two cases: the service is in the global queue, or its messages are currently being processed. Therefore, when a service with unprocessed messages is put back into the global queue, its inGlobal is already true and does not need to be set again.
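
The two meanings of inGlobal can be traced with a single-threaded model of one worker and one service. This is a sketch under stated assumptions, not the Sunnet code: no locks are needed because only one thread runs, ProcessMsgs is assumed to handle at most max messages and to stop early when the queue is empty, and ModelService, CycleStats, and RunWorkerModel are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <queue>

// Single-threaded model of a service (hypothetical).
struct ModelService {
    std::queue<int> msgQueue;   // int replaces shared_ptr<BaseMsg>
    bool inGlobal = false;
    int handled = 0;

    // Assumed behavior: handle at most `max` messages, stop early when empty.
    void ProcessMsgs(int max) {
        for (int i = 0; i < max && !msgQueue.empty(); i++) {
            msgQueue.pop();
            handled++;
        }
    }
};

struct CycleStats {
    int visits;          // times the service was popped from the global queue
    int handled;         // messages processed in total
    bool inGlobalAtEnd;  // flag value once the service is fully drained
};

CycleStats RunWorkerModel(int messages, int eachNum) {
    std::queue<std::shared_ptr<ModelService>> globalQueue;
    auto srv = std::make_shared<ModelService>();

    // Senders: every message is queued, but only the first send
    // puts the service into the global queue.
    for (int i = 0; i < messages; i++) {
        srv->msgQueue.push(i);
        if (!srv->inGlobal) {
            globalQueue.push(srv);
            srv->inGlobal = true;
        }
    }

    // Worker loop: pop, process a batch, then check and put back.
    int visits = 0;
    while (!globalQueue.empty()) {
        auto cur = globalQueue.front();
        globalQueue.pop();
        visits++;
        // Being processed: out of the queue, yet inGlobal is still true.
        assert(cur->inGlobal);
        cur->ProcessMsgs(eachNum);
        if (!cur->msgQueue.empty()) {
            globalQueue.push(cur);    // inGlobal is already true, nothing to set
        } else {
            cur->inGlobal = false;    // drained: the next send must queue it again
        }
    }
    return { visits, srv->handled, srv->inGlobal };
}
```

With 5 messages and eachNum set to 2, the service is visited three times (2, 2, and 1 messages), and inGlobal is reset to false only after the last visit.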

Summary

Building on the previous articles, this chapter implemented pushing to and popping from the global message queue so that worker threads can schedule the services, and then implemented Skynet-style message sending and the scheduling code of the worker threads.