[Server] fiber coroutine module

The following is a review of what I learned from the sylar server.
Reference materials

  • Sylar’s fiber coroutine module implements asymmetric coroutines based on ucontext_t.

Functions have only two behaviors: call and return. Once a function returns, the state it holds on the stack is destroyed. Coroutines add two more actions: suspend and resume. When a coroutine suspends itself, control transfers to another coroutine, while the state owned by the suspended coroutine is retained. The coroutine that gained control may itself suspend at some later point, handing control back to the original coroutine, which then resumes where it left off. Once a coroutine returns like a function, the state it owns is destroyed.

Coroutines are functions that can suspend execution and resume later.
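To make suspend and resume concrete, here is a minimal sketch using the POSIX ucontext functions directly (assuming Linux/glibc). The names g_main_ctx, g_co_ctx, g_trace, co_body and run_suspend_resume_demo are made up for illustration and are not part of sylar. The local variable step lives on the coroutine's own stack and survives the suspension.

```cpp
#include <cassert>
#include <string>
#include <ucontext.h>

// Hypothetical names for illustration only; not part of sylar.
static ucontext_t g_main_ctx;  // context of the caller
static ucontext_t g_co_ctx;    // context of the coroutine
static std::string g_trace;    // records the order of execution

static void co_body() {
    char step = 'A';                      // local state on the coroutine's own stack
    g_trace += step;
    swapcontext(&g_co_ctx, &g_main_ctx);  // suspend: control goes back to the caller
    ++step;                               // the local survived the suspension
    g_trace += step;                      // appends 'B'
    // Returning from here activates uc_link, i.e. g_main_ctx.
}

std::string run_suspend_resume_demo() {
    static char stack[64 * 1024];         // stack memory for the coroutine
    g_trace.clear();

    int rc = getcontext(&g_co_ctx);
    assert(rc == 0);
    (void)rc;
    g_co_ctx.uc_stack.ss_sp = stack;
    g_co_ctx.uc_stack.ss_size = sizeof(stack);
    g_co_ctx.uc_link = &g_main_ctx;       // where to go when co_body returns
    makecontext(&g_co_ctx, co_body, 0);

    swapcontext(&g_main_ctx, &g_co_ctx);  // resume: runs until the first suspend
    g_trace += '1';
    swapcontext(&g_main_ctx, &g_co_ctx);  // resume again: runs to the end
    g_trace += '2';
    return g_trace;
}
```

Calling run_suspend_resume_demo() interleaves the two execution flows: the coroutine writes A, the caller writes 1, the coroutine continues with B, and the caller finishes with 2.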

Within a single thread, the yield and resume of coroutines happen synchronously: a yield by one coroutine always corresponds to a resume of another, because a thread cannot be left without an execution body. Moreover, yield and resume are controlled entirely by the application. This differs from threads: once a thread is created, the operating system runs and schedules it automatically. After a coroutine is created, the application itself must run and schedule it, much like calling a function, which is why coroutines are also known as user-mode threads.

Creating a coroutine really means wrapping a function in a coroutine object and then using that coroutine to run the function. Coroutine scheduling means creating a batch of coroutine objects plus a scheduling coroutine that consumes them one by one (a coroutine may keep adding new tasks to the scheduler while it is being scheduled). IO coroutine scheduling means that if, during scheduling, a coroutine is found to be waiting for IO to become ready, it gives up execution first and is resumed once the corresponding IO is ready. A timer means presetting a coroutine object with the scheduling coroutine and resuming that preset coroutine object when the timer expires.
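The scheduling idea described above can be sketched in a few dozen lines. This is not sylar's Scheduler; MiniFiber, mini_spawn, mini_yield, mini_run and run_scheduler_demo are hypothetical names, and the sketch assumes a single thread on Linux/glibc. A loop plays the role of the scheduling coroutine: it takes coroutine objects off a queue one by one, and a running coroutine may add new tasks to the queue.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <ucontext.h>
#include <vector>

// All names are hypothetical; this is not sylar's Scheduler.
struct MiniFiber {
    ucontext_t ctx;
    std::vector<char> stack;
    std::function<void()> cb;
    bool done = false;
};

static ucontext_t g_sched_ctx;          // context of the scheduling loop
static MiniFiber *g_current = nullptr;  // fiber that is currently running
static std::deque<std::shared_ptr<MiniFiber>> g_queue;

static void fiber_entry() {
    g_current->cb();
    g_current->done = true;
    swapcontext(&g_current->ctx, &g_sched_ctx);  // finished: hand control back
}

// Wrap a function in a coroutine object and queue it for scheduling.
void mini_spawn(std::function<void()> cb) {
    auto f = std::make_shared<MiniFiber>();
    f->stack.resize(64 * 1024);
    f->cb = std::move(cb);
    int rc = getcontext(&f->ctx);
    assert(rc == 0);
    (void)rc;
    f->ctx.uc_stack.ss_sp = f->stack.data();
    f->ctx.uc_stack.ss_size = f->stack.size();
    f->ctx.uc_link = nullptr;
    makecontext(&f->ctx, fiber_entry, 0);
    g_queue.push_back(f);
}

// Called inside a fiber: give up execution and return to the scheduler.
void mini_yield() {
    swapcontext(&g_current->ctx, &g_sched_ctx);
}

// The scheduling loop: consume queued fibers one by one.
void mini_run() {
    while (!g_queue.empty()) {
        std::shared_ptr<MiniFiber> f = g_queue.front();
        g_queue.pop_front();
        g_current = f.get();
        swapcontext(&g_sched_ctx, &f->ctx);  // resume the fiber
        g_current = nullptr;
        if (!f->done) {
            g_queue.push_back(f);            // it yielded: schedule it again
        }
    }
}

std::string run_scheduler_demo() {
    std::string trace;
    mini_spawn([&] { trace += "a1 "; mini_yield(); trace += "a2 "; });
    mini_spawn([&] { trace += "b1 "; mini_spawn([&] { trace += "c1 "; }); });
    mini_run();
    return trace;
}
```

In run_scheduler_demo(), task a yields once and is requeued behind b, and task b adds a third task c while it is being scheduled, so the recorded order is a1, b1, a2, c1.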

ucontext_t interface

//Context structure definition
typedef struct ucontext_t {
    // Context to activate after the current context ends; only meaningful when the current context was created by makecontext.
    struct ucontext_t *uc_link;
    // Signal blocking mask for the current context
    sigset_t uc_sigmask;
    // Stack memory used by the current context; only meaningful when the current context was created by makecontext.
    stack_t uc_stack;
    // Platform-specific context content, including register values
    mcontext_t uc_mcontext;
    ...
}ucontext_t;
 
// Get the current context
int getcontext(ucontext_t *ucp);
 
// Restore the context pointed to by ucp. On success this function does not return: execution continues in the ucp context (for a context prepared by makecontext, at the start of its bound function), which amounts to calling that function in disguise.
int setcontext(const ucontext_t *ucp);
 
// Modify the context ucp obtained from getcontext: bind it to a function func, optionally passing arguments to func.
// Before calling makecontext, you must allocate a block of memory and store it in ucp->uc_stack. This memory is used as the stack while func runs.
// You can also set ucp->uc_link, in which case the context pointed to by uc_link is restored after func returns.
// If uc_link is left as NULL, the thread exits when func returns, so func must end by calling setcontext or swapcontext to switch to a valid context.
// After makecontext, ucp is bound to func. When setcontext or swapcontext activates ucp, func runs.
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
 
// Save the current context into oucp and restore the context pointed to by ucp.
// Like setcontext, swapcontext does not return immediately: execution jumps to the ucp context. It returns 0 only later, when the context saved in oucp is reactivated.
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);
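As a small illustration of the interface above, the following sketch (assuming Linux/glibc) passes two int arguments through makecontext and relies on uc_link to come back to the caller. The names g_caller_ctx, g_callee_ctx, g_sum, add_entry and run_makecontext_demo are invented for this example. The cast of the function pointer follows the usual makecontext idiom for functions that take int arguments.

```cpp
#include <cassert>
#include <ucontext.h>

// Hypothetical names for illustration only.
static ucontext_t g_caller_ctx;
static ucontext_t g_callee_ctx;
static int g_sum = 0;

// Entry function bound by makecontext; receives the two int arguments.
static void add_entry(int a, int b) {
    g_sum = a + b;
    // Returning activates uc_link (g_caller_ctx).
}

int run_makecontext_demo() {
    static char stack[64 * 1024];

    int rc = getcontext(&g_callee_ctx);             // initialize the context first
    assert(rc == 0);
    g_callee_ctx.uc_stack.ss_sp = stack;            // stack used while add_entry runs
    g_callee_ctx.uc_stack.ss_size = sizeof(stack);
    g_callee_ctx.uc_link = &g_caller_ctx;           // successor context
    makecontext(&g_callee_ctx,
                reinterpret_cast<void (*)()>(add_entry), 2, 20, 22);

    // Saves the caller's context and jumps into add_entry. The call returns 0
    // later, once add_entry has returned and uc_link reactivated g_caller_ctx.
    rc = swapcontext(&g_caller_ctx, &g_callee_ctx);
    assert(rc == 0);
    (void)rc;
    return g_sum;
}
```

Here run_makecontext_demo() returns 20 + 22. Setting uc_link is what lets add_entry simply return; with uc_link left as NULL the function would have to switch contexts explicitly before returning.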

Sylar coroutine module design

Sylar uses an asymmetric coroutine model: a sub-coroutine can only switch with the thread's main coroutine, not with another sub-coroutine. When a sub-coroutine ends, it must switch back to the main coroutine so that the program can end normally.

Sylar designs 6 states for each coroutine.

/**
 * @brief coroutine status
 */
enum State {
    /// Initialization state
    INIT,
    /// Pause state
    HOLD,
    /// Executing state
    EXEC,
    /// End state
    TERM,
    /// Executable state
    READY,
    /// Exception state
    EXCEPT
};

The Fiber class contains the following member variables:

/// Coroutine id
uint64_t m_id = 0;
/// Coroutine stack size
uint32_t m_stacksize = 0;
/// Coroutine status
State m_state = READY;
/// Coroutine context
ucontext_t m_ctx;
/// Coroutine stack address
void *m_stack = nullptr;
/// Coroutine entry function
std::function<void()> m_cb;

Two global static variables are defined, used to generate coroutine IDs and count the current number of coroutines.

/// Global static variable, used to generate coroutine ids
static std::atomic<uint64_t> s_fiber_id{0};
/// Global static variable, used to count the current number of coroutines
static std::atomic<uint64_t> s_fiber_count{0};
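A minimal sketch of how two such counters are typically used: the id counter only ever grows, while the live-object counter goes up in the constructor and down in the destructor. The class CountedFiber and the names s_demo_id, s_demo_count and TotalFibers are hypothetical. The destructor decrement is an assumption, since the destructor is not shown in this article.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for s_fiber_id and s_fiber_count.
static std::atomic<uint64_t> s_demo_id{0};
static std::atomic<uint64_t> s_demo_count{0};

class CountedFiber {
public:
    // Post-increment hands out the old value, so ids are unique and ascending.
    CountedFiber() : m_id(s_demo_id++) { ++s_demo_count; }
    // Assumption: the live count is decremented when the object is destroyed.
    ~CountedFiber() { --s_demo_count; }

    CountedFiber(const CountedFiber &) = delete;
    CountedFiber &operator=(const CountedFiber &) = delete;

    uint64_t getId() const { return m_id; }
    static uint64_t TotalFibers() { return s_demo_count; }

private:
    uint64_t m_id;
};
```

Because both counters are std::atomic, the increments stay correct even if coroutines are created from several threads at once.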

The following two thread-local variables save the coroutine context information of each thread:

/// Thread-local variable: the coroutine that the current thread is running
static thread_local Fiber *t_fiber = nullptr;
/// Thread-local variable: the main coroutine of the current thread, held as a smart pointer. Switching to this coroutine is equivalent to switching back to the thread's main function.
static thread_local Fiber::ptr t_thread_fiber = nullptr;

Constructor
The constructor with parameters constructs a sub-coroutine: it initializes the sub-coroutine’s ucontext_t context and stack space, and requires the coroutine’s entry function plus an optional stack size. The parameterless constructor initializes the coroutine facility of the current thread by constructing the thread’s main coroutine object.

/**
 * @brief Constructor
 * @attention The parameterless constructor is only used to create the first coroutine of the thread, which is the coroutine corresponding to the thread's main function.
 * This constructor can only be called from GetThis(), so it is declared private.
 */
Fiber::Fiber() {
    SetThis(this);
    m_state = EXEC; // the main coroutine is already executing (EXEC is the executing state in the State enum above)
 
    if (getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
 
    ++s_fiber_count;
    m_id = s_fiber_id++; // The coroutine id starts from 0 and increases by 1 after each use.
 
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() main id = " << m_id;
}
 
/**
 * @brief Constructor, used to create user coroutines
 * @param[in] cb coroutine entry function
 * @param[in] stacksize stack size, default is 128k
 */
Fiber::Fiber(std::function<void()> cb, size_t stacksize)
    : m_id(s_fiber_id++)
    , m_cb(cb) {
    ++s_fiber_count;
    m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
    m_stack = StackAllocator::Alloc(m_stacksize);
 
    if (getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
 
    m_ctx.uc_link = nullptr;
    m_ctx.uc_stack.ss_sp = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;
 
    makecontext(&m_ctx, &Fiber::MainFunc, 0);
 
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() id = " << m_id;
}
 
/**
 * @brief Return the coroutine currently executing on this thread
 * @details If the current thread has not created any coroutine yet, create the thread's first coroutine.
 * That coroutine is the thread's main coroutine, and other coroutines are scheduled through it: when another
 * coroutine ends, it must switch back to the main coroutine, which then selects a new coroutine to resume.
 * @attention A thread that wants to create coroutines should first call Fiber::GetThis() to initialize the main coroutine.
 */
Fiber::ptr Fiber::GetThis() {
    if (t_fiber) {
        return t_fiber->shared_from_this();
    }
 
    Fiber::ptr main_fiber(new Fiber);
    SYLAR_ASSERT(t_fiber == main_fiber.get());
    t_thread_fiber = main_fiber;
    return t_fiber->shared_from_this();
}
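The lazy initialization in GetThis() can be tried in isolation with a stripped-down class. DemoFiber below is a hypothetical stand-in with no context switching at all. It only shows the private constructor, the two thread-local variables, and shared_from_this(), under the assumption that the real Fiber derives from std::enable_shared_from_this in the same way.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for Fiber; only the GetThis() pattern is modelled.
class DemoFiber : public std::enable_shared_from_this<DemoFiber> {
public:
    typedef std::shared_ptr<DemoFiber> ptr;

    static void SetThis(DemoFiber *f) { t_current = f; }
    static ptr GetThis();

private:
    // Private: the main coroutine object can only be created by GetThis().
    DemoFiber() { SetThis(this); }

    static thread_local DemoFiber *t_current;  // coroutine running on this thread
    static thread_local ptr t_main;            // keeps the main coroutine alive
};

thread_local DemoFiber *DemoFiber::t_current = nullptr;
thread_local DemoFiber::ptr DemoFiber::t_main = nullptr;

DemoFiber::ptr DemoFiber::GetThis() {
    if (t_current) {
        return t_current->shared_from_this();
    }
    // First call on this thread: create the main coroutine object.
    ptr main_fiber(new DemoFiber);
    assert(t_current == main_fiber.get());
    t_main = main_fiber;
    return t_current->shared_from_this();
}
```

Every call after the first returns a shared pointer to the same object, and t_main holds one reference for the lifetime of the thread, so the raw pointer in t_current stays valid.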

Implementation of the coroutine primitives: suspend and resume

/**
 * @brief Switch this coroutine into the running state (swapping with the scheduler's main coroutine)
 * @pre getState() != EXEC
 * @post getState() == EXEC
 */
void Fiber::swapIn() {
    SetThis(this);
    SYLAR_ASSERT(m_state != EXEC);
    m_state = EXEC;
    if (swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}

/**
 * @brief Switch this coroutine to the background (swapping with the scheduler's main coroutine)
 */
void Fiber::swapOut() {
    SetThis(Scheduler::GetMainFiber());
    if (swapcontext(&m_ctx, &Scheduler::GetMainFiber()->m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}

/**
 * @brief Switch this coroutine into the execution state (swapping with the thread's main coroutine)
 * @pre the main coroutine of the current thread is executing
 */
void Fiber::call() {
    SetThis(this);
    m_state = EXEC;
    SYLAR_LOG_ERROR(g_logger) << getId();
    if (swapcontext(&t_thread_fiber->m_ctx, &m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}

/**
 * @brief Switch this coroutine to the background (swapping with the thread's main coroutine)
 * @pre this coroutine is executing
 * @post control returns to the main coroutine of the thread
 */
void Fiber::back() {
    SetThis(t_thread_fiber.get());
    if (swapcontext(&m_ctx, &t_thread_fiber->m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}

Coroutine entry function

void Fiber::MainFunc() {
    Fiber::ptr cur = GetThis(); // The shared_from_this() method of GetThis() increments the reference count by 1
    SYLAR_ASSERT(cur);
 
    cur->m_cb(); // The entry function that actually executes the coroutine here
    cur->m_cb = nullptr;
    cur->m_state = TERM;
 
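    auto raw_ptr = cur.get(); // Keep a raw pointer so the smart pointer can be released before switching out
    cur.reset(); // Manually reduce the reference count by 1; this stack frame never unwinds after the final switch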
    raw_ptr->swapOut(); // Switch out when the coroutine ends to return to the main coroutine (swapOut() is the primitive defined above)
}
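Why the entry function releases its smart pointer before the final switch can be checked with a small experiment. After the last context switch the coroutine is never resumed, so the locals on its stack are never destroyed. A shared_ptr left there would keep the reference count one too high and the object would never be freed. The sketch below assumes Linux/glibc, and RcFiber, rc_entry and use_count_after_finish are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <ucontext.h>
#include <vector>

// Hypothetical names; only the reference-count effect is modelled.
struct RcFiber : std::enable_shared_from_this<RcFiber> {
    ucontext_t ctx;
    std::vector<char> stack = std::vector<char>(64 * 1024);
};

static ucontext_t g_rc_main;
static RcFiber *g_rc_cur = nullptr;
static bool g_release_before_switch = true;

static void rc_entry() {
    std::shared_ptr<RcFiber> cur = g_rc_cur->shared_from_this();  // count + 1
    RcFiber *raw = cur.get();
    if (g_release_before_switch) {
        cur.reset();                     // count - 1, done by hand
    }
    // Final switch: this frame is never resumed, so cur's destructor never runs.
    swapcontext(&raw->ctx, &g_rc_main);
}

long use_count_after_finish(bool release_before_switch) {
    g_release_before_switch = release_before_switch;
    auto f = std::make_shared<RcFiber>();
    int rc = getcontext(&f->ctx);
    assert(rc == 0);
    (void)rc;
    f->ctx.uc_stack.ss_sp = f->stack.data();
    f->ctx.uc_stack.ss_size = f->stack.size();
    f->ctx.uc_link = nullptr;
    makecontext(&f->ctx, rc_entry, 0);

    g_rc_cur = f.get();
    swapcontext(&g_rc_main, &f->ctx);    // run the coroutine to its end
    return f.use_count();                // references still alive
}
```

With the manual reset the caller is the only remaining owner. Without it, a stale reference stays behind on the abandoned stack and the object is intentionally leaked in this demo.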

Coroutine reset
Reuse a finished coroutine: keep its stack space and bind a new entry function, which effectively creates a new coroutine without allocating a new stack.

void Fiber::reset(std::function<void()> cb) {
    SYLAR_ASSERT(m_stack);
    SYLAR_ASSERT(m_state == TERM);
    m_cb = cb;
    if (getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
 
    m_ctx.uc_link = nullptr;
    m_ctx.uc_stack.ss_sp = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;
 
    makecontext(&m_ctx, &Fiber::MainFunc, 0);
    m_state = READY;
}
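The effect of reset can be observed with a reduced model. The stack buffer is allocated once, and each reset only rebuilds the context on top of it. ReFiber, re_reset, re_run and run_reset_demo are hypothetical names, and the sketch assumes Linux/glibc. The demo records the address of a local variable in each callback to confirm that both runs used the same stack memory.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <ucontext.h>
#include <vector>

// Hypothetical names; a reduced model of Fiber::reset.
struct ReFiber {
    ucontext_t ctx;
    std::vector<char> stack;
    std::function<void()> cb;
    bool term = false;
};

static ucontext_t g_re_main;
static ReFiber *g_re_cur = nullptr;

static void re_entry() {
    g_re_cur->cb();
    g_re_cur->cb = nullptr;
    g_re_cur->term = true;
    swapcontext(&g_re_cur->ctx, &g_re_main);  // finished: back to the caller
}

// Bind a new entry function; the stack is allocated only on first use.
void re_reset(ReFiber &f, std::function<void()> cb) {
    if (f.stack.empty()) {
        f.stack.resize(64 * 1024);
    }
    f.cb = std::move(cb);
    int rc = getcontext(&f.ctx);
    assert(rc == 0);
    (void)rc;
    f.ctx.uc_link = nullptr;
    f.ctx.uc_stack.ss_sp = f.stack.data();
    f.ctx.uc_stack.ss_size = f.stack.size();
    makecontext(&f.ctx, re_entry, 0);
    f.term = false;
}

void re_run(ReFiber &f) {
    g_re_cur = &f;
    swapcontext(&g_re_main, &f.ctx);
}

bool run_reset_demo() {
    ReFiber f;
    std::uintptr_t first = 0, second = 0;
    int calls = 0;

    re_reset(f, [&] { char probe = 0; first = reinterpret_cast<std::uintptr_t>(&probe); ++calls; });
    re_run(f);
    const char *buffer = f.stack.data();

    re_reset(f, [&] { char probe = 0; second = reinterpret_cast<std::uintptr_t>(&probe); ++calls; });
    re_run(f);

    std::uintptr_t lo = reinterpret_cast<std::uintptr_t>(f.stack.data());
    std::uintptr_t hi = lo + f.stack.size();
    bool same_buffer = (f.stack.data() == buffer);
    bool on_stack = first >= lo && first < hi && second >= lo && second < hi;
    return calls == 2 && same_buffer && on_stack && f.term;
}
```

run_reset_demo() returns true when both callbacks ran, the buffer was not reallocated, and both locals lived inside that buffer, which is the saving that reusing a finished coroutine is meant to provide.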