Java Source Code Analysis, Lecture 21: How Do You Implement a Message Queue in Redis, and How Many Ways Are There?

If you have been following closely, you may have noticed that three lessons in this course deal with message queues. Lesson 10 covered program-level message queues and the implementation of delayed message queues, and Lesson 15 covered common message queue middleware such as RabbitMQ and Kafka, which shows how important message queues are in the Java technology stack. In this lesson, we focus on how to implement message queues with Redis.

The interview question for this lesson is: how many ways are there to implement a message queue in Redis?

Typical Answer

Before Redis 2.0, there were two ways to implement a message queue with Redis:

  • Implemented using the List type
  • Implemented using the ZSet type

Of the two, the **List type is the simplest and most direct**. It implements a message queue by writing messages with lpush and reading them with rpop, as shown in the following figure:

image.png

lpush stores the newest message at the head of the message queue (the List), and rpop reads from the tail of the queue, which gives first-in-first-out ordering, as shown in the following figure:

image (1).png

The corresponding commands on the command line are as follows:

127.0.0.1:6379> lpush mq "java" # push message java
(integer) 1
127.0.0.1:6379> lpush mq "msg" # push message msg
(integer) 2
127.0.0.1:6379> rpop mq # receive message java
"java"
127.0.0.1:6379> rpop mq # receive message msg
"msg"

Here, mq is the name of the message queue, lpush produces (adds) messages, and rpop pulls and consumes them.
The advantage of implementing a message queue with List is that messages can be persisted. List can rely on Redis's own persistence (AOF, RDB, or hybrid persistence) to save data to disk, so that messages are not lost when Redis restarts.

However, using List also has drawbacks: a message cannot be consumed more than once, there is no topic-based subscription, and consumer acknowledgment of messages is not supported.

ZSet implements a message queue in much the same way as List. It uses zadd and zrangebyscore to store and read messages, so the details are not repeated here. The ZSet implementation is more complex, however, because each element carries an additional score attribute. We can use the score to build extra features, for example storing a timestamp in it to implement a delayed message queue.

ZSet also supports persistence, and it shares the problems of List. In addition, ZSet cannot store duplicate values. It is an ordered set, so element values must be unique while scores may repeat. In other words, when the same message value is added more than once, ZSet keeps only one copy.
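
To make the role of the score concrete, here is a minimal sketch of the delayed-queue idea in plain Java. It is an in-memory stand-in and not Redis client code: the class DelayQueueSketch and its methods add and pollDue are hypothetical names. add plays the role of zadd with the due time as the score, and pollDue plays the role of zrangebyscore followed by removing what was read (in Redis that removal would be a separate command such as zrem). It also shows the duplicate-value behavior described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for a ZSet used as a delayed queue (hypothetical, not Redis code). */
class DelayQueueSketch {
    // member -> score; as in a ZSet, each member is stored at most once
    private final Map<String, Long> scores = new HashMap<>();

    /** Models "zadd key score member": adding an existing member only updates its score. */
    public void add(String message, long dueAtMillis) {
        scores.put(message, dueAtMillis);
    }

    /** Models "zrangebyscore key 0 now", then removes the members that were returned. */
    public List<String> pollDue(long nowMillis) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(scores.entrySet());
        entries.sort(Map.Entry.comparingByValue()); // lowest score (earliest due time) first
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : entries) {
            if (e.getValue() <= nowMillis) {
                due.add(e.getKey());
            }
        }
        for (String message : due) {
            scores.remove(message);
        }
        return due;
    }

    public int size() {
        return scores.size();
    }

    public static void main(String[] args) {
        DelayQueueSketch queue = new DelayQueueSketch();
        queue.add("order-1", 1_000L);
        queue.add("order-2", 5_000L);
        queue.add("order-1", 2_000L); // same value again: still one entry, score is now 2000
        System.out.println(queue.pollDue(3_000L)); // [order-1]
        System.out.println(queue.pollDue(6_000L)); // [order-2]
    }
}
```

Because a repeated value only updates the score, a queue built this way would usually need something unique in each message, such as an ID. That is a design suggestion, not something the sketch enforces.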

Starting with version 2.0, Redis added dedicated publish and subscribe commands, with a Publisher and a Subscriber, for implementing message queues. The corresponding commands are as follows:

  • Publish a message: publish channel “message”
  • Subscribe to a channel: subscribe channel

With publish and subscribe, we can also implement topic subscription, which Redis calls pattern subscription (the psubscribe command). For example, a consumer can subscribe to the pattern “queue_*” to receive messages from every channel whose name starts with “queue_”, as shown in the following figure:

image (2).png

The advantages of the publish-subscribe model are obvious, but there are also the following three problems:

  • Messages cannot be persisted. If the Redis server goes down or restarts, all messages will be lost;
  • Publish-subscribe works in a “fire and forget” manner. If a subscriber goes offline and later reconnects, it cannot consume the messages published in the meantime;
  • There is no consumer acknowledgment mechanism, so reliable delivery cannot be guaranteed. For example, if a consumer receives a message and crashes before processing it, Redis has no way of knowing that processing failed and will not resend the message, so the message is lost.
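
Pattern subscription and the “fire and forget” behavior can be illustrated with a small in-memory model in plain Java. This is a sketch under stated assumptions, not how Redis is implemented: PubSubSketch and its methods are hypothetical names, the pattern matcher handles only the * and ? wildcards, and each subscriber is represented by a simple inbox list. The point it shows is that publish hands a message only to the subscribers connected at that moment and stores nothing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** In-memory model of fire-and-forget publish/subscribe (hypothetical, not Redis code). */
class PubSubSketch {
    // pattern -> inbox of the subscriber currently connected with that pattern
    private final Map<String, List<String>> subscribers = new LinkedHashMap<>();

    /** Connects a subscriber for the pattern and returns its (initially empty) inbox. */
    public List<String> psubscribe(String pattern) {
        return subscribers.computeIfAbsent(pattern, p -> new ArrayList<>());
    }

    /** Disconnects the subscriber registered for the pattern. */
    public void punsubscribe(String pattern) {
        subscribers.remove(pattern);
    }

    /** Delivers to matching connected subscribers only; returns how many received it. */
    public int publish(String channel, String message) {
        int receivers = 0;
        for (Map.Entry<String, List<String>> e : subscribers.entrySet()) {
            if (matches(e.getKey(), channel)) {
                e.getValue().add(message);
                receivers++;
            }
        }
        return receivers; // the message is not kept anywhere after this call
    }

    /** Simplified glob match supporting only '*' and '?'. */
    static boolean matches(String glob, String channel) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), channel);
    }

    public static void main(String[] args) {
        PubSubSketch bus = new PubSubSketch();
        System.out.println(bus.publish("queue_orders", "early")); // 0: nobody is subscribed yet
        List<String> inbox = bus.psubscribe("queue_*");
        System.out.println(bus.publish("queue_orders", "hello")); // 1
        System.out.println(inbox); // [hello]; "early" was never stored
    }
}
```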

However, Redis 5.0 added the Stream type. We can use Stream's xadd and xrange to store and read messages, and Stream provides the xack command to confirm message consumption manually, which gives us consumer acknowledgment. The command is used as follows:

127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1

The relevant syntax is as follows:

xack key group-key ID [ID ...]

Consumption confirmation increases the reliability of the message. Generally, after the business processing is completed, it is necessary to execute ack to confirm that the message has been consumed. The execution of the entire process is shown in the following figure:

image (3).png

Here, “Group” is a consumer group. A consumer, that is, the receiver, must be associated with a group in order to read messages in this way.
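
The acknowledgment flow can be modeled in a few lines of plain Java. This is a simplified in-memory sketch, not Redis or Jedis code: StreamGroupSketch and its methods are hypothetical, entry IDs are plain list indexes and not real Stream IDs, and only a single group is modeled. It shows that each entry is delivered to one consumer in the group, stays in a pending list until it is acknowledged, and that ack reports how many entries it actually confirmed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of one consumer group with acknowledgment (hypothetical, not Redis code). */
class StreamGroupSketch {
    private final List<String> entries = new ArrayList<>();             // stream entries in arrival order
    private final Map<Integer, String> pending = new LinkedHashMap<>(); // entry id -> consumer holding it
    private int nextToDeliver = 0;                                      // first entry not yet delivered

    /** Models xadd: appends an entry and returns its id (here just the list index). */
    public int add(String payload) {
        entries.add(payload);
        return entries.size() - 1;
    }

    /** Models a group read of one new entry: returns its id, or -1 if nothing is new. */
    public int readGroup(String consumer) {
        if (nextToDeliver >= entries.size()) {
            return -1;
        }
        int id = nextToDeliver++;
        pending.put(id, consumer); // delivered but not yet acknowledged
        return id;
    }

    /** Models xack for a single id: returns 1 if the entry was pending, otherwise 0. */
    public int ack(int id) {
        return pending.remove(id) != null ? 1 : 0;
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        StreamGroupSketch group = new StreamGroupSketch();
        group.add("redis");
        group.add("java");
        int first = group.readGroup("c1");  // c1 gets entry 0
        int second = group.readGroup("c2"); // c2 gets entry 1, never the same entry as c1
        System.out.println(group.ack(first));       // 1
        System.out.println(group.pendingCount());   // 1: entry held by c2 is still unacknowledged
        System.out.println(second);                 // 1
    }
}
```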

The above are the four ways that Redis implements message queues. They are:

  • Use List to implement message queue;
  • Use ZSet to implement message queue;
  • Implement message queues using the publish-subscribe pattern;
  • Implement a message queue using Stream.

Analysis of the Test Points

This question tests the interviewee's grasp of the overall Redis knowledge framework and of the features in newer versions. In early versions, the common ways to implement a message queue were List, ZSet, and publish-subscribe. Stream has become popular only in the past two years, and many companies have not yet adopted a version as new as Redis 5.0. You therefore only need to give the first three answers to pass; the Stream implementation is a bonus. Being able to explain it in an interview shows your awareness of new technology and your enthusiasm for it, and will count in your favor.

The interview questions related to this knowledge point are as follows:

  • What’s wrong with using List to implement message queue in Java code? How should it be solved?
  • How to use Stream to implement message queue in the program?

Knowledge expansion

Use List to implement message queue

In the Java program, we need to use the Redis client framework to assist the program to operate Redis, such as the Jedis framework.

To use the Jedis framework, you first need to add Jedis dependencies to the pom.xml file, and the configuration is as follows:

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>${version}</version>
</dependency>

The complete code of List implementing message queue is as follows:

import redis.clients.jedis.Jedis;

public class ListMQTest {
    public static void main(String[] args) {
        // Start a thread as the consumer
        new Thread(() -> consumer()).start();
        // Producer
        producer();
    }
    /**
     * Producer
     */
    public static void producer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Push a message
        jedis.lpush("mq", "Hello, List.");
    }
    /**
     * Consumer
     */
    public static void consumer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Consume messages
        while (true) {
            // Get a message (rpop returns null when the list is empty)
            String msg = jedis.rpop("mq");
            if (msg != null) {
                // Message received
                System.out.println("Received message: " + msg);
            }
        }
    }
}

The result of running the above program is:

Received message: Hello, List.

However, the code above has a problem. The consumer fetches messages in an infinite while loop. If the queue stays idle for a long time and no new messages arrive, the loop does not stop; it keeps calling rpop, wasting system resources.

To solve this, we can replace rpop with the blocking read provided by Redis. The implementation is as follows:

import java.util.List;
import redis.clients.jedis.Jedis;

public class ListMQExample {
    public static void main(String[] args) throws InterruptedException {
        // Consumer
        new Thread(() -> bConsumer()).start();
        // Producer
        producer();
    }
    /**
     * Producer
     */
    public static void producer() throws InterruptedException {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Push messages
        jedis.lpush("mq", "Hello, Java.");
        Thread.sleep(1000);
        jedis.lpush("mq", "message 2.");
        Thread.sleep(2000);
        jedis.lpush("mq", "message 3.");
    }
    /**
     * Consumer (blocking version)
     */
    public static void bConsumer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        while (true) {
            // Blocking read; a timeout of 0 means wait indefinitely.
            // The reply holds two elements: the key name, then the message.
            List<String> reply = jedis.brpop(0, "mq");
            if (reply != null && reply.size() == 2) {
                // Read the message and perform business processing
                System.out.println("Received message: " + reply.get(1));
            }
        }
    }
}

The result of running the above program is:

Received message: Hello, Java.
Received message: message 2.
Received message: message 3.

The improved code uses brpop instead of rpop to read the last message, so the while loop no longer burns system resources when there is no data. The b in brpop stands for blocking: when the queue is empty, the read goes dormant, and when data enters the queue, it “wakes up” and performs the read.
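
The difference between polling and blocking can also be seen locally with Java's own LinkedBlockingDeque. This is only an analogy for the List commands, with hypothetical method names chosen to mirror them. One difference to keep in mind: in Redis a brpop timeout of 0 means wait indefinitely, whereas in this sketch the timeout is a plain upper bound in milliseconds.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Local analogy for lpush / rpop / brpop using a JDK deque (hypothetical, not Redis code). */
class BlockingPopSketch {
    private final BlockingDeque<String> list = new LinkedBlockingDeque<>();

    /** Like lpush: add the message at the head. */
    public void lpush(String message) {
        list.addFirst(message);
    }

    /** Like rpop: returns null at once when the queue is empty, so callers have to poll. */
    public String rpop() {
        return list.pollLast();
    }

    /** Like brpop with a timeout: the thread sleeps until a message arrives or time runs out. */
    public String brpop(long timeoutMillis) {
        try {
            return list.pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingPopSketch mq = new BlockingPopSketch();
        // Producer thread that publishes after a short delay
        new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            mq.lpush("Hello, Java.");
        }).start();
        System.out.println("rpop on an empty queue: " + mq.rpop());
        System.out.println("brpop waited and got: " + mq.brpop(5_000));
    }
}
```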

Use Stream to implement message queue

Before implementing the message queue, we must first create a consumer group, because consumers need to be associated with a group to work properly. The implementation is as follows:

import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils; // project helper class (not shown here) that returns a Jedis connection
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StreamGroupExample {
    private static final String _STREAM_KEY = "mq"; // stream key
    private static final String _GROUP_NAME = "g1"; // group name
    private static final String _CONSUMER_NAME = "c1"; // name of consumer 1
    private static final String _CONSUMER2_NAME = "c2"; // name of consumer 2
    public static void main(String[] args) {
        // producer
        producer();
        // Create a consumer group
        createGroup(_STREAM_KEY, _GROUP_NAME);
        // consumer 1
        new Thread(() -> consumer()).start();
        // consumer 2
        new Thread(() -> consumer2()).start();
    }
    /**
     * Create consumer groups
     * @param stream stream key
     * @param groupName group name
     */
    public static void createGroup(String stream, String groupName) {
        Jedis jedis = JedisUtils.getJedis();
        jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
    }
    /**
     * producer
     */
    public static void producer() {
        Jedis jedis = JedisUtils.getJedis();
        // add message 1
        Map<String, String> map = new HashMap<>();
        map.put("data", "redis");
        StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
        System.out.println("Message added successfully ID:" + id);
        // add message 2
        Map<String, String> map2 = new HashMap<>();
        map2.put("data", "java");
        StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
        System.out.println("Message added successfully ID:" + id2);
    }
    /**
     * consumer 1
     */
    public static void consumer() {
        Jedis jedis = JedisUtils.getJedis();
        // consume message
        while (true) {
            // read message
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                    StreamEntryID.UNRECEIVED_ENTRY);
            // Block reading a message (maximum blocking time 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
                    120 * 1000, true, entry);
            if (list != null && list.size() == 1) {
                // read the message
                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // message content
                System.out.println("Consumer 1 read message ID:" + list.get(0).getValue().get(0).getID() +
                        " Content: " + new Gson().toJson(content));
            }
        }
    }
    /**
     * consumer 2
     */
    public static void consumer2() {
        Jedis jedis = JedisUtils.getJedis();
        // consume message
        while (true) {
            // read message
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                    StreamEntryID.UNRECEIVED_ENTRY);
            // Block reading a message (maximum blocking time 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
                    120 * 1000, true, entry);
            if (list != null && list.size() == 1) {
                // read the message
                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // message content
                System.out.println("Consumer 2 read message ID:" + list.get(0).getValue().get(0).getID() +
                        " Content: " + new Gson().toJson(content));
            }
        }
    }
}

The result of running the above code is as follows:

Message added successfully ID:1580971482344-0
Message added successfully ID:1580971482415-0
Consumer 1 read message ID:1580971482344-0 Content: {"data":"redis"}
Consumer 2 read message ID:1580971482415-0 Content: {"data":"java"}

The fifth parameter of jedis.xreadGroup(), noAck, indicates whether messages are confirmed automatically. If it is set to true, a message is confirmed (acked) automatically as soon as it is received; otherwise it must be confirmed manually.

It can be seen that multiple consumers in the same group will read different messages, and different consumers will not read the same message in the group.

Tip: Use the latest version of the Jedis framework. In older versions, setting block to a value greater than 0 triggers a bug that throws a connection timeout exception.

Summary

In this lesson, we covered four ways to implement a message queue in Redis: List, ZSet, publish-subscribe, and Stream. Publish-subscribe does not support message persistence, while the other three do, and Stream also supports consumer acknowledgment. We also used the Jedis framework to implement message queues with List and Stream. Note that with List, messages should be read with brpop and not rpop, which avoids wasting system resources in the while loop when there are no messages.

Featured Comments

** Chi:

How does Stream confirm messages manually?

Instructor Response:

Stream provides the command xack key group-key ID [ID ...], which is used to acknowledge messages.

**Taiwan:

With Stream, when a consumer goes offline, is the consumption position remembered by the consumer or recorded by the server? Does it work by copying the data into multiple copies, or by having the consumer remember its own progress?

Instructor Response:

This information is stored only on the server side, not on the client. The server keeps a record of consumption acknowledgments, from which it knows which messages have not yet been consumed.