Directory
- 1. Introduction
  - 1. Introduction
  - 2. Comparison
- 2. Integrate spring’s data-redis implementation
  - 1. Use dependencies
  - 2. Configuration class
    - 2.1. Configure RedisTemplate bean
    - 2.2. Exception class
  - 3. Entity class
    - 3.1. User
    - 3.2. Book
  - 4. Send message
    - 4.1. RedisStreamUtil tool class
    - 4.2. Simulate sending messages through the delay queue thread pool
    - 4.3. Actively send messages through http
  - 5. Message reception
    - 5.1. Not bound to consumer groups: a broadcast effect can be achieved
      - Method 1: Active reading
      - Method 2: Use the listener to monitor whether there are new messages
    - 5.2. Designate consumer groups: only one member of a group receives each message
      - 5.2.1. Configuration class
      - 5.2.2. Listener
- 3. Complete code
- 4. References
Background:
One part of our project needed asynchronous processing, but the project has no dedicated message middleware configured and the feature is not used very often, so installing a separate MQ service did not seem worthwhile. Instead, the asynchronous message handling for this piece of business logic is implemented with Redis Stream on the Redis instance the project already uses.
1. Introduction
1. Introduction
Redis Stream is a data structure introduced in Redis 5.0 for processing time-series data, message queues, and log streams. It provides a high-throughput, persistent, ordered, and scalable messaging solution. Unlike the traditional publish/subscribe model, a Stream stores its messages, which allows data streams to be processed more flexibly. Its main features are:
- Multiple producers and multiple consumers: Multiple producers can write to the same Stream at the same time, and multiple consumers can read from it independently, each at its own pace.
- Consumer groups: Several consumers can join the same consumer group and share the work of consuming a Stream. Within a group, each message is delivered to only one consumer, so it is not processed several times.
- Blocking reads: Consumers can use the XREAD or XREADGROUP command with the BLOCK option to wait for messages. The call returns when new messages arrive or when the timeout expires.
- Message acknowledgment: A consumer in a group confirms (XACK) that a message has been processed successfully. Messages can be acknowledged manually or automatically on receipt.
- Multiple Stream support: You can create multiple Streams to store different kinds of data and process them separately.
- Ordering: Each message gets an ID of the form <millisecondsTime>-<sequenceNumber>, and messages are stored in ID order, so they can be read in the order in which they were added.
- Persistent storage: Stream data lives in memory, but like other Redis data it can be saved to disk through Redis persistence, which reduces the risk of losing data.
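To make the ordering rule concrete, here is a minimal sketch in plain Java of how two entry IDs compare: first by the millisecond part, then by the sequence part. The class `StreamEntryId` is a hypothetical helper written for this article, not a type from Redis or Spring, and it uses signed `long` values as a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative model of a stream entry ID; not a class from Redis or Spring. */
final class StreamEntryId implements Comparable<StreamEntryId> {
    private final long millis;    // time part, in milliseconds
    private final long sequence;  // counter for entries created in the same millisecond

    StreamEntryId(long millis, long sequence) {
        this.millis = millis;
        this.sequence = sequence;
    }

    /** Parses "millis-sequence"; a bare "millis" is read as sequence 0. */
    static StreamEntryId parse(String raw) {
        int dash = raw.indexOf('-');
        if (dash < 0) {
            return new StreamEntryId(Long.parseLong(raw), 0L);
        }
        return new StreamEntryId(
                Long.parseLong(raw.substring(0, dash)),
                Long.parseLong(raw.substring(dash + 1)));
    }

    /** Orders by time first, then by sequence number. */
    @Override
    public int compareTo(StreamEntryId other) {
        int byTime = Long.compare(millis, other.millis);
        return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
    }

    @Override
    public String toString() {
        return millis + "-" + sequence;
    }

    /** Returns the given raw IDs sorted into stream order. */
    static List<String> sortIds(List<String> rawIds) {
        List<StreamEntryId> parsed = new ArrayList<>();
        for (String raw : rawIds) {
            parsed.add(parse(raw));
        }
        Collections.sort(parsed);
        List<String> sorted = new ArrayList<>();
        for (StreamEntryId id : parsed) {
            sorted.add(id.toString());
        }
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortIds(Arrays.asList(
                "1697252093001-0", "1697252093000-1", "1697252093000-0")));
    }
}
```

Two entries written in the same millisecond differ only in the sequence part, which is why the ID, and not the timestamp alone, defines the order.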
2. Comparison
Compared with the other ways of building a queue on Redis, Stream is more complete. It supports persistence and ack confirmation, which largely solves the problem of message loss. It still falls short of dedicated message queue middleware in some respects.
For a detailed comparison, see the article Redis Queue Comparison.
2. Integrate spring’s data-redis implementation
1. Use dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
2. Configuration class
2.1. Configure RedisTemplate bean
The key point is the following line. Do not use a JSON serializer for the hash values here, otherwise serialization will fail.
redisTemplate.setHashValueSerializer(RedisSerializer.string());

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(new StringRedisSerializer());
    redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    // JSON serialization cannot be used here. If ObjectRecord is used to transmit objects,
    // it can fail with java.lang.IllegalArgumentException: Value must not be null!
    redisTemplate.setHashValueSerializer(RedisSerializer.string());
    return redisTemplate;
}
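One way to picture why a string serializer fits here: a stream entry is a set of field/value pairs, so an object sent as an ObjectRecord has to be flattened into such pairs first. The sketch below imitates that flattening with plain reflection. It is only an illustration, not Spring Data's actual hash mapper; `FieldMapSketch`, `DemoUser`, and the `_class` type-hint field are assumptions made for this example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: flattens an object into string field/value pairs. */
final class FieldMapSketch {

    /** Hypothetical stand-in for the article's User entity. */
    static final class DemoUser {
        private final String name;
        private final Integer age;

        DemoUser(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Returns one string entry per non-null instance field, plus a type hint. */
    static Map<String, String> toFields(Object source) {
        Map<String, String> fields = new LinkedHashMap<>();
        // Assumed convention: remember the concrete type so a reader can rebuild it.
        fields.put("_class", source.getClass().getName());
        for (Field field : source.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true);
                Object value = field.get(source);
                if (value != null) {
                    fields.put(field.getName(), String.valueOf(value));
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(toFields(new DemoUser("zhangsan", 18)));
    }
}
```

Every value in the resulting map is already a simple string, so a plain string serializer can write it without further conversion.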
2.2. Exception class
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ErrorHandler;

@Slf4j
public class CustomErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable throwable) {
        log.error("An exception occurred", throwable);
    }
}
3. Entity class
Two entity classes are used here, mainly to test one question: if the listener is configured with a parent type but the message carries an object of a subclass, can the message still be deserialized and received normally?
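3.1. User
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
    private String name;
    private Integer age;
}
3.2. Book
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Book extends User {
    private String title;
    private String author;
}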
4. Send message
Messages are sent to Redis with redisTemplate.opsForStream().add(record);
(They are received in two ways, which are described later.)
4.1. RedisStreamUtil tool class
Provides methods for sending messages, initializing a consumer group, binding a key to a group, and clearing consumed messages.
- Before the first message is sent, the receiving group must be bound to the key; otherwise the receiver fails with an exception saying that the group does not exist.
- Consuming a message does not remove it from the stream, so consumed messages must be deleted explicitly; otherwise they remain in the stream.
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RedisStreamUtil {

    public static final String STREAM_KEY_001 = "stream-001";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Adds a record to the stream.
     *
     * @param streamKey key of the stream
     * @param t         message payload
     * @return the ID that Redis generated for the record
     */
    public <T> RecordId add(String streamKey, T t) {
        ObjectRecord<String, T> record = StreamRecords.newRecord()
                .in(streamKey)                     // key
                .ofObject(t)                       // message data
                .withId(RecordId.autoGenerate());
        // Send the message
        RecordId recordId = redisTemplate.opsForStream().add(record);
        log.info("Added successfully, returned record-id[{}]", recordId);
        return recordId;
    }

    /** Creates a consumer group and binds it to the stream. */
    public void addGroup(String key, String groupName) {
        redisTemplate.opsForStream().createGroup(key, groupName);
    }

    /** Checks whether the key exists. */
    public boolean hasKey(String key) {
        // hasKey returns a Boolean that may be null, so avoid unboxing it directly
        return key != null && Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /** Deletes a consumed message. */
    public void delField(String key, RecordId recordId) {
        redisTemplate.opsForStream().delete(key, recordId);
    }

    /** Initializes the stream: binds the key to the consumer group. */
    public void initStream(String key, String group) {
        // If the key does not exist yet, create it
        if (!hasKey(key)) {
            Map<String, Object> map = new HashMap<>();
            map.put("key", "value");
            RecordId recordId = add(key, map);
            addGroup(key, group);      // the stream and group must be bound on first initialization
            delField(key, recordId);   // clear the placeholder record
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }

    public String getStreamKey001() {
        return STREAM_KEY_001;
    }
}
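The initialization step exists because creating a group on a stream that does not exist is an error in Redis (unless the MKSTREAM option is given), and creating a group that already exists is also an error (BUSYGROUP). The following in-memory sketch models just that bookkeeping. `GroupInitSketch` and `ensureGroup` are hypothetical names for this illustration; nothing here talks to Redis.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory model of stream/group bookkeeping; it does not talk to Redis. */
final class GroupInitSketch {
    private final Map<String, List<String>> streams = new HashMap<>();
    private final Map<String, Set<String>> groups = new HashMap<>();

    /** Appends a value; the stream is created on first use. */
    void add(String key, String value) {
        streams.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Fails on a missing stream unless mkStream is set, and on a duplicate group. */
    void createGroup(String key, String group, boolean mkStream) {
        if (!streams.containsKey(key)) {
            if (!mkStream) {
                throw new IllegalStateException("stream does not exist: " + key);
            }
            streams.put(key, new ArrayList<>());
        }
        if (!groups.computeIfAbsent(key, k -> new HashSet<>()).add(group)) {
            throw new IllegalStateException("BUSYGROUP: group already exists: " + group);
        }
    }

    /** Idempotent variant: returns true only if the group had to be created. */
    boolean ensureGroup(String key, String group) {
        if (hasGroup(key, group)) {
            return false; // nothing to do
        }
        createGroup(key, group, true);
        return true;
    }

    boolean hasGroup(String key, String group) {
        return groups.getOrDefault(key, Collections.emptySet()).contains(group);
    }

    public static void main(String[] args) {
        GroupInitSketch sketch = new GroupInitSketch();
        System.out.println(sketch.ensureGroup("stream-001", "group-a"));
        System.out.println(sketch.ensureGroup("stream-001", "group-a"));
    }
}
```

Note that initStream() in the utility class checks only whether the key exists. If the key exists but the group does not, the group is not created, so a check on the group itself, as in `ensureGroup`, may be worth considering.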
4.2. Simulate sending messages through the delay queue thread pool
- This example uses a scheduled thread pool: after an initial delay of 5 seconds it sends one message every 3 seconds, and it shuts the pool down after 10 messages have been sent.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.stream.RecordId;

/**
 * Runs when Spring starts and sends messages to the stream at a fixed rate,
 * to simulate message sending.
 */
//@Component
public class StreamMessageRunner implements ApplicationRunner {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicInteger integer = new AtomicInteger(0);
        // Use the scheduled thread pool to simulate sending data messages
        pool.scheduleAtFixedRate(() -> {
            User zhangsan = new User("zhangsan" + integer.get(), 1 + integer.get());
            RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), zhangsan);
            integer.getAndIncrement();
            // Consumed messages need to be cleared, otherwise they remain in the stream
            // and may be consumed again.
            redisStreamUtil.delField(redisStreamUtil.getStreamKey001(), recordId);
            // Stop after 10 messages (the original check "> 10" sent 11)
            if (integer.get() >= 10) {
                System.out.println("---------Exit sending message--------");
                pool.shutdown();
            }
        }, 5, 3, TimeUnit.SECONDS);
    }
}
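The counting logic of such a fixed-rate sender can be tried without Redis. The sketch below keeps the same pattern (schedule at a fixed rate, count the sends, shut the pool down from inside the task) but collects the messages in a list; `FixedCountSender` and its millisecond delays are assumptions chosen so that the example finishes quickly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: sends a fixed number of messages at a fixed rate, then stops. */
final class FixedCountSender {

    static List<String> sendFixedNumber(int total, long initialDelayMs, long periodMs) {
        if (total <= 0) {
            return new ArrayList<>();
        }
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger counter = new AtomicInteger();

        pool.scheduleAtFixedRate(() -> {
            int n = counter.incrementAndGet();
            sent.add("zhangsan" + n);   // stands in for the real "add to stream" call
            if (n >= total) {
                pool.shutdown();        // the periodic task is not run again after shutdown
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);

        try {
            // Wait until the task has shut the pool down, with a generous upper bound.
            long budgetMs = initialDelayMs + periodMs * total + 5_000;
            if (!pool.awaitTermination(budgetMs, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        System.out.println(sendFixedNumber(10, 50, 30).size());
    }
}
```

Comparing the counter with `>=` right after incrementing it is what makes the sender stop at exactly the requested number of messages.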
4.3. Actively send messages through http
- Send an object of the parent class and an object of the subclass separately, and compare the results.
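import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/index")
public class IndexController {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    /** Sends the parent class. */
    @GetMapping("/login")
    public String login(User user) {
        RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), user);
        redisStreamUtil.delField(redisStreamUtil.getStreamKey001(), recordId);
        return "Success!";
    }

    /** Sends the subclass. */
    @GetMapping("/login2")
    public String login(Book book) {
        RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), book);
        redisStreamUtil.delField(redisStreamUtil.getStreamKey001(), recordId);
        return "Success!";
    }
}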
5. Message reception
5.1. Not bound to consumer groups: a broadcast effect can be achieved
A consumer that is not bound to a consumer group reads from the stream directly, which gives a broadcast effect: every such consumer receives each message that is sent to the stream.
As the figure shows: a message is sent to the Redis Stream, and every consumer bound to it, from A0 to B2, receives this message.
Method 1: Active reading
Actively read messages from the stream with the redisTemplate.opsForStream().read() method.
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
 * Independent consumer: can read all messages of the key.
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {

    private ThreadPoolExecutor threadPoolExecutor;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private volatile boolean stop = false;

    /**
     * Runs when the bean is initialized and polls the given stream key for messages.
     *
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Initialize the thread pool
        threadPoolExecutor = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("xread-nonblock-01");
                    return thread;
                });

        StreamReadOptions options = StreamReadOptions.empty()
                // If there is no data, block for 1s; this must be shorter than `spring.redis.timeout`
                .block(Duration.ofMillis(1000))
                // Block until data arrives; a timeout exception may be reported
                // .block(Duration.ofMillis(0))
                // Read up to 10 records at a time
                .count(10);

        // ID of the last record read; "0-0" means "from the beginning of the stream"
        StringBuilder readBuilder = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // Actively read from the stream; each read blocks for up to one second
                List<ObjectRecord<String, User>> objectRecords = redisTemplate.opsForStream()
                        .read(User.class, options,
                                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001,
                                        ReadOffset.from(readBuilder.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("No data read");
                    continue;
                }
                objectRecords.forEach(objectRecord -> {
                    log.info("obtained data information id:[{}] user:[{}]",
                            objectRecord.getId(), objectRecord.getValue());
                    // Remember the last ID so that the next read continues after it
                    readBuilder.setLength(0);
                    readBuilder.append(objectRecord.getId());
                });
            }
        });
    }

    /**
     * Shuts the thread pool down when the bean is destroyed.
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}
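The core of this polling approach is the offset: each read asks for entries with an ID greater than the last one seen, then moves the offset forward. Here is a minimal in-memory sketch of that idea. `OffsetPollingSketch` is a hypothetical class, and entry IDs are simplified to plain `long` values starting at 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory model of "read everything after the last ID I saw"; it does not talk to Redis. */
final class OffsetPollingSketch {
    private final NavigableMap<Long, String> entries = new TreeMap<>(); // id -> payload
    private long lastSeenId = 0L;                                        // plays the role of "0-0"

    void add(long id, String payload) {
        entries.put(id, payload);
    }

    /** Returns at most count entries with an ID strictly greater than the last one seen. */
    List<String> poll(int count) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<Long, String> entry : entries.tailMap(lastSeenId, false).entrySet()) {
            if (batch.size() == count) {
                break;
            }
            batch.add(entry.getValue());
            lastSeenId = entry.getKey(); // advance the offset so the entry is not read again
        }
        return batch;
    }

    public static void main(String[] args) {
        OffsetPollingSketch stream = new OffsetPollingSketch();
        stream.add(1L, "a");
        stream.add(2L, "b");
        stream.add(3L, "c");
        System.out.println(stream.poll(2));
        System.out.println(stream.poll(2));
    }
}
```

Because the offset belongs to the reader and not to the stream, several such readers can each keep their own offset and all see every message.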
Test log
Method 2: Use the listener to monitor whether there are new messages
The code is the same as for consumer groups, except that no group is specified, so it is shown together with the consumer group code below.
It is implemented mainly through the listener container class StreamMessageListenerContainer.
The key line is:
container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener<>("Independent consumption", null, null));
5.2. Designate consumer groups: only one member of a group receives each message
With consumer groups, each message is read by only one member of a group, as shown in the figure below. Several groups can also be bound to the same stream, and each group then receives the messages independently of the others. This is comparable to the relationship between exchanges, queues, and routing keys in an MQ.
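The difference between the two delivery modes comes down to who owns the read position. The in-memory sketch below contrasts them: independent readers each keep their own cursor, while the members of a group share one. `DeliveryModesSketch` is a hypothetical class for illustration and leaves out acknowledgment and pending messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory contrast of broadcast-style reads and consumer-group reads; no Redis involved. */
final class DeliveryModesSketch {
    private final List<String> stream = new ArrayList<>();
    private final Map<String, Integer> readerOffsets = new HashMap<>(); // one cursor per reader
    private final Map<String, Integer> groupOffsets = new HashMap<>();  // one shared cursor per group

    void add(String message) {
        stream.add(message);
    }

    /** Independent read: every reader has its own cursor, so every reader sees every message. */
    List<String> readIndependently(String reader) {
        int from = readerOffsets.getOrDefault(reader, 0);
        readerOffsets.put(reader, stream.size());
        return new ArrayList<>(stream.subList(from, stream.size()));
    }

    /**
     * Group read: members of a group share a cursor, so a message is handed out once per group.
     * The consumer name is accepted only to mirror the real call; this sketch does not use it.
     */
    List<String> readAsGroupMember(String group, String consumer, int count) {
        int from = groupOffsets.getOrDefault(group, 0);
        int to = Math.min(stream.size(), from + count);
        groupOffsets.put(group, to);
        return new ArrayList<>(stream.subList(from, to));
    }

    public static void main(String[] args) {
        DeliveryModesSketch sketch = new DeliveryModesSketch();
        sketch.add("m1");
        sketch.add("m2");
        sketch.add("m3");
        System.out.println(sketch.readIndependently("A0"));
        System.out.println(sketch.readAsGroupMember("group-a", "consumer-a", 2));
        System.out.println(sketch.readAsGroupMember("group-a", "consumer-b", 2));
    }
}
```

A second group starts from its own cursor, which is why every group still receives every message.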
5.2.1. Configuration class
The code below first creates a thread pool, then builds the options for the message listener container, then registers the listeners that receive messages with the container, and finally exposes the container as a bean.
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

@Configuration
public class RedisStreamConfiguration {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, User>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        // Number of processors available to the JVM
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50), r -> {
                    Thread thread = new Thread(r);
                    thread.setName("async-stream-consumer-" + index.getAndIncrement());
                    thread.setDaemon(true);
                    return thread;
                }, new ThreadPoolExecutor.CallerRunsPolicy());

        // Options for the message listener container. Once created, the
        // StreamMessageListenerContainer can subscribe to a Redis stream and consume incoming messages.
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, User>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched at a time
                        .batchSize(10)
                        // Executor that runs the Stream poll task
                        .executor(executor)
                        // Blocking time when the Stream has no message; must be shorter than `spring.redis.timeout`
                        .pollTimeout(Duration.ofSeconds(1))
                        // With ObjectRecord, converts the object's fields and values into a Map,
                        // for example a Book object into a map.
                        // .objectMapper(new ObjectHashMapper())
                        // Called when an exception occurs while fetching a message or while the listener processes it
                        .errorHandler(new CustomErrorHandler())
                        // Converts the Record read from the Stream into an ObjectRecord of the type given here
                        .targetType(User.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, User>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // Initialization: bind the key and the consumer group
        redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001, "group-a");

        // No consumer group: independent consumption
        container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001),
                new MonitorStreamListener<>("Independent consumption", null, null));

        // Consumer group A, no automatic ack.
        // Starts with the messages that have not yet been assigned to a consumer in the group.
        // container.receive(Consumer.from("group-a", "consumer-a"),
        //         StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),
        //         new MonitorStreamListener<>("Consumer Group A", "group-a", "consumer-a"));

        // Automatic ack
        container.receiveAutoAck(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),
                new MonitorStreamListener<>("Consumer Group B", "group-a", "consumer-b"));
        return container;
    }
}
Important code analysis:
- .targetType(User.class): Specifies the target type when configuring the listener container. If it is not specified, the default is a string type, so it must be specified whenever anything other than strings is sent. If a parent class is configured, messages carrying subclass objects can also be received and converted. If Object is configured, however, what arrives is a path and the sent object cannot be obtained normally (I do not know why; readers who have studied this are welcome to explain).
- redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001,"group-a"): On first use, the consumer group must be bound to the key of the stream, otherwise an error is reported. See the initStream() method for the internal logic. The binding can also be done manually in Redis with the command xgroup create stream-001 group-a $, where stream-001 is the key and group-a is the consumer group.
- container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener<>("Independent consumption", null, null)): This line does not bind a consumer group. It is the broadcast style and listens to all messages in the key. The difference from Method 1 above is that here messages are received passively through a listener.
- container.receiveAutoAck(Consumer.from("group-a","consumer-b"), StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()), new MonitorStreamListener<>("Consumer Group B","group-a", "consumer-b")): This line implements group listening. It binds the consumer group, the consumer name, and the listener class, and uses automatic ack to confirm to the stream that the message has been received. Alternatively, acknowledge manually; a message that is never acknowledged stays in the group's pending list and may be processed again.
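The effect of acknowledging can be pictured as a small bookkeeping structure: a message delivered to a group member without automatic ack is remembered as pending until it is acknowledged. The sketch below models only that rule; `PendingListSketch` is a hypothetical class and does not reproduce Redis's actual pending entries list in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory model of pending (delivered but unacknowledged) messages; no Redis involved. */
final class PendingListSketch {
    private final Map<String, String> pending = new LinkedHashMap<>(); // record id -> consumer name

    /** With autoAck the record is considered confirmed at once; otherwise it is tracked as pending. */
    void deliver(String recordId, String consumer, boolean autoAck) {
        if (!autoAck) {
            pending.put(recordId, consumer);
        }
    }

    /** Manual acknowledgment: returns true if the record was pending and is now removed. */
    boolean acknowledge(String recordId) {
        return pending.remove(recordId) != null;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingListSketch group = new PendingListSketch();
        group.deliver("1-0", "consumer-a", false);
        group.deliver("2-0", "consumer-b", true);
        System.out.println(group.pendingCount());
        group.acknowledge("1-0");
        System.out.println(group.pendingCount());
    }
}
```

Automatic ack is simpler, but a message whose processing fails is then already confirmed; manual ack after successful processing keeps such a message visible as pending.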
5.2.2. Listener
Receives the messages and then runs the specific business code.
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;

@Slf4j
public class MonitorStreamListener<T> implements StreamListener<String, ObjectRecord<String, T>> {

    /** Consumer type: independent consumption or consumer group consumption. */
    private final String consumerType;

    /** Consumer group. */
    private final String group;

    /** Name of a consumer in the consumer group. */
    private final String consumerName;

    public MonitorStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }

    @Override
    public void onMessage(ObjectRecord<String, T> message) {
        log.info("Received message from redis");
        String stream = message.getStream();
        RecordId id = message.getId();
        // Use the generic type instead of casting to User, so the listener works for any payload type
        T value = message.getValue();
        // Execute the specific business logic for the received message
        if (group == null || group.isEmpty()) {
            log.info("[{}]: Received a message stream:[{}],id:[{}],value:[{}]",
                    consumerType, stream, id, value);
        } else {
            log.info("[{}] group:[{}] consumerName:[{}] received a message stream:[{}],id:[{}],value:[{}]",
                    consumerType, group, consumerName, stream, id, value);
        }
        // When consuming in a consumer group without automatic ack, acknowledge manually here.
        // redisTemplate.opsForStream()
        //         .acknowledge("key", "group", "recordId");
    }
}
Test results when sending messages through the scheduled thread pool:
Test results when sending a Book object (the subclass of User) over HTTP:
Result: the message is also received normally.
3. Complete code
Full code warehouse address
4. References
https://juejin.cn/post/7029302992364896270#heading-0
https://juejin.cn/post/6844904125822435341?searchId=202310141054532F9807A1000F6680C0DF#heading-1