Implementing a distributed delay queue with Redis

Article directory

  • Introduction to delay queue
  • Application scenarios
  • Case
  • Considerations
  • Implementation
    • Overall idea
    • Detailed implementation
      • Producer
      • Consumer
    • Run results
  • Advantages of a Redis distributed delay queue
  • Disadvantages of a Redis distributed delay queue

Introduction to delay queue

A delay queue is a special message queue that allows messages to be consumed only after a certain delay. Its main feature is that it can postpone the processing of messages, which suits scheduled tasks and timed events.

In short, a delay queue provides a convenient and reliable way to handle scheduled tasks and timed events by delaying the consumption of messages. It plays an important role in distributed systems and can improve a system's reliability and performance.

There are many ways to implement a delay queue. This article introduces a distributed delay queue implemented with Redis.

Application scenarios

  • Scheduled tasks: Tasks that need to be executed at a specific time can be encapsulated as delayed messages, and the execution of the tasks can be triggered through the delay queue.

  • Order timeout processing: You can send order messages to the delay queue and set the order timeout. After the time is exceeded, the consumer obtains the timeout order message from the queue and processes it accordingly.

  • Message retry mechanism: When a message fails to be processed, you can send the message to the delay queue and set a certain retry time. After the time expires, try processing again.

Case

Train-ticket purchasing on 12306: after an order is grabbed, if payment is not made within 45 minutes, the order is automatically canceled.

Considerations

Data persistence: Redis supports it; you can use RDB or AOF.

Ordered storage: as long as the task with the smallest expiry time has not expired, none of the later ones have either, so only the smallest element needs to be checked. The zset (sorted set) structure in Redis fits this well.

High availability: consider sentinel or cluster mode.

High scalability: 12306 has a very large number of users, so the task data stored in Redis may be very large and nodes may need to be added. From this perspective cluster mode is the better fit: under sentinel only a single node, the master, accepts writes.

Implementation

Overall idea

  • Producer-consumer model: because 12306 has a very large number of users, assume there are multiple producer and consumer nodes;
  • Adopt cluster mode to achieve high availability and high scalability;
  • Use a zset to store delayed tasks (zadd key score member, where score represents a time);
  • To distribute data evenly among the cluster's master nodes, build multiple zsets; each zset corresponds to one consumer, and the producer sends each task to a randomly chosen zset.

Detailed implementation

Producer

You need to install the hiredis-cluster library. Build and install it as follows:

git clone https://github.com/Nordix/hiredis-cluster.git
cd hiredis-cluster
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_SSL=ON ..
make
sudo make install
sudo ldconfig

You also need the libevent library. Compile the producer executable with gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl.

#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>
#include <event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdarg.h>
#include <string.h>
#include <sys/time.h>

int64_t g_taskid = 0;

#define MAX_KEY 10

static int64_t hi_msec_now() {
    int64_t msec;
    struct timeval now;
    int status;
    status = gettimeofday(&now, NULL);
    if (status < 0) {
        return -1;
    }
    msec = (int64_t)now.tv_sec * 1000LL + (int64_t)(now.tv_usec / 1000LL);
    return msec;
}

static int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args) {
    int n;
    n = vsnprintf(buf, size, fmt, args);
    if (n <= 0) {
        return 0;
    }
    if (n < (int)size) {
        return n;
    }
    return (int)(size - 1);
}

static int _scnprintf(char *buf, size_t size, const char *fmt, ...) {
    va_list args;
    int n;
    va_start(args, fmt);
    n = _vscnprintf(buf, size, fmt, args);
    va_end(args);
    return n;
}

void connectCallback(const redisAsyncContext *ac, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", ac->errstr);
        return;
    }
    printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void disconnectCallback(const redisAsyncContext *ac, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", ac->errstr);
        return;
    }
    printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void addTaskCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    if (reply == NULL) {
        if (cc->errstr) {
            printf("errstr: %s\n", cc->errstr);
        }
        return;
    }

    int64_t now = hi_msec_now() / 10;
    printf("add task success reply: %lld now=%ld\n", reply->integer, (long)now);
}

int addTask(redisClusterAsyncContext *cc, char *desc) {
    /* Convert to centiseconds */
    int64_t now = hi_msec_now() / 10;
    g_taskid++;

    /* key */
    char key[256] = {0};
    // To distribute data evenly among the cluster's master nodes:
    // build multiple zsets; each zset corresponds to one consumer, and the
    // producer places each task into one of the zsets.
    // There can be many producers; just make sure tasks are spread evenly
    // across task_group:0 - task_group:9.
    int len = _scnprintf(key, 255, "task_group:%ld", g_taskid % MAX_KEY);
    key[len] = '\0';

    /* member */
    char mem[1024] = {0};
    len = _scnprintf(mem, 1023, "task:%ld:%s", g_taskid, desc);
    mem[len] = '\0';

    int status;
    // Delay each task by 5 seconds (500 centiseconds)
    status = redisClusterAsyncCommand(cc, addTaskCallback, "",
                                      "zadd %s %ld %s", key, now + 500, mem);

    printf("redisClusterAsyncCommand: zadd %s %ld %s\n", key, now + 500, mem);
    if (status != REDIS_OK) {
        printf("error: err=%d errstr=%s\n", cc->err, cc->errstr);
    }
    return 0;
}

