Introduction and objectives
Publish/subscribe is widely used in system development. Redis provides this feature too, but only in a basic form: it does not offer reliable delivery or message accumulation, so a message published while no subscriber is listening is lost. It is still sufficient for many business scenarios, such as the following:
Our system calls another service, but the called service cannot respond in time and the result has to be returned asynchronously. Blocking synchronously in this case wastes resources and reduces the throughput of the system. Synchronous waiting is also a poor fit for time-consuming operations, because a long wait can easily cause the request to time out and be treated as failed (even though it may actually have been processed successfully).
For the scenario above, we can consider using the publish and subscribe function of Redis.
Ideas
The basic idea is:
Service A first dynamically subscribes to topic 1, then calls service B, passing the name of topic 1 along with the request, and then waits for a message on topic 1. The subscription in service A has a timeout, so that an unresponsive service B cannot block the business flow indefinitely.
Service B processes the request asynchronously after receiving it, and notifies service A through topic 1 once the business processing is done.
After service A receives the message sent by service B, it wakes up the waiting thread and continues with the next step. This decouples the two applications and improves the throughput of the system.
[Figure: flow chart of the subscribe, call, and notify steps]
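The three steps above can be tried out without Redis. The following is only a minimal in-process sketch: the class `ReplyTopicSketch`, its `TOPICS` map, and the method names are hypothetical stand-ins for the Redis channel and the two services, not part of the code shown later in this article.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class ReplyTopicSketch {
    // Hypothetical in-memory stand-in for Redis channels: topic name -> mailbox of the waiting caller.
    private static final Map<String, BlockingQueue<String>> TOPICS = new ConcurrentHashMap<>();

    /** Stand-in for PUBLISH: a message for a topic nobody subscribed to is simply dropped. */
    static void publish(String topic, String message) {
        BlockingQueue<String> mailbox = TOPICS.get(topic);
        if (mailbox != null) {
            mailbox.offer(message);
        }
    }

    /** Service B: acknowledge immediately, do the work on another thread, then notify via the topic. */
    static String serviceB(String payload, String replyTopic) {
        new Thread(() -> publish(replyTopic, "done:" + payload)).start();
        return "accepted";
    }

    /** Service A: returns the reply, or null if none arrived within the timeout. */
    static String serviceA(String payload, String topic, long timeoutMillis) {
        BlockingQueue<String> mailbox = new ArrayBlockingQueue<>(1);
        TOPICS.put(topic, mailbox);                 // 1. subscribe before calling B
        try {
            serviceB(payload, topic);               // 2. call B and pass the topic name
            return mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS); // 3. wait with a timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            TOPICS.remove(topic);                   // always unsubscribe, even after a timeout
        }
    }

    public static void main(String[] args) {
        System.out.println(serviceA("order-42", "topic_1", 2000));
    }
}
```

Subscribing before the call matters: if service B published before service A had subscribed, the reply would be lost, which mirrors the behaviour of Redis pub/sub.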
Core code
RedisChannelListenerConfig
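This class registers the bean that Redis publish/subscribe needs in the Spring container.
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

// Not a component itself: it is pulled in on demand via @Import.
public class RedisChannelListenerConfig {

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        return container;
    }
}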
RedisChannelService
Encapsulates the publish and subscribe operations.
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.context.annotation.Import;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Import(RedisChannelListenerConfig.class)
public class RedisChannelService {

    /** Separator between the success flag and the message body. */
    private static final String PARTITION = "_";

    @Resource
    private RedisMessageListenerContainer container;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publish a message.
     *
     * @param channel channel to publish to
     * @param message message to send
     */
    public void publish(String channel, Object message) {
        try {
            redisTemplate.convertAndSend(channel, message);
        } catch (Exception e) {
            // Publishing is best-effort: a failure is printed, not propagated.
            e.printStackTrace();
        }
    }

    /**
     * Add a subscription to a single channel.
     *
     * @param listener listener
     * @param channel  channel name
     */
    public void addSubscribe(MessageListener listener, String channel) {
        container.addMessageListener(listener, new ChannelTopic(channel));
    }

    /**
     * Pattern subscription (supports wildcards).
     *
     * @param listener listener
     * @param channel  channel pattern
     */
    public void addPSubscribe(MessageListener listener, String channel) {
        container.addMessageListener(listener, new PatternTopic(channel));
    }

    /**
     * Pattern subscription (supports wildcards).
     *
     * @param listener listener
     * @param channels multiple wildcard patterns
     */
    public void addPSubscribe(MessageListener listener, List<String> channels) {
        List<PatternTopic> list = new ArrayList<>();
        for (String channel : channels) {
            list.add(new PatternTopic(channel));
        }
        container.addMessageListener(listener, list);
    }

    /**
     * Remove a subscription.
     *
     * @param listener listener to remove
     */
    public void removeSubscribe(MessageListener listener) {
        container.removeMessageListener(listener);
    }

    /**
     * Format a message as "flag_message".
     */
    public String formatMessage(Boolean isSuccess, String message) {
        return isSuccess + PARTITION + message;
    }

    /**
     * Parse a message produced by formatMessage.
     *
     * @return the parsed message, or null if the separator is missing
     */
    public RedisChannelMessage getMessage(String message) {
        if (message == null || !message.contains(PARTITION)) {
            return null;
        }
        // Split only at the first separator so that the body may contain "_" itself.
        String[] split = message.split(PARTITION, 2);
        return new RedisChannelMessage(Boolean.parseBoolean(split[0]), split[1]);
    }
}
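The message format puts a success flag and the body into one string, separated by "_". Because the body may contain "_" as well, parsing has to split at the first separator only. The following plain-Java sketch illustrates this; `MessageCodecSketch` and its method names are hypothetical and only mirror `formatMessage` and `getMessage` without the Spring dependencies.

```java
class MessageCodecSketch {
    private static final String PARTITION = "_";

    /** Builds "flag_body", e.g. "true_order_42_done". */
    static String format(boolean success, String body) {
        return success + PARTITION + body;
    }

    /** Returns {flag, body}, or null when the separator is missing. */
    static String[] parse(String raw) {
        if (raw == null || !raw.contains(PARTITION)) {
            return null;
        }
        // Limit 2: split at the first separator only, keep the rest of the body intact.
        return raw.split(PARTITION, 2);
    }

    public static void main(String[] args) {
        String raw = format(true, "order_42_done");
        String[] parts = parse(raw);
        System.out.println(parts[0] + " | " + parts[1]);   // prints "true | order_42_done"
        System.out.println(raw.split(PARTITION)[1]);        // prints "order": an unlimited split truncates the body
    }
}
```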
RedisChannelMessage
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class RedisChannelMessage {
    private boolean isSuccess;
    private String message;
}
RedisChannelListener
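A custom listener that holds the topic and waits for the return value.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

/**
 * Common class for channel subscription mode
 *
 * @author: yan
 * @date: 2023/04/27
 */
public class RedisChannelListener<T> implements Consumer<T> {

    private final String channel;
    private volatile T value;
    /** The thread that will wait in getValue. */
    private final Thread thread;

    public RedisChannelListener(String channel, Thread thread) {
        this.channel = channel;
        this.thread = thread;
    }

    @Override
    public void accept(T t) {
        this.value = t;
        // Wake the waiting thread right away instead of letting it sleep out its park interval.
        if (thread != null) {
            LockSupport.unpark(thread);
        }
    }

    public String getChannel() {
        return channel;
    }

    public Thread getThread() {
        return this.thread;
    }

    /** Waits up to 10 seconds for the value. */
    public T getValue() throws TimeoutException {
        return getValue(10, TimeUnit.SECONDS);
    }

    public T getValue(long time, TimeUnit timeUnit) throws TimeoutException {
        long end = System.nanoTime() + timeUnit.toNanos(time);
        long remaining;
        while ((remaining = end - System.nanoTime()) > 0) {
            if (value != null) {
                return value;
            }
            // Park for at most one second, and never beyond the deadline.
            LockSupport.parkNanos(Math.min(remaining, TimeUnit.SECONDS.toNanos(1)));
        }
        // The value may have arrived just as the deadline passed.
        if (value != null) {
            return value;
        }
        throw new TimeoutException();
    }
}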
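The listener above waits by parking the calling thread in a loop. An alternative, which is not what the original code does, is to back the listener with a `CompletableFuture`, so that the timeout handling is left to the JDK. The class name `FutureChannelListener` below is hypothetical; this is a sketch of the idea under that assumption, not a drop-in replacement.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class FutureChannelListener<T> implements Consumer<T> {
    private final String channel;
    private final CompletableFuture<T> result = new CompletableFuture<>();

    FutureChannelListener(String channel) {
        this.channel = channel;
    }

    /** Called from the subscriber thread when the reply arrives. */
    @Override
    public void accept(T t) {
        result.complete(t);
    }

    String getChannel() {
        return channel;
    }

    /** Blocks the caller until a value arrives or the timeout elapses. */
    T getValue(long time, TimeUnit unit) throws TimeoutException, InterruptedException {
        try {
            return result.get(time, unit);
        } catch (ExecutionException e) {
            // Not expected here, because the future is only ever completed normally.
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

With this variant the listener no longer needs a reference to the waiting thread, and any thread may call `getValue`.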
The above is the encapsulation of the publish-subscribe model.
Use
The classes above are not registered in the Spring container by default; they can be initialized as the project requires.
RedisChannelConfig
Initialization class
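import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; // jakarta.annotation.* on Spring Boot 3.x
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Import;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@Import(RedisChannelService.class)
public class RedisChannelConfig {

    @Resource
    private RedisChannelService redisChannelService;

    /** Pending listeners by channel name. */
    private static final Cache<String, RedisChannelListener<String>> listenerMap = CacheBuilder.newBuilder()
            // Automatically expires in 300 seconds
            .expireAfterWrite(300, TimeUnit.SECONDS)
            .build();

    @PostConstruct
    public void init() {
        // Enable global subscription: one listener dispatches to the registered per-channel listeners.
        final MessageListener messageListener = (message, bytes) -> {
            String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
            log.debug("Message received on channel {}", channel);
            RedisChannelListener<String> listener = listenerMap.getIfPresent(channel);
            if (listener != null) {
                // Decoding the body as text assumes that values are published with a String serializer.
                listener.accept(new String(message.getBody(), StandardCharsets.UTF_8));
                listenerMap.invalidate(channel);
            }
        };
        // Limit unified subscription here
        List<String> channels = Lists.newArrayList("test_*");
        redisChannelService.addPSubscribe(messageListener, channels);
    }

    public void registerListener(RedisChannelListener<String> listener) {
        listenerMap.put(listener.getChannel(), listener);
    }

    public void removeListener(RedisChannelListener<String> listener) {
        listenerMap.invalidate(listener.getChannel());
    }
}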
RedisChannelTest
Usage example that simulates calling a remote service.
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.stereotype.Service;

/**
 * @author yan
 * @date 2023-10-23
 */
@Service
public class RedisChannelTest {

    @Resource
    private RedisChannelConfig redisChannelConfig;

    @Resource
    private RedisChannelRemote redisChannelRemote;

    public boolean sendTest() throws TimeoutException {
        // Subscribe to the test_001 message first
        String channel = "test_001";
        RedisChannelListener<String> listener = new RedisChannelListener<>(channel, Thread.currentThread());
        redisChannelConfig.registerListener(listener);
        try {
            // Call external service
            String handle = redisChannelRemote.handle("test,test", channel);
            System.out.println(handle);
            // Get results
            String value = listener.getValue();
            System.out.println(value);
            return true;
        } finally {
            // Drop the listener even when the wait times out.
            redisChannelConfig.removeListener(listener);
        }
    }
}
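The example uses the fixed channel name `test_001`. Listeners are stored by channel name, so two requests that wait on the same channel at the same time would overwrite each other's listener. One possible way around this, shown here only as a sketch, is to generate a unique channel name per request that still matches the subscribed pattern `test_*`. The helper class `ChannelNames` is hypothetical and not part of the original code.

```java
import java.util.UUID;

class ChannelNames {
    /** Must match the prefix of the pattern subscribed to at startup ("test_*"). */
    static final String PREFIX = "test_";

    /** Returns a channel name that is unique per call, e.g. "test_" followed by 32 hex characters. */
    static String next() {
        return PREFIX + UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println(next());
        System.out.println(next());
    }
}
```

The caller would pass the generated name to the listener and to the remote service in place of the hard-coded string.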
RedisChannelRemote
Remote Service
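import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.stereotype.Service;

/**
 * @author yan
 * @date 2023-10-23
 */
@Service
public class RedisChannelRemote {

    @Resource
    private RedisChannelService redisChannelService;

    public String handle(String msg, String topic) {
        // Simulate asynchronous processing: reply on the topic after three seconds.
        new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up without publishing.
                Thread.currentThread().interrupt();
                return;
            }
            redisChannelService.publish(topic, "back:" + msg);
        }).start();
        // Acknowledge immediately; the real result arrives through the topic.
        return "success";
    }
}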
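The simulated remote service starts a new thread for every call, which is fine for a demonstration. Under load, a bounded thread pool is the more common choice. The sketch below assumes a hypothetical class `AsyncHandlerSketch` with a pool size of 4, and replaces the Redis publish call with a `BiConsumer` of topic and message so that it runs without Spring or Redis.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class AsyncHandlerSketch {
    // Bounded pool instead of one new thread per request (the size 4 is an arbitrary assumption).
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // Stand-in for the publish call: receives (topic, message).
    private final BiConsumer<String, String> publisher;

    AsyncHandlerSketch(BiConsumer<String, String> publisher) {
        this.publisher = publisher;
    }

    /** Acknowledges immediately and publishes the result from a pool thread. */
    String handle(String msg, String topic) {
        pool.submit(() -> publisher.accept(topic, "back:" + msg));
        return "success";
    }

    /** Stops accepting work and waits up to five seconds for submitted tasks to finish. */
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncHandlerSketch handler = new AsyncHandlerSketch(
                (topic, message) -> System.out.println(topic + " <- " + message));
        System.out.println(handler.handle("test,test", "test_001"));
        handler.shutdown();
    }
}
```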
RedisController
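Finally, write a controller to try it out.
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "redis")
public class RedisController {

    @Resource
    private RedisChannelTest redisChannelTest;

    @GetMapping("channel")
    public String channel() throws TimeoutException {
        if (redisChannelTest.sendTest()) {
            return "success";
        }
        return "fail";
    }
}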
Summary
The code above uses the Redis publish and subscribe function to register listeners dynamically, listen for messages, and obtain results asynchronously.