Using Redis publish/subscribe to implement dynamic listener registration and asynchronous retrieval of results

Introduction and objectives

Publish/subscribe is used very often in system development. Redis provides it too, but only in a basic form: delivery is fire-and-forget, so Redis pub/sub offers no reliable delivery, no durable subscriptions, and no message backlog for subscribers that are offline. It is still a good fit for many business scenarios, such as:

Our system calls another service, but the called service cannot respond promptly and the result has to be returned asynchronously. Blocking synchronously in this case wastes resources and reduces system throughput. For time-consuming operations it is worse, because a long wait can easily end in a request timeout that is treated as a failure (even though the operation may actually have succeeded).

For this scenario, we can consider using Redis publish/subscribe.
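One consequence of these basic semantics is that a message published while nobody is subscribed is simply dropped, which is why the subscription has to exist before the call is made. The following is a small in-memory model of that behavior, not a Redis client; the class name FireAndForgetBus and its methods are hypothetical and exist only for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory model of fire-and-forget pub/sub (assumption: illustration only, no Redis involved)
class FireAndForgetBus {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Register a handler for one channel
    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Deliver to the current subscribers only; nothing is stored for later.
    // Returns how many subscribers received the message.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(channel, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }
}
```

In this model, publishing to a channel before anyone has subscribed returns 0 and the message is gone; a later subscriber never sees it.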

Ideas

The basic idea is:

Service A first subscribes dynamically to topic 1, then calls service B, passing along the name of topic 1, and waits for a message on topic 1. Service A's subscription has a timeout, so that an unresponsive service B cannot block the business flow indefinitely.

Service B processes the request asynchronously and, once the business logic has finished, notifies service A through topic 1.

When service A receives the message from service B, it wakes the waiting thread, which then continues with the next step. This decouples the two applications and improves system throughput. The flow chart is as follows:

[Figure: flow chart of the interaction between service A, service B, and topic 1]
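The three steps can also be tried out in plain Java before any Redis code is written. The sketch below is an in-process stand-in under stated assumptions: a map of CompletableFuture objects plays the role of the Redis channel, and the names AsyncReplyFlow, subscribe, callServiceB, notifyTopic, and await are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// In-process stand-in for the flow (assumption: the map replaces the Redis channel)
class AsyncReplyFlow {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Step 1 (service A): register interest in the topic before calling service B
    CompletableFuture<String> subscribe(String topic) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(topic, future);
        return future;
    }

    // Step 2 (service B): accept the request, return at once, reply later on the topic
    String callServiceB(String request, String topic, long workMillis) {
        CompletableFuture.runAsync(() -> {
            sleep(workMillis);
            notifyTopic(topic, "back:" + request);
        });
        return "accepted";
    }

    // Step 3: delivering the reply completes the future and wakes the waiter
    void notifyTopic(String topic, String message) {
        CompletableFuture<String> future = pending.remove(topic);
        if (future != null) {
            future.complete(message);
        }
    }

    // Service A waits with a timeout; returns null if no reply arrived in time
    String await(String topic, CompletableFuture<String> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(topic); // never leave a stale registration behind
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The order matters: subscribe is called before callServiceB, so a fast reply cannot arrive before anyone is listening.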

Core code

RedisChannelListenerConfig

This configuration registers the bean that Redis publish/subscribe needs, a RedisMessageListenerContainer, in the Spring container.

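import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

public class RedisChannelListenerConfig {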

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        return container;
    }

}

RedisChannelService

This class encapsulates the publish and subscribe operations.

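import java.util.ArrayList;
import java.util.List;
// On Spring Boot 3.x this import is jakarta.annotation.Resource
import javax.annotation.Resource;
import org.springframework.context.annotation.Import;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Import(RedisChannelListenerConfig.class)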
public class RedisChannelService {

    /**
     * separator
     */
    private static final String PARTITION = "_";

    @Resource
    private RedisMessageListenerContainer container;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publish a message
     * @param channel channel name
     * @param message message payload
     */
    public void publish(String channel, Object message) {
        try {
            redisTemplate.convertAndSend(channel, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Add subscription channel (single channel)
     *
     * @param listener listener
     * @param channel
     */
    public void addSubscribe(MessageListener listener, String channel) {
        container.addMessageListener(listener, new ChannelTopic(channel));
    }

    /**
     * Pattern subscription (supports wildcards)
     *
     * @param listener
     * @param channel
     */
    public void addPSubscribe(MessageListener listener, String channel) {
        container.addMessageListener(listener, new PatternTopic(channel));
    }

    /**
     * Pattern subscription for several patterns at once (supports wildcards)
     *
     * @param listener
     * @param channels multiple wildcard patterns
     */
    public void addPSubscribe(MessageListener listener, List<String> channels) {
        List<PatternTopic> list = new ArrayList<>();
        for (String channel : channels) {
            list.add(new PatternTopic(channel));
        }
        container.addMessageListener(listener, list);
    }

    /**
     * Remove subscription
     *
     * @param listener
     */
    public void removeSubscribe(MessageListener listener) {
        container.removeMessageListener(listener);
    }

    /**
     * Format message
     * @param isSuccess
     * @param message
     * @return
     */
    public String formatMessage(Boolean isSuccess, String message){
        return new StringBuilder().append(isSuccess.toString()).append(PARTITION).append(message).toString();
    }

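    /**
     * Parse a message produced by formatMessage
     * @param message raw message text
     * @return the parsed message, or null if the separator is missing
     */
    public RedisChannelMessage getMessage(String message){
        if(message == null || message.indexOf(PARTITION) == -1){
            return null;
        }
        // Split on the first separator only, so the payload may itself contain "_"
        // and an empty payload ("true_") no longer causes an index error
        String[] split = message.split(PARTITION, 2);
        return new RedisChannelMessage(Boolean.parseBoolean(split[0]), split[1]);
    }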
}
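The message format used by formatMessage and getMessage can be exercised outside Spring. The sketch below is a standalone version under assumptions: the class name ChannelMessageCodec is hypothetical, and the parser splits on the first separator only, so that payloads containing underscores survive the round trip.

```java
// Standalone sketch of the "<success>_<payload>" format (class name is hypothetical)
class ChannelMessageCodec {
    static final String SEPARATOR = "_";

    // Build the wire format, for example "true_done"
    static String format(boolean success, String payload) {
        return success + SEPARATOR + payload;
    }

    // Returns {flag, payload}, or null if the separator is missing.
    // The limit of 2 keeps any further "_" characters inside the payload.
    static String[] parse(String raw) {
        if (raw == null || !raw.contains(SEPARATOR)) {
            return null;
        }
        return raw.split(SEPARATOR, 2);
    }
}
```

A payload such as order_42_done is returned intact by parse, whereas an unlimited split would cut it off after order.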

RedisChannelMessage

@Data
@AllArgsConstructor
public class RedisChannelMessage {
    private boolean isSuccess;
    private String message;
}

RedisChannelListener

A custom listener that holds the channel name and lets the caller wait for the returned value.

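import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

/**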
 * Common class for channel subscription mode
 *
 * @author: yan
 * @date: 2023/04/27
 */
public class RedisChannelListener<T> implements Consumer<T> {

    private String channel;
    private volatile T value;
    private Thread thread;

    public RedisChannelListener(String channel, Thread thread) {
        this.channel = channel;
        this.thread = thread;
    }

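    @Override
    public void accept(T t) {
        this.value = t;
        // Wake the waiting thread right away instead of letting it sleep out its park interval
        if (thread != null) {
            LockSupport.unpark(thread);
        }
    }

    public String getChannel() {
        return channel;
    }

    public T getValue() throws TimeoutException {
        return getValue(10, TimeUnit.SECONDS);
    }

    public Thread getThread() {
        return this.thread;
    }

    /**
     * Wait for the value; intended to be called from the thread passed to the constructor
     */
    public T getValue(long time, TimeUnit timeUnit) throws TimeoutException {
        long end = System.nanoTime() + timeUnit.toNanos(time);
        long remaining;
        while ((remaining = end - System.nanoTime()) > 0) {
            if (value != null) {
                return value;
            }
            // Park for at most one second; accept() unparks the thread earlier
            LockSupport.parkNanos(Math.min(remaining, TimeUnit.SECONDS.toNanos(1)));
        }
        // The value may have arrived during the last park
        if (value != null) {
            return value;
        }
        throw new TimeoutException();
    }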
}

The above is the encapsulation of the publish-subscribe model.
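As an alternative to parking a specific thread, the wait can be built on a CountDownLatch, which wakes the waiter as soon as the value arrives and does not care which thread is waiting. This is a minimal sketch and not the author's class; the name LatchChannelListener is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical latch-based variant of the listener
class LatchChannelListener<T> implements Consumer<T> {
    private final String channel;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T value;

    LatchChannelListener(String channel) {
        this.channel = channel;
    }

    @Override
    public void accept(T t) {
        this.value = t;     // store the value first
        latch.countDown();  // then release whoever is waiting
    }

    String getChannel() {
        return channel;
    }

    // Blocks until accept() has been called or the timeout elapses
    T getValue(long time, TimeUnit unit) throws TimeoutException, InterruptedException {
        if (!latch.await(time, unit)) {
            throw new TimeoutException("no message on " + channel + " within the timeout");
        }
        return value;
    }
}
```

Because the latch counts down only once, the listener is one-shot, which matches how each registered listener in this article receives a single reply.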

Usage

None of the classes above are registered in the Spring container by default; import and initialize them where the project needs them.

RedisChannelConfig

Initialization class

@Slf4j
@Component
@Import(RedisChannelService.class)
public class RedisChannelConfig {
    @Resource
    private RedisChannelService redisChannelService;
    private static Cache<String, RedisChannelListener<String>> listenerMap = CacheBuilder.newBuilder()
            //Automatically expires in 300 seconds
            .expireAfterWrite(300, TimeUnit.SECONDS)
            .build();


    @PostConstruct
    public void init(){
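        // Enable global subscription
        final MessageListener messageListener = (message, pattern) -> {
            String channel = new String(message.getChannel());
            log.debug("Received message on channel {}", channel);
            RedisChannelListener<String> listener = listenerMap.getIfPresent(channel);
            if (listener != null) {
                // message.toString() is readable only if the publisher writes plain strings
                // (assumption: the RedisTemplate uses a string value serializer)
                listener.accept(message.toString());
                // Each listener is one-shot: drop it once the reply is delivered
                listenerMap.invalidate(channel);
            }
        };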
        // Limit unified subscription here
        ArrayList<String> channels = Lists.newArrayList("test_*");
        redisChannelService.addPSubscribe(messageListener, channels);
    }


    public void registerListener(RedisChannelListener<String> listener) {
        listenerMap.put(listener.getChannel(), listener);
    }

    public void removeListener(RedisChannelListener<String> listener) {
        listenerMap.invalidate(listener.getChannel());
    }
}
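The unified subscription above uses the pattern test_*, so only channels whose names start with test_ reach the global listener; a listener registered under any other name would never fire. The sketch below is a rough way to check a channel name against such a pattern. It is an approximation that handles only the * and ? wildcards (Redis patterns also support bracket classes and escapes, which are not modeled), and the class name ChannelGlob is hypothetical.

```java
import java.util.regex.Pattern;

// Approximate glob matcher for channel names: supports * and ? only (assumption)
class ChannelGlob {

    // Translate the glob into a regular expression, quoting every literal character
    static Pattern compile(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");   // any run of characters, including none
            } else if (c == '?') {
                regex.append('.');    // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // True if the whole channel name matches the glob
    static boolean matches(String glob, String channel) {
        return compile(glob).matcher(channel).matches();
    }
}
```

A check like this could run in registerListener to reject channel names that the subscription would never deliver.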

RedisChannelTest

Usage example that simulates calling a remote service.

/**
 * @author yan
 * @date 2023-10-23
 */
@Service
public class RedisChannelTest {
    @Resource
    private RedisChannelConfig redisChannelConfig;
    @Resource
    private RedisChannelRemote redisChannelRemote;

    public boolean sendTest() throws TimeoutException {
        // Subscribe to the test_001 message first
        String channel = "test_001";
        RedisChannelListener<String> listener = new RedisChannelListener<>(channel, Thread.currentThread());
        redisChannelConfig.registerListener(listener);
        try {
            // Call external service
            String handle = redisChannelRemote.handle("test,test", channel);
            System.out.println(handle);

            // Get results (waits up to 10 seconds by default)
            String value = listener.getValue();
            System.out.println(value);
            return true;
        } finally {
            // Drop the listener even on timeout, so it does not linger until the cache entry expires
            redisChannelConfig.removeListener(listener);
        }
    }
}
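The example uses the fixed channel name test_001, so two requests running at the same time would register under the same key and overwrite each other's listener. One way around this is to give every request its own channel name under the subscribed test_ prefix. The helper below is a sketch; ChannelNames and next are hypothetical names.

```java
import java.util.UUID;

// Hypothetical helper that generates one channel name per request
class ChannelNames {
    // Must stay within the pattern the global listener subscribes to ("test_*")
    static final String PREFIX = "test_";

    // Example result: test_ followed by 32 hexadecimal characters
    static String next() {
        return PREFIX + UUID.randomUUID().toString().replace("-", "");
    }
}
```

In sendTest, the line that assigns the channel would then call ChannelNames.next() instead of using the literal test_001.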

RedisChannelRemote

Simulated remote service: it returns immediately and publishes the result to the given topic three seconds later.

/**
 * @author yan
 * @date 2023-10-23
 */
@Service
public class RedisChannelRemote {
    @Resource
    private RedisChannelService redisChannelService;

    public String handle(String msg, String topic){
        new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and skip publishing
                Thread.currentThread().interrupt();
                return;
            }
            redisChannelService.publish(topic, "back:" + msg);
        }).start();
        return "success";
    }
}
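Starting a new thread for every request is fine for a demonstration, but a shared scheduler is a common alternative for delayed work. The sketch below makes two assumptions: the publish step is passed in as a BiConsumer of topic and message, standing in for redisChannelService.publish, and the name DelayedReplyService is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical variant of the simulated remote service using one shared scheduler
class DelayedReplyService implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread t = new Thread(runnable, "delayed-reply");
                t.setDaemon(true); // do not keep the JVM alive for pending replies
                return t;
            });

    // (topic, message) callback; stands in for the real publish call
    private final BiConsumer<String, String> publisher;

    DelayedReplyService(BiConsumer<String, String> publisher) {
        this.publisher = publisher;
    }

    // Returns immediately; the reply is published after the delay
    String handle(String msg, String topic, long delayMillis) {
        scheduler.schedule(() -> publisher.accept(topic, "back:" + msg),
                delayMillis, TimeUnit.MILLISECONDS);
        return "success";
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With the classes from this article, the callback would be redisChannelService::publish and the delay 3000 milliseconds.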

RedisController

Finally, write a controller to try it out.

@RestController
@RequestMapping(value = "redis")
public class RedisController {

    @Resource
    private RedisChannelTest redisChannelTest;


    @GetMapping("channel")
    public String channel() throws TimeoutException {
        if (redisChannelTest.sendTest()) {
            return "success";
        }
        return "fail";
    }

}

Summary

This article showed how to use Redis publish/subscribe to register listeners dynamically and retrieve results asynchronously.
