Implementing a time wheel algorithm in Golang from zero to one (3)

Introduction

This article is based on Mr. Xiao Xu's blog posts on this topic. The project is available at:
https://github.com/xiaoxuxiansheng/timewheel/blob/main/redis_time_wheel.go. The main goals are to fill in details of the process and to record personal study notes.

Distributed version implementation

In this chapter, we discuss how to implement a distributed, Redis-based version of the time wheel that meets the requirements of a distributed scheduled-task system in a real production environment.

The Redis version of the time wheel stores and manages scheduled tasks in a Redis sorted set (zset for short). The timestamp of each task's execution time is used as its score in the zset, so the tasks are kept ordered by execution time.

The official Redis documentation for the zset data structure is at https://redis.io/docs/data-type; here is a brief look at how it is used.

Redis's ZSET (sorted set) is one of the Redis data types. Like a set, it is a collection of string members that does not allow duplicates; the difference is that each member is associated with a score of type double, and Redis uses the scores to order the members from smallest to largest. Members are unique, but scores can repeat.

Basic operations include adding and deleting members, and modifying or querying a member's score. Some commonly used ZSET commands are:

  • ZADD key score member: Adds an element to ZSET and updates its score if the element already exists.
  • ZSCORE key member: Gets the score of an element in ZSET.
  • ZRANGE key start stop [WITHSCORES]: Returns the elements in the specified interval in ZSET in order from low to high scores. If the WITHSCORES option is used, the score of the element will be included in the result.
  • ZREVRANGE key start stop [WITHSCORES]: Same function as ZRANGE, but elements are returned from high to low by score.
  • ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]: Returns the elements in ZSET with scores between min and max (inclusive by default), ordered from low to high score.
  • ZREMRANGEBYRANK key start stop: Removes all members of ZSET whose rank falls within the given interval.
  • ZREMRANGEBYSCORE key min max: Removes all members of ZSET whose scores fall within the given range.
  • ZINCRBY key increment member: Increments the score of the specified member by increment; a negative increment decreases it.
  • ZCARD key: Gets the number of members in ZSET.
  • ZCOUNT key min max: Counts the members of ZSET with scores between min and max.
  • ZREM key member [member …]: Deletes one or more members from ZSET.

docker installation

Get the latest version of the Redis image

docker pull redis:latest

Run container
After the image has been pulled, run a Redis container with the following command:

$ docker run -itd --name redis-test -p 6379:6379 redis

Then connect to the Redis service with redis-cli to test it:

$ docker exec -it redis-test /bin/bash
# inside the container:
redis-cli

Usage example:

# Add three elements to the ZSET named myzset
ZADD myzset 1 "one" 2 "two" 3 "three"

# Get all elements in myzset and their scores
ZRANGE myzset 0 -1 WITHSCORES

# Get the number of members with score 2 in myzset
ZCOUNT myzset 2 2

# Increase the score of element "one" by 10 (from 1 to 11)
ZINCRBY myzset 10 "one"

Code usage

The code can be found in the repository https://github.com/xiaoxuxiansheng/timewheel/blob/main/redis_time_wheel.go. The walkthrough below follows the startup process.

Data structure

redis time wheel

The Redis version of the time wheel has two core types. The first is the time wheel itself:

  • redisClient: Scheduled tasks are stored in redis zsets, so a redis client needs to be built in. This part is expanded on in section 3.2.
  • httpClient: A scheduled task is executed by sending a request to the callback address the user registered, so an http client needs to be built in.
  • ticker and stopc: ticker is a time.Ticker from the golang standard library that triggers the periodic task scan, and stopc is the channel used to stop the resident goroutine. The embedded sync.Once ensures that stopc is closed only once.
// RTimeWheel is the distributed, redis-based version of the time wheel
type RTimeWheel struct {
    // Embedded sync.Once ensures that stopc is closed only once
    sync.Once
    // redis client
    redisClient *redis.Client
    // http client, used when executing scheduled tasks
    httpClient *thttp.Client
    // Channel used to stop the time wheel
    stopc chan struct{}
    // Ticker that triggers the periodic task scan
    ticker *time.Ticker
}

Scheduled tasks

The scheduled task type is defined as follows. It contains the task's unique key and the HTTP parameters used when the task's callback is executed.

// RTaskElement is a scheduled task submitted by the user
type RTaskElement struct {
    // Globally unique key of the scheduled task
    Key string `json:"key"`
    // Callback http url requested when the task is executed
    CallbackURL string `json:"callback_url"`
    // http method used for the callback
    Method string `json:"method"`
    // Request parameters passed in the callback
    Req interface{} `json:"req"`
    // http request headers used for the callback
    Header map[string]string `json:"header"`
}

Constructor

When constructing a time wheel instance, the caller injects the redis client and the http client.

During initialization, ticker is created as a standard-library time.Ticker with a fixed interval of 1 second. The run method is then started asynchronously as a resident goroutine, whose life cycle is controlled through the stopc channel.

func NewRTimeWheel(redisClient *redis.Client, httpClient *thttp.Client) *RTimeWheel {
    r := RTimeWheel{
        ticker:      time.NewTicker(time.Second),
        redisClient: redisClient,
        httpClient:  httpClient,
        stopc:       make(chan struct{}),
    }
    go r.run()
    return &r
}

Start and stop

The time wheel's resident goroutine also runs as a for + select loop:

  • When the stopc signal is received, the goroutine exits and the time wheel stops running.
  • When receiving the ticker signal, start an asynchronous goroutine to execute the current batch of scheduled tasks.
// run drives the time wheel
func (r *RTimeWheel) run() {
    // A resident goroutine built from for + select is a common Go idiom
    for {
        select {
        // Exit the goroutine when the stop signal is received
        case <-r.stopc:
            return
        // On every tick, scan for and execute the scheduled tasks that are due
        case <-r.ticker.C:
            go r.executeTasks()
        }
    }
}

The Stop method closes stopc, which makes the resident goroutine exit promptly.

// Stop stops the time wheel
func (r *RTimeWheel) Stop() {
    // sync.Once guarantees that stopc is closed only once
    r.Do(func() {
        // Closing stopc makes the resident goroutine exit
        close(r.stopc)
        // Stop the ticker
        r.ticker.Stop()
    })
}

Create task

When a scheduled task is created, the minute-level time slice it belongs to is determined from its execution time.

The actual storage logic is defined in a Lua script and executed through the redis client's Eval method.

// AddTask adds a scheduled task
func (r *RTimeWheel) AddTask(ctx context.Context, key string, task *RTaskElement, executeAt time.Time) error {
    // Validate the task parameters up front
    if err := r.addTaskPrecheck(task); err != nil {
        return err
    }

    task.Key = key
    // Serialize the task; Req is an interface{} and may hold a value json cannot encode
    taskBody, err := json.Marshal(task)
    if err != nil {
        return err
    }
    // Run the lua script to add the task to the redis zset; under the hood it uses ZADD
    _, err = r.redisClient.Eval(ctx, LuaAddTasks, 2, []interface{}{
        // KEYS[1]: minute-level zset (time slice)
        r.getMinuteSlice(executeAt),
        // KEYS[2]: set that marks deleted tasks
        r.getDeleteSetKey(executeAt),
        // ARGV[1]: second-level timestamp of the execution time, used as the zset score
        executeAt.Unix(),
        // ARGV[2]: task details
        string(taskBody),
        // ARGV[3]: task key, removed from the delete set
        key,
    })
    return err
}

// Usage example
if err := rTimeWheel.AddTask(ctx, "test1", &RTaskElement{
    CallbackURL: callbackURL,
    Method:      callbackMethod,
    Req:         callbackReq,
    Header:      callbackHeader,
}, time.Now().Add(time.Second)); err != nil {
    t.Error(err)
    return
}
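AddTask stores the JSON encoding of RTaskElement as the zset member, so the task key travels inside the member itself. The sketch below copies the struct definition from above and reproduces that encoding step in a hypothetical encodeTask helper; the callback URL and payload are made-up sample values. It also checks the json.Marshal error, since Req is an interface{} and may hold a value that JSON cannot encode (a channel, for example).

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// RTaskElement is copied from the struct definition above.
type RTaskElement struct {
	Key         string            `json:"key"`
	CallbackURL string            `json:"callback_url"`
	Method      string            `json:"method"`
	Req         interface{}       `json:"req"`
	Header      map[string]string `json:"header"`
}

// encodeTask (hypothetical) mimics what AddTask does before calling Eval:
// set the key, then JSON-encode the whole task to use as the zset member.
func encodeTask(key string, task *RTaskElement) (string, error) {
	task.Key = key
	body, err := json.Marshal(task)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	member, err := encodeTask("test1", &RTaskElement{
		CallbackURL: "http://localhost:8080/callback", // sample value
		Method:      http.MethodPost,
		Req:         map[string]string{"hello": "world"},
		Header:      map[string]string{"Content-Type": "application/json"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(member)
}
```

Because the member is the full JSON body rather than the key alone, removing a task with ZREM would require reproducing exactly the same bytes; a separate delete set keyed by task key, as used by RemoveTask, avoids that.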

// LuaAddTasks:
// 1. If the task key is marked in the delete set, remove the mark (SREM)
// 2. Add the task to the zset of the minute it belongs to (ZADD); the {...} part of the key names is a Redis Cluster hash tag
const LuaAddTasks = `
   local zsetKey = KEYS[1]
   local deleteSetKey = KEYS[2]
   local score = ARGV[1]
   local task = ARGV[2]
   local taskKey = ARGV[3]
   redis.call('srem',deleteSetKey,taskKey)
   return redis.call('zadd',zsetKey,score,task)
`
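Why does LuaAddTasks run SREM before ZADD? The in-memory simulation below suggests the reason: without it, a task that was removed and is then re-added with the same key in the same minute would still carry its deletion mark and be filtered out when tasks are retrieved. memStore, luaAddTasks, markDeleted and isDeleted are hypothetical names; the maps merely stand in for the Redis zset and set.

```go
package main

import "fmt"

// memStore is a hypothetical in-memory stand-in for the two Redis structures
// the script touches: zsets (member -> score) and plain sets.
type memStore struct {
	zsets map[string]map[string]int64
	sets  map[string]map[string]struct{}
}

func newMemStore() *memStore {
	return &memStore{
		zsets: map[string]map[string]int64{},
		sets:  map[string]map[string]struct{}{},
	}
}

// luaAddTasks mirrors LuaAddTasks: SREM deleteSetKey taskKey, then
// ZADD zsetKey score task. It returns ZADD's reply (1 if the member is new).
func (s *memStore) luaAddTasks(zsetKey, deleteSetKey string, score int64, task, taskKey string) int {
	delete(s.sets[deleteSetKey], taskKey) // delete on a nil map is a no-op
	z, ok := s.zsets[zsetKey]
	if !ok {
		z = map[string]int64{}
		s.zsets[zsetKey] = z
	}
	_, exists := z[task]
	z[task] = score
	if exists {
		return 0
	}
	return 1
}

// markDeleted mirrors the SADD part of LuaDeleteTask (lazy deletion).
func (s *memStore) markDeleted(deleteSetKey, taskKey string) {
	if s.sets[deleteSetKey] == nil {
		s.sets[deleteSetKey] = map[string]struct{}{}
	}
	s.sets[deleteSetKey][taskKey] = struct{}{}
}

// isDeleted reports whether taskKey would be filtered out at retrieval time.
func (s *memStore) isDeleted(deleteSetKey, taskKey string) bool {
	_, ok := s.sets[deleteSetKey][taskKey]
	return ok
}

func main() {
	const zkey, dkey = "task_{2023-11-05-15:08}", "delset_{2023-11-05-15:08}"
	s := newMemStore()
	s.luaAddTasks(zkey, dkey, 1699196900, `{"key":"test1"}`, "test1")
	s.markDeleted(dkey, "test1")             // the user removes the task
	fmt.Println(s.isDeleted(dkey, "test1")) // true
	s.luaAddTasks(zkey, dkey, 1699196910, `{"key":"test1"}`, "test1")
	fmt.Println(s.isDeleted(dkey, "test1")) // false: SREM cleared the mark
}
```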

Below are the details of obtaining the key of the minute-level task zset (minuteSlice) and the key of the delete set (deleteSet).
But first, let's look at the addTaskPrecheck function, which validates the task parameters.

func (r *RTimeWheel) addTaskPrecheck(task *RTaskElement) error {
    if task.Method != http.MethodGet && task.Method != http.MethodPost {
        return fmt.Errorf("invalid method: %s", task.Method)
    }
    if !strings.HasPrefix(task.CallbackURL, "http://") && !strings.HasPrefix(task.CallbackURL, "https://") {
        return fmt.Errorf("invalid url: %s", task.CallbackURL)
    }
    return nil
}

Now take a look at getMinuteSlice, a method to obtain the key of the scheduled task ordered list:

func (r *RTimeWheel) getMinuteSlice(executeAt time.Time) string {
    return fmt.Sprintf("xiaoxu_timewheel_task_{%s}", util.GetTimeMinuteStr(executeAt))
}

func GetTimeMinuteStr(t time.Time) string {
    return t.Format(YYYY_MM_DD_HH_MM)
}

For example, the generated key is xiaoxu_timewheel_task_{2023-11-05-15:08}.

Method to get the delete task collection key:

func (r *RTimeWheel) getDeleteSetKey(executeAt time.Time) string {
    return fmt.Sprintf("xiaoxu_timewheel_delset_{%s}", util.GetTimeMinuteStr(executeAt))
}

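Now let's look at how the Lua script is executed. The project's redis client wraps a connection pool, and its Eval method sends the EVAL command: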

type Client struct {
    opts *ClientOptions
    pool *redis.Pool
}

// Eval runs a lua script with EVAL.
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {
    // EVAL arguments: script source, number of keys, then keys and args
    args := make([]interface{}, 2+len(keysAndArgs))
    args[0] = src
    args[1] = keyCount
    copy(args[2:], keysAndArgs)

    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return -1, err
    }
    defer conn.Close()

    return conn.Do("EVAL", args...)
}



The Eval method above lets the Go client run a Lua script over a Redis connection. Let's analyze step by step how it works and how it is used together with the LuaAddTasks script:

Eval method:

  • The method belongs to the Client type and accepts a context (context.Context), the Lua script source (src string), the number of keys (keyCount int), and a slice holding the keys followed by the arguments (keysAndArgs []interface{}).
  • It first allocates a slice args large enough to hold the script source, the key count, and all keys and arguments.
  • It then tries to get a connection from the pool c.pool; if no connection can be obtained, the error is returned.
  • The defer statement ensures that the connection is closed (returned to the pool) once the method is done with it.
  • Finally, it executes the Redis EVAL command on that connection, passing in the args slice built earlier.

What is the EVAL command used for in Redis?

Redis's EVAL command executes a Lua script on the server. Script execution is atomic: while a script is running, the Redis server does not execute any other command, so other clients never see its intermediate state. This lets several commands run as one indivisible step, similar to a database transaction. Unlike a database transaction, however, there is no rollback: if a command fails partway through the script, the writes made by earlier commands remain.
The basic usage of the EVAL command is:

EVAL script numkeys key [key ...] arg [arg ...]
  • script is the Lua script code to be executed.
  • numkeys is the number of key arguments. It tells Redis which of the following arguments are key names and which are ordinary arguments, which Redis Cluster relies on to route the script to the node that owns those keys.
  • key [key …] are the key names passed to the script, specified by the numkeys parameter.
  • arg [arg …] are additional parameters passed to the script that will not be treated as keys by Redis.

For example, the following command increments a counter, creating it if it does not exist:

EVAL "local current = redis.call('get', KEYS[1]) if current then current = redis.call('incr', KEYS[1]) else redis.call('set', KEYS[1], 1) current = 1 end return current" 1 counter

Below is a detailed description of each part:

  • The quoted string is the Lua script:
    • First, redis.call('get', KEYS[1]) reads the current value of counter.
    • If counter exists, the script increments it with redis.call('incr', KEYS[1]). (For a missing key, GET returns nil, which Redis converts to false in Lua.)
    • If counter does not exist, redis.call('set', KEYS[1], 1) initializes it and current is set to 1.
    • Finally, the script returns the new value of counter.
  • 1 is the numkeys parameter, i.e. the number of key arguments passed to the Lua script.
  • counter is the key name, i.e. the key we want to increment.
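The way numkeys splits the flat argument list into KEYS and ARGV can be sketched in Go. splitKeysArgs is a hypothetical helper, not part of Redis or the project; it mirrors how the Eval wrapper above lays out its arguments (keys first, then ordinary arguments), and it returns an error where Redis would reject the call, namely a negative numkeys or one larger than the number of remaining arguments (the error text is illustrative). Remember that Lua arrays are 1-based, so keys[0] here is KEYS[1] in the script.

```go
package main

import (
	"errors"
	"fmt"
)

// splitKeysArgs mimics how Redis interprets
// EVAL script numkeys key [key ...] arg [arg ...]:
// the first numkeys values become KEYS, the rest become ARGV.
func splitKeysArgs(numkeys int, params []interface{}) (keys, argv []interface{}, err error) {
	if numkeys < 0 {
		return nil, nil, errors.New("number of keys can't be negative")
	}
	if numkeys > len(params) {
		return nil, nil, errors.New("number of keys can't be greater than number of args")
	}
	return params[:numkeys], params[numkeys:], nil
}

func main() {
	// The same kind of values AddTask passes with keyCount = 2.
	params := []interface{}{
		"xiaoxu_timewheel_task_{2023-11-05-15:08}",
		"xiaoxu_timewheel_delset_{2023-11-05-15:08}",
		int64(1699196900),
		`{"key":"test1"}`,
		"test1",
	}
	keys, argv, err := splitKeysArgs(2, params)
	if err != nil {
		panic(err)
	}
	fmt.Println("KEYS:", keys) // seen in Lua as KEYS[1], KEYS[2]
	fmt.Println("ARGV:", argv) // seen in Lua as ARGV[1], ARGV[2], ARGV[3]
}
```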

LuaAddTasks script:

  • This Lua script is expected to receive two keys and three parameters.
  • KEYS[1] is the key of an ordered set (zsetKey), used to store tasks that need to be added.
  • KEYS[2] is the key of a set (deleteSetKey) that records the keys of tasks marked as deleted.
  • ARGV[1] is the score used to order tasks in the sorted set.
  • ARGV[2] is the task content (task), i.e. the member added to the sorted set.
  • ARGV[3] is the task key (taskKey), used to clear the task's mark in the delete set.
  • The script first calls redis.call('srem', deleteSetKey, taskKey) to remove taskKey from the deleteSetKey set.
  • It then adds task with score score to the zsetKey sorted set via redis.call('zadd', zsetKey, score, task) and returns ZADD's result.
  • A client adds a new task by executing LuaAddTasks through the Eval method. If the task key is still marked in the delete set (for example, because a task with the same key was removed earlier), the script clears that mark before adding the task to the corresponding sorted set; otherwise the re-added task would be filtered out at retrieval time. Because the script runs atomically, no other command can run between the SREM and the ZADD; this is a major advantage of using Lua scripts with Redis.

The following figure shows the execution logic of the Lua script when a scheduled task is created.

Delete tasks

A scheduled task is deleted by adding its key to the minute-level delete set instead of removing it from the zset. Later, when tasks are retrieved, they are filtered against this set, which implements a lazy deletion mechanism.

// RemoveTask deletes a scheduled task from the redis time wheel
func (r *RTimeWheel) RemoveTask(ctx context.Context, key string, executeAt time.Time) error {
    // Run the lua script to add the task key to the delete set
    _, err := r.redisClient.Eval(ctx, LuaDeleteTask, 1, []interface{}{
        r.getDeleteSetKey(executeAt),
        key,
    })
    return err
}

const (
    // Lua script that marks a scheduled task as deleted
    LuaDeleteTask = `
       -- Key of the set that records deleted tasks
       local deleteSetKey = KEYS[1]
       -- Unique key of the scheduled task
       local taskKey = ARGV[1]
       -- Add the task key to the set
       redis.call('sadd',deleteSetKey,taskKey)
       -- If this is the first element in the set, give the set a 120-second expiration
       local scnt = redis.call('scard',deleteSetKey)
       if (tonumber(scnt) == 1)
       then
           redis.call('expire',deleteSetKey,120)
       end
       return scnt
    `
)

Execute scheduled tasks

When tasks are executed, the getExecutableTasks method first fetches, in one batch, all scheduled tasks that are due; executeTask is then called concurrently for each of them to perform the callback.

// executeTasks executes the due scheduled tasks in a batch
func (r *RTimeWheel) executeTasks() {
    defer func() {
        if err := recover(); err != nil {
            // log
        }
    }()
    // A 30-second timeout bounds the whole batch, so callbacks that honor the context are cut off and their goroutines do not leak
    tctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
    defer cancel()
    // Scan the redis zset by the current time to get all scheduled tasks that are due
    tasks, err := r.getExecutableTasks(tctx)
    if err != nil {
        // log
        return
    }
    // Execute the tasks concurrently and wait for all of them with a WaitGroup
    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        // Copy the loop variable for the closure (required before Go 1.22)
        task := task
        go func() {
            defer func() {
                if err := recover(); err != nil {
                    // log
                }
                wg.Done()
            }()
            // Execute the scheduled task's callback
            if err := r.executeTask(tctx, task); err != nil {
                // log
            }
        }()
    }
    wg.Wait()
}

The LuaDeleteTask script used by RemoveTask manages a Redis set together with an expiration time. Its steps are as follows:

  • local deleteSetKey = KEYS[1]: Assign the first key parameter in the Lua script to the variable deleteSetKey. Here KEYS is an array of parameters passed in from the EVAL command, representing the name of the key. In Redis’s Lua script, the KEYS array is used to pass key parameters.

  • local taskKey = ARGV[1]: Assign the first non-key parameter of the script to the variable taskKey. In the EVAL command, the ARGV array is used to pass additional parameters in addition to keys.

  • redis.call('sadd', deleteSetKey, taskKey): Use the SADD command to add taskKey to the set named deleteSetKey. If taskKey is already a member, the command does nothing. It returns 1 if a new member was added and 0 otherwise.

  • local scnt = redis.call('scard', deleteSetKey): Get the number of members of the set named deleteSetKey and assign it to the variable scnt.

  • if (tonumber(scnt) == 1) then: Check whether the deleteSetKey set contains exactly one member. The tonumber function ensures that scnt is treated as a number.

  • redis.call('expire', deleteSetKey, 120): If the set has only one member (the taskKey just added), set its expiration time to 120 seconds. The EXPIRE command sets a key's time-to-live (TTL), so the delete set is removed automatically; because the TTL is set only for the first member, later deletions in the same minute do not extend it.

  • return scnt: The script returns the number of members of the deleteSetKey collection.

Retrieve scheduled tasks

Finally, here is how the list of due scheduled tasks is obtained based on the current time:

  1. On each retrieval, the minute-level time slice is first calculated from the current time.
  2. The current second-level timestamp and the timestamp one second later are used as the score range for the ZRANGE command.
  3. The Lua script is called to obtain the delete set and the scheduled tasks within that score range.
  4. Deleted tasks are filtered out using the delete set, and the remaining due tasks are returned.
func (r *RTimeWheel) getExecutableTasks(ctx context.Context) ([]*RTaskElement, error) {
    now := time.Now()
    // Based on the current time, calculate the minute-level time slice it belongs to
    minuteSlice := r.getMinuteSlice(now)
    // Calculate the corresponding minute-level delete set
    deleteSetKey := r.getDeleteSetKey(now)
    nowSecond := util.GetTimeSecond(now)
    // Use second-level timestamps as the score range for the zset lookup
    score1 := nowSecond.Unix()
    score2 := nowSecond.Add(time.Second).Unix()
    // The lua script essentially runs ZRANGE ... BYSCORE over this second-level score range
    rawReply, err := r.redisClient.Eval(ctx, LuaZrangeTasks, 2, []interface{}{
        minuteSlice, deleteSetKey, score1, score2,
    })
    if err != nil {
        return nil, err
    }
    // In the result, the first element is the set of deleted task keys; each subsequent element is a scheduled task
    replies := gocast.ToInterfaceSlice(rawReply)
    if len(replies) == 0 {
        return nil, fmt.Errorf("invalid replies: %v", replies)
    }
    // Build a lookup set of deleted task keys
    deleteds := gocast.ToStringSlice(replies[0])
    deletedSet := make(map[string]struct{}, len(deleteds))
    for _, deleted := range deleteds {
        deletedSet[deleted] = struct{}{}
    }
    // Traverse the scheduled tasks: skip those in the delete set, collect the rest for execution
    tasks := make([]*RTaskElement, 0, len(replies)-1)
    for i := 1; i < len(replies); i++ {
        var task RTaskElement
        if err := json.Unmarshal([]byte(gocast.ToString(replies[i])), &task); err != nil {
            // log
            continue
        }
        if _, ok := deletedSet[task.Key]; ok {
            continue
        }
        tasks = append(tasks, &task)
    }
    return tasks, nil
}
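The decoding step at the end of getExecutableTasks relies on gocast conversion helpers. A standard-library-only sketch of the same logic might look like this. It assumes the reply has already been converted to []interface{} whose first element is the list of deleted keys and whose remaining elements are JSON-encoded tasks, with strings arriving either as string or as []byte (redigo, for example, returns bulk strings as []byte). filterReplies and toString are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RTaskElement is copied from the struct definition above.
type RTaskElement struct {
	Key         string            `json:"key"`
	CallbackURL string            `json:"callback_url"`
	Method      string            `json:"method"`
	Req         interface{}       `json:"req"`
	Header      map[string]string `json:"header"`
}

// toString accepts the two shapes a bulk string commonly takes in Go clients.
func toString(v interface{}) (string, bool) {
	switch s := v.(type) {
	case string:
		return s, true
	case []byte:
		return string(s), true
	}
	return "", false
}

// filterReplies mirrors the tail of getExecutableTasks: replies[0] holds the
// deleted task keys, replies[1:] hold JSON-encoded tasks.
func filterReplies(replies []interface{}) ([]*RTaskElement, error) {
	if len(replies) == 0 {
		return nil, fmt.Errorf("invalid replies: %v", replies)
	}
	deleted := make(map[string]struct{})
	if keys, ok := replies[0].([]interface{}); ok {
		for _, k := range keys {
			if s, ok := toString(k); ok {
				deleted[s] = struct{}{}
			}
		}
	}
	tasks := make([]*RTaskElement, 0, len(replies)-1)
	for _, raw := range replies[1:] {
		s, ok := toString(raw)
		if !ok {
			continue
		}
		var task RTaskElement
		if err := json.Unmarshal([]byte(s), &task); err != nil {
			continue // the original code logs and skips malformed entries
		}
		if _, gone := deleted[task.Key]; gone {
			continue // lazily deleted
		}
		tasks = append(tasks, &task)
	}
	return tasks, nil
}

func main() {
	replies := []interface{}{
		[]interface{}{[]byte("test2")}, // deleted keys
		[]byte(`{"key":"test1","callback_url":"http://localhost/cb","method":"GET"}`),
		[]byte(`{"key":"test2","callback_url":"http://localhost/cb","method":"GET"}`),
	}
	tasks, err := filterReplies(replies)
	if err != nil {
		panic(err)
	}
	for _, t := range tasks {
		fmt.Println(t.Key) // test1
	}
}
```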

The LuaZrangeTasks script is as follows:

const (
    // Scan the redis time wheel: return the minute's delete set together with the tasks that are due, and remove those tasks from the zset
    LuaZrangeTasks = `
       -- The first key is the zset key that stores scheduled tasks
       local zsetKey = KEYS[1]
       -- The second key is the key of the delete set
       local deleteSetKey = KEYS[2]
       -- The first arg is the left boundary of the score range
       local score1 = ARGV[1]
       -- The second arg is the right boundary of the score range
       local score2 = ARGV[2]
       -- Get the set of deleted task keys
       local deleteSet = redis.call('smembers',deleteSetKey)
       -- Fetch the tasks whose scores (second-level timestamps) fall within [score1, score2]; ZRANGE ... BYSCORE requires Redis 6.2+
       local targets = redis.call('zrange',zsetKey,score1,score2,'byscore')
       -- Remove the fetched tasks from the zset right away, so that no other node fetches them again
       redis.call('zremrangebyscore',zsetKey,score1,score2)
       -- The result is a table
       local reply = {}
       -- Its first element is the set of deleted task keys
       reply[1] = deleteSet
       -- The fetched tasks are appended after it in order
       for i, v in ipairs(targets) do
           reply[#reply + 1]=v
       end
       return reply
    `
)

Summary

In this series, we discussed how to implement the time wheel algorithm in Golang from zero to one. Through both the principles and the source code, we walked through the stand-alone version and the Redis-based distributed version of the time wheel in detail.

Reference

https://zhuanlan.zhihu.com/p/658079556