Redis Stream in detail with Spring Data Redis (including broadcast and consumer-group reception)

Directory

  • 1. Introduction
    • 1. Introduction
    • 2. Comparison
  • 2. Integrate Spring Data Redis
    • 1. Dependencies
    • 2. Configuration class
      • 2.1. Configure RedisTemplate bean
      • 2.2. Error handler class
    • 3. Entity class
      • 3.1. User
      • 3.2. Book
    • 4. Send message
      • 4.1. RedisStreamUtil utility class
      • 4.2. Simulate sending messages through the delay queue thread pool
      • 4.3. Actively send messages over HTTP
    • 5. Message reception
      • 5.1. Not bound to a consumer group: broadcast effect
        • Method 1: Active reading
          • Test log
        • Method 2: Use a listener to watch for new messages
      • 5.2. Specify a consumer group: only one member of the group receives each message
        • 5.2.1. Configuration class
        • 5.2.2. Listener
          • Send messages through the delay queue: test results
          • Send the subclass Book object over HTTP: test results
  • 3. Complete code
  • 4. References

Background:
This approach was chosen because one place in the project simply needs asynchronous processing; the project has no dedicated message middleware configured, and the feature is not used very often, so installing a full MQ service felt unnecessary. Instead, asynchronous reception of messages for that specific piece of business logic is implemented with the Redis Stream that is already available.

1. Introduction

1. Introduction

Redis Stream (Redis Streams) is a data structure introduced in Redis 5.0 for handling time-series data, message queues, and log streams. It provides a high-throughput, durable, ordered, and scalable messaging solution. Streams are an enhancement of the traditional publish/subscribe model, allow data streams to be processed more flexibly, and offer the following main features:

  1. Multiple producers and multiple consumers: several producers can write to the Stream at the same time, and several consumers can subscribe and consume messages independently, each at its own pace.

  2. Consumer groups: Redis Stream introduces the concept of a consumer group. Multiple consumers can join the same group and share the work; within a group, each message is delivered to only one consumer, so it is not processed multiple times by that group.

  3. Blocking reads: consumers can use the XREADGROUP command to fetch messages in a blocking manner, so new messages are delivered to the consumer only when they arrive.

  4. Message acknowledgment: a consumer acknowledges a message (XACK) to tell Redis that it has been processed successfully; client libraries can also do this automatically (a minimal code sketch of the add/read/ack flow follows this list).

  5. Multiple Streams: you can create several Streams to store different kinds of data and process them separately.

  6. Ordering: messages are stored in the Stream ordered by their IDs (which embed a timestamp), so they can be read in the order they were added.

  7. Persistence: like other Redis data, a Stream lives in memory but can also be persisted to disk asynchronously, which greatly reduces the risk of losing messages.
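
To make these features concrete, below is a minimal sketch of the command-level flow (XADD, XGROUP CREATE, XREADGROUP, XACK) through Spring Data Redis, the same API used in the rest of this article. The stream, group and consumer names are placeholders and are not part of the project described below.

import java.time.Duration;
import java.util.Collections;
import java.util.List;

import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;

public class StreamFlowSketch {

    public void demo(StringRedisTemplate template) {
        // XADD: produce a message (a simple field/value map)
        RecordId id = template.opsForStream()
                .add("demo-stream", Collections.singletonMap("greeting", "hello"));

        // XGROUP CREATE: create a consumer group starting from the beginning of the stream
        // (fails if the group already exists; the stream itself must already exist)
        template.opsForStream().createGroup("demo-stream", ReadOffset.from("0-0"), "demo-group");

        // XREADGROUP: read new messages as consumer-1 of demo-group, blocking for at most 1s
        List<MapRecord<String, Object, Object>> records = template.opsForStream().read(
                Consumer.from("demo-group", "consumer-1"),
                StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),
                StreamOffset.create("demo-stream", ReadOffset.lastConsumed()));

        // XACK: acknowledge each processed message so it leaves the group's pending list
        for (MapRecord<String, Object, Object> msg : records) {
            template.opsForStream().acknowledge("demo-stream", "demo-group", msg.getId());
        }
    }
}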

2. Comparison

Compared with other queue implementations built on top of Redis, Stream has the most complete feature set: it supports persistence and ACK confirmation, which largely solves the problem of message loss. It is still somewhat limited compared with dedicated message-queue middleware.
If you need a detailed comparison, see the article Redis Queue Comparison.

2. Integrate Spring Data Redis

1. Dependencies

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>
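
The Redis connection itself is configured in application.properties / application.yml as usual. The original project's file is not shown here; the snippet below is an assumed minimal example with placeholder values (Spring Boot 2.x property names). Note spring.redis.timeout: the blocking read and poll timeouts used in later sections must stay below it.

# Placeholder connection settings (adjust to your environment)
spring.redis.host=127.0.0.1
spring.redis.port=6379
# Blocking reads/polls in later sections must be shorter than this timeout
spring.redis.timeout=5s
# Lettuce connection pool (backed by commons-pool2)
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.min-idle=0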

2. Configuration class

2.1. Configure RedisTemplate bean

The key point is the line below: do not use a JSON serializer for the hash value serializer, otherwise serialization will fail.
redisTemplate.setHashValueSerializer(RedisSerializer.string());

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(new StringRedisSerializer());
    redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    // JSON serialization must not be used here. If ObjectRecord is used to transmit objects,
    // it may fail with: java.lang.IllegalArgumentException: Value must not be null!
    redisTemplate.setHashValueSerializer(RedisSerializer.string());
    return redisTemplate;
}

2.2. Error handler class

@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable throwable) {
        log.error("An exception occurred", throwable);
    }
}

3. Entity class

Two entity classes are used here, mainly to test whether a message can still be deserialized and received normally when the configured target type is not the exact type that was sent but its parent class.

3.1. User

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
    private String name;
    private Integer age;
}

3.2. Book

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Book extends User {
    private String title;
    private String author;
}

4. Send message

Messages are sent to Redis through redisTemplate.opsForStream().add(record); they are then received in the two ways described later.

4.1. RedisStreamUtil utility class

It implements methods such as sending messages, initializing a group, binding a key to a group, and clearing consumed messages.

  • Before the first message is sent, the receiving group must be bound to the key, otherwise a "group does not exist" error is reported on the receiving side
  • After a message has been consumed, it needs to be deleted, otherwise it remains in the stream

@Component
@Slf4j
public class RedisStreamUtil {
    public static final String STREAM_KEY_001 = "stream-001";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Add a record to the stream
     * @param streamKey
     * @param t
     * @param <T>
     */
    public <T> RecordId add(String streamKey, T t) {
        ObjectRecord<String, T> record = StreamRecords.newRecord()
                .in(streamKey)                   // key
                .ofObject(t)                     // message data
                .withId(RecordId.autoGenerate());
        // Send the message
        RecordId recordId = redisTemplate.opsForStream().add(record);

        log.info("Added successfully, returned record-id [{}]", recordId);

        return recordId;
    }

    /**
     * Create the binding between a stream and a consumer group
     */
    public void addGroup(String key, String groupName) {
        redisTemplate.opsForStream().createGroup(key, groupName);
    }

    /**
     * Check whether the key exists
     */
    public boolean hasKey(String key) {
        if (key == null) {
            return false;
        } else {
            return redisTemplate.hasKey(key);
        }
    }

    /**
     * Delete consumed messages
     */
    public void delField(String key, RecordId recordIds) {
        redisTemplate.opsForStream().delete(key, recordIds);
    }

    /**
     * Initialize: bind the stream key and the consumer group
     */
    public void initStream(String key, String group) {
        // Check whether the key exists; if not, create it
        boolean hasKey = hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>();
            map.put("key", "value");
            RecordId recordId = add(key, map);
            addGroup(key, group);     // The stream and the group must be bound on first initialization
            delField(key, recordId);  // Clean up the placeholder record
            log.info("stream:{} - group:{} initialized successfully", key, group);
        }
    }

    public String getStreamKey001() {
        return STREAM_KEY_001;
    }
}

4.2. Simulate sending messages through the delay queue thread pool

  • This runner waits 5 seconds, then sends one message every 3 seconds, and shuts the thread pool down after roughly 10 messages have been sent.

/**
 * Runs during Spring initialization and periodically sends messages to the stream, used to simulate a producer
 */
//@Component
public class StreamMessageRunner implements ApplicationRunner {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        AtomicInteger integer = new AtomicInteger(0);
        // Use the scheduled (delayed) thread pool to simulate sending messages
        pool.scheduleAtFixedRate(() -> {
            User zhangsan = new User("zhangsan" + integer.get(), 1 + integer.get());
            RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), zhangsan);
            integer.getAndIncrement();
            // Consumed messages need to be cleared, otherwise they remain in the stream and may be consumed repeatedly
            redisStreamUtil.delField(redisStreamUtil.getStreamKey001(), recordId);
            if (integer.get() > 10) {
                System.out.println("---------Stopped sending messages--------");
                pool.shutdown();
            }
        }, 5, 3, TimeUnit.SECONDS);
    }
}

4.3. Actively send messages over HTTP

  • Send the parent class and the subclass separately to compare the effects.

@RestController
@RequestMapping("/index")
public class index {
    @Resource
    private RedisStreamUtil redisStreamUtil;

    /**
     * Parent class
     */
    @GetMapping("/login")
    public String login(User user) {
        RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), user);
        redisStreamUtil.delField(redisStreamUtil.getStreamKey001(), recordId);
        return "Success!";
    }

    /**
     * Subclass
     */
    @GetMapping("/login2")
    public String login(Book book) {
        RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), book);
        redisStreamUtil.delField(redisStreamUtil.getStreamKey001(), recordId);
        return "Success!";
    }
}
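
For a quick test, the two endpoints can be called with plain GET requests (assuming the application runs on the default port 8080; the parameter values here are arbitrary). Spring binds the query parameters to the User and Book fields:

GET http://localhost:8080/index/login?name=zhangsan&age=20
GET http://localhost:8080/index/login2?name=zhangsan&age=20&title=redis-stream&author=zhangsan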

5. Message reception

5.1. Not bound to a consumer group: broadcast effect

Here the consumer is not bound to a consumer group but reads directly from the stream, which gives a broadcast effect: every consumer bound to the stream receives each message sent to it.

As shown below: a message sent to the Redis Stream is received by all of the consumers A0 to B2 bound to it

Method 1: Active reading

Actively read messages from the stream through the redisTemplate.opsForStream().read() method

/**
 * Independent consumer --- can read all messages of the key
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    private volatile boolean stop = false;

    /**
     * Executed when the bean is initialized; actively reads messages from the stream key by polling.
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Initialize the thread pool
        threadPoolExecutor = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });

        StreamReadOptions options = StreamReadOptions.empty()
                // If there is no data, block for 1s; the blocking time must be shorter than spring.redis.timeout
                .block(Duration.ofMillis(1000))
                // Block until data arrives (may end with a timeout exception)
                // .block(Duration.ofMillis(0))
                // Fetch at most 10 records at a time
                .count(10);

        StringBuilder readBuilder = new StringBuilder("0-0");

        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // Actively read from the Redis stream; the options block for at most one second per read
                List<ObjectRecord<String, User>> objectRecords = redisTemplate.opsForStream()
                        .read(User.class, options,
                                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.from(readBuilder.toString())));

                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("No data read");
                    continue;
                }
                objectRecords.stream().forEach(objectRecord -> {
                    log.info("Obtained data, id:[{}] value:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readBuilder.setLength(0);
                    readBuilder.append(objectRecord.getId());
                });
            }
        });
    }

    /**
     * Shut down the thread pool when the bean is destroyed
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}

Test log

Method 2: Use a listener to watch for new messages

The code is the same as for group consumption except that no consumer group is specified, so it is merged into the configuration shown in section 5.2 below.
It is implemented mainly through the StreamMessageListenerContainer listener container.

The key line is the following:

container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("Independent consumption", null, null));

5.2. Specify a consumer group: only one member of the group receives each message

After grouping, only one member of a group reads each message, as shown in the figure below. Multiple groups can also be bound to the same stream, and each group independently receives the messages of the stream it listens to. This is roughly analogous to the relationship between exchanges, queues, and routing keys in a traditional MQ.

5.2.1. Configuration class

The code below first creates a thread pool, then configures the message-listener container options, registers the listeners that receive messages with the container, and finally exposes the container as a bean.

@Configuration
public class RedisStreamConfiguration {

    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, User>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        // Number of threads based on the available processors
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50),
                r -> {
                    Thread thread = new Thread(r);
                    thread.setName("async-stream-consumer-" + index.getAndIncrement());
                    thread.setDaemon(true);
                    return thread;
                }, new ThreadPoolExecutor.CallerRunsPolicy());

        // Once created and started, the StreamMessageListenerContainer subscribes to the Redis stream and consumes incoming messages
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, User>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchSize(10)
                        // Executor that runs the Stream poll task
                        .executor(executor)
                        // When the Stream has no messages, the blocking time must be shorter than spring.redis.timeout
                        .pollTimeout(Duration.ofSeconds(1))
                        // When ObjectRecord is used, converts the object's fields and values into a Map, e.g. a Book object into a map
                        // .objectMapper(new ObjectHashMapper())
                        // Handler invoked when an exception occurs while polling or while a listener processes a message
                        .errorHandler(new CustomErrorHandler())
                        // Converts the Record read from the Stream into an ObjectRecord of the type specified here
                        .targetType(User.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, User>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // Initialization: bind the key and the consumer group
        redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001, "group-a");

        // Not bound to a consumer group: independent (broadcast) consumption
        container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001),
                new MonitorStreamListener("Independent consumption", null, null));

        // Consumer group A, without automatic ack
        // Start consuming messages in the consumer group that have not yet been assigned to a consumer
        // container.receive(Consumer.from("group-a", "consumer-a"),
        //         StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),
        //         new MonitorStreamListener("Consumer group A", "group-a", "consumer-a"));

        // Automatic ack
        container.receiveAutoAck(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),
                new MonitorStreamListener("Consumer group B", "group-a", "consumer-b"));

        return container;
    }
}

Important code analysis:

  1. .targetType(User.class): specifies the target type when configuring the listener container. If it is not specified, the default is String, so if you send anything other than plain strings you need to set it. If a parent class is configured, messages carrying subclass objects can also be received and converted. However, if Object is configured as the type, what is received is only a path and the incoming object cannot be obtained normally (I don't know why; readers who have looked into this are welcome to explain).
  2. redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001, "group-a"): on first startup, the consumer group must be bound to the stream key, otherwise an error is reported. See the initStream() method for the internal logic, or bind it manually in Redis with the command xgroup create stream-001 group-a $, where stream-001 is the key and group-a is the consumer group.
  3. container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("Independent consumption", null, null)): this registration does not bind a consumer group, which is the broadcast mode that listens to all messages on the key (the difference from Method 1 above is that here messages are received passively through the listener).
  4. container.receiveAutoAck(Consumer.from("group-a", "consumer-b"), StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()), new MonitorStreamListener("Consumer group B", "group-a", "consumer-b")): this registration implements group consumption; it binds the consumer group, the consumer name, and the listener class, and uses automatic ack to confirm to the stream that each message has been received. Alternatively you can register without auto-ack and acknowledge manually, otherwise the message stays in the pending list and may be delivered again (see the sketch after this list).
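
As a reference for point 4, here is a minimal sketch of the manual-ack variant. It is not part of the original project: it is meant to sit inside streamMessageListenerContainer() above in place of the receiveAutoAck registration, and "consumer-a" is a placeholder consumer name.

// Manual-ack variant: register without auto-ack, then acknowledge explicitly in the listener
private void registerManualAck(StreamMessageListenerContainer<String, ObjectRecord<String, User>> container,
                               RedisTemplate<String, Object> redisTemplate) {
    container.receive(Consumer.from("group-a", "consumer-a"),
            StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),
            message -> {
                // ... business logic for the received ObjectRecord<String, User> ...
                // XACK: remove the record from the group's pending entries list once it is processed,
                // otherwise it stays pending and can be re-delivered
                redisTemplate.opsForStream()
                        .acknowledge(RedisStreamUtil.STREAM_KEY_001, "group-a", message.getId());
            });
}
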
5.2.2. Listener

Used to receive messages and then run the specific business logic.

@Slf4j
public class MonitorStreamListener<T> implements StreamListener<String, ObjectRecord<String, T>> {

    /**
     * Consumer type: independent consumption or consumer-group consumption
     */
    private String consumerType;
    /**
     * Consumer group
     */
    private String group;
    /**
     * A consumer within the consumer group
     */
    private String consumerName;

    public MonitorStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }

    @Override
    public void onMessage(ObjectRecord<String, T> message) {
        log.info("Received message from redis");

        String stream = message.getStream();
        RecordId id = message.getId();
        User value = (User) message.getValue();
        value.getName();

        // Execute the specific business logic for the received message
        if (StringUtils.isEmpty(group)) {
            log.info("[{}]: Received a message stream:[{}], id:[{}], value:[{}]", consumerType, stream, id, value);
        } else {
            log.info("[{}] group:[{}] consumerName:[{}] received a message stream:[{}], id:[{}], value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }

        // When consuming in a consumer group without automatic ack, acknowledge manually here:
        // redisTemplate.opsForStream()
        //         .acknowledge("key", "group", "recordId");
    }
}

Send messages through the delay queue: test results

Send the subclass Book object over HTTP: test results


Result: it can also be received normally

3. Complete code

Full code repository address

4. References

https://juejin.cn/post/7029302992364896270#heading-0
https://juejin.cn/post/6844904125822435341?searchId=202310141054532F9807A1000F6680C0DF#heading-1