Using Redis publish/subscribe to register listeners dynamically and retrieve results asynchronously

Introduction and objectives

Publish/subscribe is widely used in system development. Redis provides it too, but only in a basic form: delivery is not reliable and messages are not accumulated, so a subscriber that is not connected when a message is published never receives it. Even so, it handles many business scenarios well, such as:

Our system calls another service, but that service cannot respond promptly and has to return its result asynchronously. Blocking synchronously on the call wastes resources and reduces the throughput of the system. Synchronous waiting is also a poor fit for time-consuming operations, because a long wait can easily end in a request timeout that is treated as a failure (even though the operation may actually have succeeded).

For the scenario above, we can consider handling it with Redis publish/subscribe.


The basic idea is:

Service A first subscribes dynamically to topic 1, then calls service B, passing the name of topic 1 along with the request, and waits for a message on topic 1. Service A's subscription has a timeout so that an unresponsive callee cannot freeze the business flow indefinitely.

Service B processes the request asynchronously and, once the business logic is done, notifies service A through topic 1.

When service A receives the message from service B, it wakes the waiting thread and continues with the next step. This decouples the two applications and improves the throughput of the system. The flow chart is as follows:

[Flow chart: service A subscribes to topic 1 and calls service B; service B publishes its result to topic 1; service A wakes up.]
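The three steps above can be tried without a Redis server. The following is a minimal sketch, not the article's implementation: an in-memory map stands in for Redis, and the names `ReplyFlowSketch`, `WAITERS`, `serviceA` and `serviceB` are hypothetical. It only illustrates the ordering (subscribe, call, wait with a timeout, unsubscribe).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplyFlowSketch {
    // Hypothetical stand-in for Redis: topic name -> the caller waiting on it.
    private static final Map<String, CompletableFuture<String>> WAITERS = new ConcurrentHashMap<>();

    // Stand-in for PUBLISH: deliver to the waiter, or drop the message if nobody listens.
    public static void publish(String topic, String message) {
        CompletableFuture<String> waiter = WAITERS.get(topic);
        if (waiter != null) {
            waiter.complete(message);
        }
    }

    // Service B: acknowledge at once, do the work on another thread, reply on the topic.
    public static String serviceB(String request, String replyTopic) {
        CompletableFuture.runAsync(() -> publish(replyTopic, "back:" + request));
        return "accepted";
    }

    // Service A: subscribe first, then call B, then wait with a timeout.
    // Returns null if no reply arrives in time.
    public static String serviceA(String request, String topic, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        WAITERS.put(topic, reply);                                   // 1. subscribe before calling
        try {
            serviceB(request, topic);                                // 2. call B, passing the topic
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);  // 3. wait for the reply
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            WAITERS.remove(topic);                                   // always unsubscribe
        }
    }

    public static void main(String[] args) {
        System.out.println(serviceA("order-42", "topic_1", 1000)); // prints "back:order-42"
    }
}
```

Subscribing before making the call matters: with fire-and-forget delivery, a reply published before the subscription exists would simply be lost.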

Core code


This bean registers the Redis publish/subscribe listener container in the Spring container.

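// Annotations are assumed wiring; the class only takes effect once it is scanned or imported.
@Configuration
public class RedisChannelListenerConfig {

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // The container needs a connection factory before it can subscribe
        container.setConnectionFactory(factory);
        return container;
    }
}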



How to encapsulate publish and subscribe

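import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

// Annotations are assumed wiring; the class only takes effect once it is scanned or imported.
@Component
public class RedisChannelService {

    /** Separator between the success flag and the message body */
    private static final String PARTITION = "_";

    @Autowired
    private RedisMessageListenerContainer container;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publish a message
     * @param channel channel to publish to
     * @param message message payload
     */
    public void publish(String channel, Object message) {
        try {
            redisTemplate.convertAndSend(channel, message);
        } catch (Exception e) {
            // Publishing is best-effort: log the failure here instead of propagating it
        }
    }

    /**
     * Add a subscription (single channel)
     * @param listener listener
     * @param channel exact channel name
     */
    public void addSubscribe(MessageListener listener, String channel) {
        container.addMessageListener(listener, new ChannelTopic(channel));
    }

    /**
     * Pattern subscription (supports wildcards)
     * @param listener listener
     * @param channel channel pattern, e.g. test_*
     */
    public void addPSubscribe(MessageListener listener, String channel) {
        container.addMessageListener(listener, new PatternTopic(channel));
    }

    /**
     * Pattern subscription (supports wildcards)
     * @param listener listener
     * @param channels multiple wildcard patterns
     */
    public void addPSubscribe(MessageListener listener, List<String> channels) {
        List<PatternTopic> list = new ArrayList<>();
        for (String channel : channels) {
            list.add(new PatternTopic(channel));
        }
        container.addMessageListener(listener, list);
    }

    /**
     * Remove a subscription
     * @param listener listener to remove from all of its topics
     */
    public void removeSubscribe(MessageListener listener) {
        container.removeMessageListener(listener);
    }

    /**
     * Format a message as "<isSuccess>_<message>"
     * @param isSuccess whether processing succeeded
     * @param message message body
     * @return the formatted string
     */
    public String formatMessage(Boolean isSuccess, String message) {
        return new StringBuilder().append(isSuccess.toString()).append(PARTITION).append(message).toString();
    }

    /**
     * Parse a formatted message
     * @param message string produced by formatMessage
     * @return the parsed message, or null if the separator is missing
     */
    public RedisChannelMessage getMessage(String message) {
        if (message == null || message.indexOf(PARTITION) == -1) {
            return null;
        }
        // Limit 2: split only at the first separator, so the body may itself contain "_" or be empty
        String[] split = message.split(PARTITION, 2);
        return new RedisChannelMessage(Boolean.valueOf(split[0]), split[1]);
    }
}

public class RedisChannelMessage {
    private boolean isSuccess;
    private String message;

    public RedisChannelMessage(boolean isSuccess, String message) {
        this.isSuccess = isSuccess;
        this.message = message;
    }

    public boolean isSuccess() {
        return isSuccess;
    }

    public String getMessage() {
        return message;
    }
}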
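Because the separator is `_`, the message body may contain the separator itself. The following minimal sketch (the class name `MessageCodecSketch` and its methods are hypothetical, written against the JDK only) shows a round trip in which splitting with a limit of 2 keeps such a body intact and also tolerates an empty body.

```java
public class MessageCodecSketch {
    private static final String PARTITION = "_";

    // Encode as "<success>_<payload>".
    public static String format(boolean success, String payload) {
        return success + PARTITION + payload;
    }

    // Decode into {flag, payload}; returns null when the separator is missing.
    public static String[] parse(String raw) {
        if (raw == null || !raw.contains(PARTITION)) {
            return null;
        }
        // Limit 2: split only at the first separator and keep a trailing empty payload.
        return raw.split(PARTITION, 2);
    }

    public static void main(String[] args) {
        String[] parts = parse(format(true, "order_42_done"));
        System.out.println(parts[0] + " | " + parts[1]);     // prints "true | order_42_done"
        System.out.println(parse(format(false, "")).length); // prints 2
    }
}
```

Without the limit, `String.split` would cut the body at every underscore and would discard a trailing empty body, leaving no second element to read.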


Customize the listener, set the topic and get the return value

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

/**
 * Common class for channel subscription mode
 * @author: yan
 * @date: 2023/04/27
 */
public class RedisChannelListener<T> implements Consumer<T> {

    private String channel;
    private volatile T value;
    private Thread thread;

    public RedisChannelListener(String channel, Thread thread) {
        this.channel = channel;
        this.thread = thread;
    }

    @Override
    public void accept(T t) {
        this.value = t;
    }

    public String getChannel() {
        return channel;
    }

    public T getValue() throws TimeoutException {
        // Default timeout: 10 seconds
        return getValue(10, TimeUnit.SECONDS);
    }

    public Thread getThread() {
        return this.thread;
    }

    public T getValue(long time, TimeUnit timeUnit) throws TimeoutException {
        long end = System.nanoTime() + timeUnit.toNanos(time);
        // Compare by difference, as System.nanoTime() values may overflow
        while (end - System.nanoTime() > 0) {
            if (value != null) {
                return value;
            }
            // Park for up to one second; LockSupport.unpark(thread) ends the wait early
            LockSupport.parkNanos(1000 * 1000 * 1000);
        }
        throw new TimeoutException();
    }
}
The above is the encapsulation of the publish-subscribe model.
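As a possible alternative to waiting in a park-and-poll loop, the listener could hold its result in a `CompletableFuture`, which wakes the waiter as soon as the value arrives and does not need a reference to the waiting thread. This is only a sketch of that swapped-in technique, not the approach used in this article; `FutureChannelListener` and `demo()` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical one-shot result holder for a single reply channel.
public class FutureChannelListener<T> implements Consumer<T> {
    private final String channel;
    private final CompletableFuture<T> result = new CompletableFuture<>();

    public FutureChannelListener(String channel) {
        this.channel = channel;
    }

    public String getChannel() {
        return channel;
    }

    @Override
    public void accept(T t) {
        result.complete(t); // only the first value is kept; later calls are ignored
    }

    public T getValue(long time, TimeUnit unit) throws TimeoutException {
        try {
            return result.get(time, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TimeoutException("interrupted while waiting on " + channel);
        } catch (ExecutionException e) {
            // Not expected: the future is only ever completed normally.
            throw new IllegalStateException(e);
        }
    }

    // Small self-check: another thread plays the role of the subscriber callback.
    public static String demo() {
        FutureChannelListener<String> listener = new FutureChannelListener<>("test_001");
        new Thread(() -> listener.accept("back:test")).start();
        try {
            return listener.getValue(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "back:test"
    }
}
```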


The configuration above is not injected into the container by default; initialize it according to your project's needs.


Initialization class

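// Annotations are assumed wiring; init() is assumed to run once at startup.
@Configuration
public class RedisChannelConfig {
    @Autowired
    private RedisChannelService redisChannelService;

    private static final Cache<String, RedisChannelListener<String>> listenerMap = CacheBuilder.newBuilder()
            // Entries expire automatically 300 seconds after being written
            .expireAfterWrite(300, TimeUnit.SECONDS)
            .build();

    @PostConstruct
    public void init() {
        // Enable the global subscription
        final MessageListener messageListener = (message, bytes) -> {
            String channel = new String(message.getChannel());
            RedisChannelListener<String> listener = listenerMap.getIfPresent(channel);
            if (listener != null) {
                // Hand the body to the waiting listener and wake its thread.
                // Assumes the RedisTemplate serializes values as plain strings.
                listener.accept(new String(message.getBody()));
                LockSupport.unpark(listener.getThread());
            }
        };
        // Restrict the unified subscription to channels matching test_*
        ArrayList<String> channels = Lists.newArrayList("test_*");
        redisChannelService.addPSubscribe(messageListener, channels);
    }

    public void registerListener(RedisChannelListener<String> listener) {
        listenerMap.put(listener.getChannel(), listener);
    }

    public void removeListener(RedisChannelListener<String> listener) {
        listenerMap.invalidate(listener.getChannel());
    }
}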


Usage: simulating a call to a remote service

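/**
 * @author yan
 * @date 2023-10-23
 */
// Annotations are assumed wiring.
@Service
public class RedisChannelTest {
    @Autowired
    private RedisChannelConfig redisChannelConfig;
    @Autowired
    private RedisChannelRemote redisChannelRemote;

    public boolean sendTest() throws TimeoutException {
        // Register a listener for test_001 before calling the remote service
        String channel = "test_001";
        RedisChannelListener<String> listener = new RedisChannelListener<>(channel, Thread.currentThread());
        redisChannelConfig.registerListener(listener);
        try {
            // Call the external service, passing the channel it should reply on
            String handle = redisChannelRemote.handle("test,test", channel);

            // Block until the reply arrives (default timeout: 10 seconds)
            String value = listener.getValue();
            return true;
        } finally {
            // Always unregister, whether the reply arrived or the wait timed out
            redisChannelConfig.removeListener(listener);
        }
    }
}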


Remote Service

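/**
 * @author yan
 * @date 2023-10-23
 */
// Annotations are assumed wiring.
@Service
public class RedisChannelRemote {
    @Autowired
    private RedisChannelService redisChannelService;

    public String handle(String msg, String topic) {
        new Thread(() -> {
            try {
                // Simulate a time-consuming operation (the duration is illustrative)
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Notify the caller on the topic it supplied
            redisChannelService.publish(topic, "back:" + msg);
        }).start();
        return "success";
    }
}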


Finally, write a controller and try it out

@RestController
@RequestMapping(value = "redis")
public class RedisController {

    @Autowired
    private RedisChannelTest redisChannelTest;

    // The mapping path and HTTP method are assumed
    @GetMapping("channel")
    public String channel() throws TimeoutException {
        if (redisChannelTest.sendTest()) {
            return "success";
        }
        return "fail";
    }
}



That is how Redis publish/subscribe can be used to register listeners dynamically and retrieve results asynchronously.