void stdio_callback(struct bufferevent *bev, void *arg) {
    redisClusterAsyncContext *cc = (redisClusterAsyncContext *)arg;
    struct evbuffer *evbuf = bufferevent_get_input(bev);
    char *msg = evbuffer_readln(evbuf, NULL, EVBUFFER_EOL_LF);
    if (!msg) return;

    if (strcmp(msg, "quit") == 0) {
        printf("safe exit!!!\n");
        exit(0);
    }
    if (strlen(msg) > 1024 - 5 - 13 - 1) {
        printf("[err]msg is too long, try again...\n");
        free(msg);
        return;
    }

    addTask(cc, msg);
    printf("stdio read the data: %s\n", msg);
    free(msg);
}

int main(int argc, char **argv) {
    printf("Connecting...\n");
    // Connect to the cluster; any node can serve as the entry point.
    redisClusterAsyncContext *cc =
        redisClusterAsyncConnect("127.0.0.1:7006", HIRCLUSTER_FLAG_NULL);
    printf("redisClusterAsyncContext...\n");
    if (cc && cc->err) {
        printf("Error: %s\n", cc->errstr);
        return 1;
    }

    struct event_base *base = event_base_new();
    redisClusterLibeventAttach(cc, base);
    redisClusterAsyncSetConnectCallback(cc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(cc, disconnectCallback);

    // nodeIterator ni;
    // initNodeIterator(&ni, cc->cc);
    // cluster_node *node;
    // while ((node = nodeNext(&ni)) != NULL) {
    //     printf("node %s:%d role:%d pad:%d\n", node->host, node->port, node->role, node->pad);
    // }
    struct bufferevent *ioev = bufferevent_socket_new(base, 0, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(ioev, stdio_callback, NULL, NULL, cc);
    bufferevent_enable(ioev, EV_READ | EV_PERSIST);

    printf("Dispatch..\n");
    event_base_dispatch(base);

    printf("Done..\n");
    redisClusterAsyncFree(cc);
    event_base_free(base);
    return 0;
}

// Need to install hiredis-cluster libevent
// gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl

Description:

Ten zsets are constructed here, with keys task_group:0 through task_group:9. The number of zsets effectively corresponds to the number of consumers. Consumers are usually functionally identical, so the producer does not care how many there are; it only needs to scatter tasks evenly across the different zsets. A concrete way to do this is to keep a global id that is incremented each time a task is added, take it modulo 10 to get a number between 0 and 9, and splice that onto the "task_group:" prefix, so tasks end up evenly spread across the zsets.

Consumer

The consumer is implemented with skynet + Lua. Each consumer continuously checks whether tasks in Redis have expired; expired tasks are taken out and deleted (this is just a demo, so each task is simply printed and then deleted).

local skynet = require "skynet"

local function table_dump( object )
    if type(object) == 'table' then
        local s = '{ '
        for k,v in pairs(object) do
            if type(k) ~= 'number' then k = string.format("%q", k) end
            s = s .. '['..k..'] = ' .. table_dump(v) .. ','
        end
        return s .. '} '
    elseif type(object) == 'function' then
        return tostring(object)
    elseif type(object) == 'string' then
        return string.format("%q", object)
    else
        return tostring(object)
    end
end

local mode, key = ...
if mode == "slave" then
    local rediscluster = require "skynet.db.redis.cluster"
    local function onmessage(data,channel,pchannel)
        print("onmessage",data,channel,pchannel)
    end
    skynet.start(function ()
        local db = rediscluster.new({
                {host="127.0.0.1",port=7001},
            },
            {read_slave=true,auth=nil,db=0,},
            onmessage
        )
        assert(db, "redis-cluster startup error")
        skynet.fork(function ()
            while true do
                local res = db:zrange(key, 0, 0, "withscores")
                if not next(res) then
                    skynet.sleep(50)
                else
                    local expire = tonumber(res[2])
                    local now = skynet.time()*100
                    if now >= expire then
                        print(("%s is comsumed:expire_time:%d"):format(res[1], expire))
                        db:zrem(key, res[1])
                    else
                        skynet.sleep(10)
                    end
                end
            end
        end)
    end)

else
    skynet.start(function ()
        -- Start 10 services, passing "slave" as mode and task_group:i as key,
        -- so each service consumes exactly one zset
        for i=0,9 do
            skynet.newservice(SERVICE_NAME, "slave", "task_group:"..i)
        end
    end)
end

Run results

Advantages of a Redis distributed delay queue

1. Redis zsets support high-performance sorting by score.

2. Redis operates in memory and is very fast.

3. Redis can be clustered; when there are many messages, a cluster can increase message-processing speed and improve availability.

4. Redis has persistence mechanisms; when a failure occurs, data can be restored through AOF and RDB, ensuring data reliability.

Disadvantages of a Redis distributed delay queue

The delayed message queue implemented using Redis also has problems with data persistence and message reliability:

  • No retry mechanism – when an exception occurs while processing a message, there is no built-in retry; you have to implement retries (including the retry count) yourself;
  • No ACK mechanism – the message is deleted as soon as it is fetched, so if the client crashes while processing it, the in-flight message is lost. An MQ, by contrast, only considers a message correctly consumed after the consumer explicitly acknowledges it.

Summary: if you have high requirements for message reliability, it is recommended to use an MQ instead.