Use the Redis publish-subscribe model to implement session sharing

In fact, it is not to realize session sharing, but to use redis publish and subscribe, so that all cluster servers can send messages to their own sessions. For example, if userId is on the 35th server and there are 100 servers, then the first server receives the message and needs to notify userId. Instead of finding the 35th server, it notifies all servers and sends a message to userId. The other 99 If the server does not have a userId, then the sending will not be successful!

1. Configure redis

package com.kakarote.crm.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kakarote.crm.constant.RedisConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

@Configuration
public class CrmTemplateConfig {<!-- -->

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.port}")
    private int redisPort;

    @Value("${spring.redis.password}")
    private String redisHasrdpwd;

    @Value("${spring.redis.database}")
    private Integer database;

    @Bean(name = "crmRedisTemplate")
    public RedisTemplate redisTemplate() {<!-- -->
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory(database, redisHost, redisPort, redisHasrdpwd));
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

    public RedisConnectionFactory connectionFactory(int database, String hostName, int port, String password) {<!-- -->

        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(hostName);
        configuration.setPort(port);
        if (StringUtils.isNotBlank(password)) {<!-- -->
            configuration.setPassword(password);
        }
        if (database != 0) {<!-- -->
            configuration.setDatabase(database);
        }

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(10);
        genericObjectPoolConfig.setMinIdle(10);
        genericObjectPoolConfig.setMaxTotal(100);
        genericObjectPoolConfig.setMaxWaitMillis(3000);

        LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .commandTimeout(Duration.ofMillis(8000))
                .poolConfig(genericObjectPoolConfig)
                .build();

        LettuceConnectionFactory lettuce = new LettuceConnectionFactory(configuration, clientConfig);
        lettuce.afterPropertiesSet();
        return lettuce;
    }

    /**
     * Redis message listener container
     * This container loads RedisConnectionFactory and message listener
     * You can add multiple redis listeners that listen to different topics. You only need to bind the message listener to the corresponding message subscription processor. The message listener
     * Call the relevant methods of the message subscription processor through reflection technology to perform some business processing
     *
     * @return redis message listening container
     */
    @Bean
    @SuppressWarnings("all")
    public RedisMessageListenerContainer container(
            RedisMessageListener listener) {<!-- -->


        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        //Listen to the key expiration events of all libraries
        container.setConnectionFactory(connectionFactory(database, redisHost, redisPort, redisHasrdpwd));
        // All subscription messages need to be registered and bound here. new PatternTopic(TOPIC_NAME1) represents the published topic information.
        //You can add multiple messageListeners and configure different channels
        container.addMessageListener(listener, new PatternTopic(RedisConstants.WEBSOCKET_REDIS_TOPIC));

        /**
         * Set serialization object
         * Special note: 1. Serialization needs to be set when publishing; subscribers also need to set serialization
         * 2. Setting the serialization object must be placed after the step [Add message listener], otherwise the receiver will not receive the message.
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);

        return container;
    }
}

2. Configure RedisMessageListener

package com.kakarote.crm.config;

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.kakarote.crm.constant.CrmConst;
import com.kakarote.crm.entity.BO.MessageDto;
import com.kakarote.crm.websocket.TransferCallWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RedisMessageListener implements MessageListener {<!-- -->

    @Autowired
    private RedisTemplate<String, Object> crmRedisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {<!-- -->

        //received topic
        log.info("RedisMessageListener-received message 1, channel:" + new String(pattern));
        try {<!-- -->
            //Serialized object (Special note: serialization needs to be set when publishing; subscribers also need to set serialization)
            MessageDto messageDto = (MessageDto) crmRedisTemplate.getValueSerializer().deserialize(message.getBody());
            log.info("RedisMessageListener-received message 2, channel = {}, messageDto = {}", new String(pattern), messageDto);
            if(messageDto == null){<!-- -->
                log.info("RedisMessageListener-messageDto = null, no message to send! message = {}", JSONUtil.toJsonStr(message));
                return;
            }
            if(CrmConst.NOTICE_MSG.equals(messageDto.getTitle())){<!-- -->
                JSONObject content = messageDto.getContent();
                String toUserId = content.getString("toUserId");
                String fromUserId = content.getString("fromUserId");
                JSONObject msg = content.getJSONObject("msg");

                String resp = TransferCallWebSocket.sendMsgByUserId(fromUserId, toUserId, JSONUtil.toJsonStr(msg));
                if(!resp.equals("success")){<!-- -->
                    log.info("RedisMessageListener-send pop-up message, resp = {}, content = {}", resp, content);
                }
            }

        }catch (Exception e){<!-- -->
            log.info("RedisMessageListener-listening message processing failed, failure reason = {}, e = ", e.getMessage(), e);
        }
    }
}


3. Static class

/**
 * @description: constant class
 * @dateTime: 2021/6/17 16:21
 */
public class RedisConstants {<!-- -->

    /**
     * UTF-8 character set
     */
    public static final String UTF8 = "UTF-8";

    public final static String WEBSOCKET_REDIS_TOPIC = "websocket_topic";
 public static final String TRANSFER_NOTICE = "transferCallNotice";
    public static final String NOTICE_MSG = "noticeMessage";
}

4. Message body

package com.kakarote.crm.entity.BO;

import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageDto implements Serializable {<!-- -->
    private String data;
    private String title;
    private JSONObject content;
}

5. Business class sends messages through channels

 /**
     * Publish a message to the channel
     */
    public boolean convertAndSend(String channel, Object message) {<!-- -->
        if (StringUtil.isBlank(channel)) {<!-- -->
            return false;
        }
        try {<!-- -->
            crmRedisTemplate.convertAndSend(channel, message);
            log.info("Message sent successfully, channel: {}, message: {}", channel, message);
            return true;
        } catch (Exception e) {<!-- -->
            log.info("Failed to send message, channel: {}, message: {}, failure reason = {}, e = ", channel, message, e.getMessage(), e);
            e.printStackTrace();
        }
        return false;
    }

6. websocket configuration

@Configuration
@ComponentScan
@EnableAutoConfiguration
public class WebSocketConfiguration implements ServletContextInitializer {<!-- -->

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {<!-- -->
        return new ServerEndpointExporter();
    }

    @Bean
    public TaskScheduler taskScheduler(){<!-- -->
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        return taskScheduler;
    }

@Override
public void onStartup(ServletContext servletContext) throws ServletException {<!-- -->
servletContext.addListener(WebAppRootListener.class);
        servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","52428800");
        servletContext.setInitParameter("org.apache.tomcat.websocket.binaryBufferSize","52428800");
}
}

7. websocket Controller class

@ServerEndpoint("/crmDzhWebsocket/transferWebsocket/{userId}")
@Component
@Slf4j
public class TransferCallWebSocket {<!-- -->


    /**
     * Current number of online connections
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * Used to store the WebSocketServer object corresponding to each client
     */
    private static final ConcurrentHashMap<String, Session> webSocketMap = new ConcurrentHashMap<>();

    /**
     * A connection session with a client through which data is sent to the client
     */
    private Session session;

    /**
     * Receive userId
     */
    private String userIdKey = "";


    /**
     * Method called when the connection is established successfully
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {<!-- -->
        this.session = session;
        this.userIdKey = userId;
        if (webSocketMap.containsKey(userId)) {<!-- -->
            webSocketMap.remove(userId);
            webSocketMap.put(userId, session);
        } else {<!-- -->
            webSocketMap.put(userId, session);
            addOnlineCount();
        }
        log.info("Transfer notification user connection:" + userId + ", the current total number of people online is: " + getOnlineCount());
        try {<!-- -->
            sendMessage("success");
        } catch (IOException e) {<!-- -->
            log.error("Transfer notification user:" + userId + ", network abnormality!!!!!!");
            log.info("Transfer notification to user connection:" + userId + ", network abnormality!!!!!!");
        }
    }

    /**
     * Method called when the connection is closed
     */
    @OnClose
    public void onClose() {<!-- -->
        if (webSocketMap.containsKey(userIdKey)) {<!-- -->
            webSocketMap.remove(userIdKey);
            subOnlineCount();
        }
        log.info("Transfer notification to user to exit:" + userIdKey + ", the current total number of people online is: " + getOnlineCount());
    }

    /**
     * Method called after receiving client message
     *
     * @param message The message sent by the client
     */
    @OnMessage
    public void onMessage(String message, Session session) {<!-- -->
        try {<!-- -->
            if ("ping".equals(message)) {<!-- -->
                webSocketMap.get(this.userIdKey).getBasicRemote().sendText("pong");
                return;
            }
            log.info("this.userIdKey = {}, message = {}", this.userIdKey, message);
        } catch (IOException e) {<!-- -->
            log.error("Transfer notification failed to send message, failure reason = {}, e = ", e.getMessage(), e);
            e.printStackTrace();
        }
    }

    public static String sendMsgByUserId(String fromUserId, String toUserId, String msg) throws IOException {<!-- -->
        if(webSocketMap.get(toUserId) != null){<!-- -->
            try {<!-- -->
                webSocketMap.get(toUserId).getBasicRemote().sendText(msg);
                return "success";
            }catch (Exception e){<!-- -->
                log.error("Failed to send message, fromUserId = {}, toUserId = {}", fromUserId, toUserId);
                return e.getMessage();
            }
        }
        return "userId:" + toUserId + "Currently not in session";
    }

    /**
     * Called when an error occurs
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {<!-- -->
        log.info("User error:" + session.getId() + ",reason:" + error.getMessage());
    }

    /**
     * Implement server active push
     */
    public void sendMessage(String message) throws IOException {<!-- -->
        this.session.getBasicRemote().sendText(message);
    }

    public static synchronized AtomicInteger getOnlineCount() {<!-- -->
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {<!-- -->
        TransferCallWebSocket.onlineCount.getAndIncrement();
    }

    public static synchronized void subOnlineCount() {<!-- -->
        TransferCallWebSocket.onlineCount.getAndDecrement();
    }


}