Using WebSocket duplex communication to kick out a user's earlier login when the same user logs in on another device: server implementation

    • Introduction
    • 1 Introduction to WebSocket
    • 2 Server implementation
      • 2.1 Add project dependencies
      • 2.2 Create new ServerEndpointExporter and RedissonClient configuration beans
      • 2.3 Create a new WebSocketServer component class
      • 2.4 When the same user logs in, add the logic of kicking out users logged in from other devices
      • 2.5 After the user successfully logs in, asynchronously judge whether the currently logged in user has multiple sessions, and if so, kick the previous session
      • 2.6 Startup class
      • 2.7 Project environment variables
    • 3 Summary
    • 4 Reference reading

Introduction

Recently, a requirement came up to ensure that a user can be logged in to the system on only one device at a time. If a user is already logged in on one device and then logs in on another, the session on the earlier device must be kicked out, so that the user has only one session at any moment. The author looked for feasible technical solutions on Juejin and found two main approaches:

1. The client polls the server for the login information of the current user. The specific steps are as follows:

  • 1) After the user logs in successfully, the client saves the user's userId and sessionId in the browser's localStorage (the sessionId is a uuid that the back end generates on every successful login), and the server saves the same information. If the user logs in on another device, the server updates the sessionId stored for that userId;
  • 2) The client uses a timer to poll the server for the latest login information of the current user by userId. If the sessionId returned by the server differs from the one saved in localStorage, the user has logged in on another device, so the client must invalidate the current session and jump to the user login page.

2. Use WebSocket. The specific steps are as follows:

  • 1) After the user successfully logs in, the server generates a uuid to represent the sessionId and returns it to the client;
  • 2) After receiving the sessionId, the client establishes a WebSocket connection to the server. The server uses a HashMap data structure to store the mapping between sessionId and WebSocket session, and uses the Redis distributed database to store the mapping between userId and the list of sessionIds;
  • 3) After the user successfully logs in on a device, the server first looks up the sessionId list for the current userId in redis. If a sessionId already exists in that list, the server uses it to find the corresponding WebSocket session instance in the HashMap and sends that client a message saying that the user has logged in on another device and that its session is no longer valid;
  • 4) The client clears the variables saved in the browser’s local cache localStorage and session cache sessionStorage after receiving the notification that the server session has expired from the WebSocket, and then jumps to the user login page.
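
The steps of the second solution can be sketched with plain in-memory Java. This is only an illustration under simplifying assumptions: `SingleSessionRegistry` and its notifier callback are hypothetical names that do not appear in the project, a `ConcurrentHashMap` stands in for Redis, and the callback stands in for the WebSocket push.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical in-memory stand-in for the Redis + WebSocket pieces described above. */
class SingleSessionRegistry {
    // userId -> sessionId of the only session currently allowed for that user
    private final Map<String, String> activeSessions = new ConcurrentHashMap<>();
    // Called with (kickedSessionId, message); the real system would push this over WebSocket
    private final BiConsumer<String, String> notifier;

    SingleSessionRegistry(BiConsumer<String, String> notifier) {
        this.notifier = notifier;
    }

    /** Registers a new login and tells the previous session, if any, that it is no longer valid. */
    void login(String userId, String newSessionId) {
        String previous = activeSessions.put(userId, newSessionId);
        if (previous != null && !previous.equals(newSessionId)) {
            notifier.accept(previous, "This account has been logged in on another device");
        }
    }

    /** Returns true only for the most recent session of the given user. */
    boolean isActive(String userId, String sessionId) {
        return sessionId.equals(activeSessions.get(userId));
    }
}
```

If `login("alice", "s1")` is followed by `login("alice", "s2")`, the notifier is called once with `s1`, and only `s2` remains active. The server code later in this article follows the same idea, but keeps the session list in redis so that it survives restarts and can be shared.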

In the first solution the client polls the server for the sessionId of the currently logged-in user. It is easy to see the drawbacks: polling consumes server resources and network bandwidth, and if the polling interval is set too long, the client cannot notice in real time that the user has logged in on another device. In addition, users do not often log in on two devices at the same time, so most of the polling requests are wasted. This is clearly not a good solution.

The second solution, duplex communication through WebSocket, is much better. The client does not need to poll the server for the user's sessionId; when the user logs in on a second device, the server simply pushes a message to the previously logged-in client to tell it that its session has expired. The author therefore chose WebSocket to implement this requirement. The author had not worked with WebSocket duplex communication before, so this article walks through implementing the requirement with WebSocket + Redis step by step.

Because covering both the front-end and back-end implementation in one article would be too long, the author splits it into two articles. This article focuses on the server-side implementation; the client-side implementation and the testing of the finished feature are left to the next article.

1 Introduction to WebSocket

Here we briefly introduce WebSocket, based on the answer obtained by asking the recently popular ChatGPT about it. Translated, the answer reads:

WebSocket is a communication protocol that provides two-way, low-latency, real-time communication between the client and the server. It was created to overcome some shortcomings of traditional HTTP connections based on the request-response model, such as the inability to keep a persistent connection between the server and the client.

WebSocket has full-duplex communication capability, which means that both the server and the client can send messages to each other at any time without first receiving a request from the other party. This is completely different from a traditional HTTP connection, where the client must initiate a request every time it wants a response from the server.

WebSocket uses a separate TCP connection for communication, which ensures that the connection is always open when needed, which effectively reduces the service burden caused by multiple connections established and maintained for each request/response.
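
As a small illustration of how that long-lived connection is opened: a WebSocket connection starts as an HTTP request that is upgraded, and the server proves it understood the upgrade by answering the client's `Sec-WebSocket-Key` header with a derived `Sec-WebSocket-Accept` value, as specified in RFC 6455. The sketch below only shows that derivation; `WebSocketHandshake` is a hypothetical helper name, and in the project of this article the servlet container performs this step, so no such code has to be written by hand.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper showing the Sec-WebSocket-Accept derivation from RFC 6455. */
class WebSocketHandshake {
    // Fixed GUID defined by RFC 6455
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns base64(SHA-1(key + GUID)), the value the server sends back during the upgrade. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 must be available on every Java platform
            throw new IllegalStateException(e);
        }
    }
}
```

For the sample key used in RFC 6455, `acceptKey("dGhlIHNhbXBsZSBub25jZQ==")` returns `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`.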

WebSocket is currently widely used in application scenarios such as online games, social chat, and real-time stock updates. Most modern web browsers already support WebSocket and can be used with front-end technologies such as HTML, JavaScript, and CSS.

2 Server implementation

2.1 Add project dependencies

Create a new Spring Boot project and add the Maven dependencies for web, websocket and redisson to the pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bonus</groupId>
    <artifactId>bonus-backend</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>bonus-backend</name>
    <description>bonus-backend</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!--spring web mvc dependency-->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
        <!--spring security security framework-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
            <version>2.2.7.RELEASE</version>
        </dependency>
        <!--spring configuration annotations automatically take effect dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--Ali fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.79</version>
        </dependency>
        <!--mysql driver-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
            <scope>runtime</scope>
        </dependency>
        <!--druid data source-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.8</version>
        </dependency>
        <!--mybatis-plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!--WebSocket dependency-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.2.7.RELEASE</version>
        </dependency>
        <!--redis client upgrade tool redisson-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.10.7</version>
        </dependency>
        <!--Code concise tool lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.2.7.RELEASE</version>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.2 Create new ServerEndpointExporter and RedissonClient configuration beans

package com.bonus.bonusbackend.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    // Fall back to a local Redis instance when the properties are not configured
    @Value("${spring.redis.host:127.0.0.1}")
    private String redisHost;
    @Value("${spring.redis.port:6379}")
    private String redisPort;

    /**
     * Exports the classes annotated with @ServerEndpoint as WebSocket endpoints
     * @return ServerEndpointExporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * Redisson client, single-server mode
     * @return RedissonClient instance
     */
    @Bean
    public RedissonClient redissonSingle() {
        Config config = new Config();
        config.setCodec(new JsonJacksonCodec())
                .useSingleServer()
                // Build the address from the injected host and port instead of hard-coding it
                .setAddress("redis://" + redisHost + ":" + redisPort);
        return Redisson.create(config);
    }

}

The ServerEndpointExporter class bean is used to expose the websocket service endpoint, and the RedissonClient class bean is a client tool used to operate redis.

2.3 Create a new WebSocketServer component class

Create a new WebSocketServer component class that implements the event handlers for the client WebSocket session: onOpen when the session opens, onMessage when a message is received, onClose when the session closes, and onError when the session fails

package com.bonus.bonusbackend.config;

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

@Component
@ServerEndpoint(value = "/wsMessage")
public class WebSocketServer {
    // Maps each sessionId to the endpoint instance of the connected client
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    // The connection session with the client, through which data is sent to the client
    private Session session;
    // Current session id
    private String sessionId;
    // Account of the user who owns this connection
    private String memAccount;
    // Collection of online users (thread-safe, because connections open and close concurrently)
    private static final List<String> memAccounts = new CopyOnWriteArrayList<>();
    // Logger
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * Called when the connection is opened
     * @param session session
     */
    @OnOpen
    public void onOpen(Session session) throws IOException {
        log.info("Open webSocket session");
        this.session = session;
        String queryStr = session.getQueryString();
        log.info("queryString:{}", queryStr);
        JSONObject queryJson = assembleQueryJson(queryStr);
        // memAccount is used instead of userId because of project business requirements
        this.memAccount = queryJson.getString("memAccount");
        this.sessionId = queryJson.getString("sessionId");
        if (memAccount == null || sessionId == null) {
            // ConcurrentHashMap does not accept null keys, so reject connections without the required parameters
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "memAccount and sessionId are required"));
            return;
        }
        // put replaces an existing entry for the same sessionId and returns the previous one
        WebSocketServer previous = webSocketMap.put(sessionId, this);
        if (previous == null) {
            // New session: add the user to the online list if not already present
            if (!memAccounts.contains(memAccount)) {
                memAccounts.add(memAccount);
            }
            String message = genMessage(200, "The current online number is: " + getOnlineNumber());
            sendMessageAll(message);
        }
        log.info("Connection user: {}, the current connection number is: {}", memAccount, getOnlineNumber());
    }

    /**
     * Generate a JSON message
     * @param code response code
     * @param message message text
     * @return JSON string with the fields code and msg
     */
    public String genMessage(int code, String message) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", code);
        jsonObject.put("msg", message);
        return jsonObject.toJSONString();
    }

    // Get the current number of people online, not considering distributed deployment for the time being
    public int getOnlineNumber() {
        return memAccounts.size();
    }

    // Extract the user account and session ID parameters from the query string
    public JSONObject assembleQueryJson(String queryStr) {
        JSONObject jsonObject = new JSONObject();
        if (queryStr == null || queryStr.isEmpty()) {
            return jsonObject;
        }
        for (String queryParam : queryStr.split("&")) {
            String[] nameValues = queryParam.split("=", 2);
            if (nameValues.length < 2) {
                // Skip parameters without a value
                continue;
            }
            // Local values only: the fields of this instance are set in onOpen
            if ("memAccount".equals(nameValues[0]) || "sessionId".equals(nameValues[0])) {
                jsonObject.put(nameValues[0], nameValues[1]);
            }
        }
        return jsonObject;
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("Received client webSocket message: {}, queryString={}", message, session.getQueryString());
    }

    @OnClose
    public void onClose(Session session) throws IOException {
        log.info("webSocket closed session");
        log.info("queryStr:{}", session.getQueryString());
        if (sessionId != null && webSocketMap.remove(sessionId) != null) {
            // Keep the account in the online list while it still has another open connection
            boolean stillOnline = webSocketMap.values().stream()
                    .anyMatch(ws -> memAccount.equals(ws.memAccount));
            if (!stillOnline) {
                memAccounts.remove(memAccount);
            }
            log.info("User: {} exited the system, the current number of connections is: {}", memAccount, getOnlineNumber());
        }
        // Notify everyone
        String message = genMessage(200, "The current number of connections is: " + getOnlineNumber());
        sendMessageAll(message);
    }

    /**
     * Send a single message to the client that owns the given session
     * @param message message content
     * @param sessionId session id
     * @throws IOException if sending fails
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        log.info("Send a message to {}, the content is: {}", sessionId, message);
        WebSocketServer target = (sessionId == null || sessionId.isEmpty()) ? null : webSocketMap.get(sessionId);
        if (target != null) {
            target.sendMessage(message);
        }
    }

    /**
     * Single message
     * The server sends a message to the client of this connection
     * @param message message content
     * @throws IOException if sending fails
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * Group message
     * @param message message content
     * @throws IOException if sending fails
     */
    public void sendMessageAll(String message) throws IOException {
        for (WebSocketServer server : webSocketMap.values()) {
            server.sendMessage(message);
        }
    }

    @OnError
    public void onError(Session session, Throwable ex) {
        JSONObject queryJson = assembleQueryJson(session.getQueryString());
        String sessionId = queryJson.getString("sessionId");
        log.error(sessionId + " connection error", ex);
    }

}

Note that WebSocket endpoints need to be exposed using the @ServerEndpoint annotation
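
The endpoint above reads memAccount and sessionId from the query string of the connection URL. As a hedged sketch of how such a query string could be parsed more defensively (URL-decoding values and tolerating parameters without a value), the following standalone helper may be useful; `QueryStringParser` is a hypothetical name and is not part of the project.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that turns "a=1&b=2" into a map, decoding percent-escapes. */
class QueryStringParser {

    static Map<String, String> parse(String query) {
        Map<String, String> params = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            // Split on the first '=' only, so values may themselves contain '='
            int eq = pair.indexOf('=');
            String name = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(decode(name), decode(value));
        }
        return params;
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always supported by the Java platform
            throw new IllegalStateException(e);
        }
    }
}
```

For example, `parse("memAccount=zhang%20san&sessionId=abc123")` yields `memAccount` mapped to `zhang san` and `sessionId` mapped to `abc123`. The standard `javax.websocket.Session` interface also offers `getRequestParameterMap()`, which may remove the need for manual parsing altogether.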

2.4 When the same user logs in, add the logic of kicking out users logged in from other devices

In the WebSocketServer class, add logic that checks whether the same user has more than one session and, if so, kicks out the previous session

    // Additional imports needed by this snippet: org.redisson.api.RBucket, org.redisson.api.RDeque,
    // org.redisson.api.RLock, org.redisson.api.RedissonClient and java.util.concurrent.TimeUnit

    // Redisson client used to read and write the session data in redis
    @Resource
    private RedissonClient redissonClient;
    // The maximum number of sessions allowed for the same account
    private static final int MAX_SESSION_SIZE = 1;
    // false: kick out the earliest login; true: kick out the latest login.
    // This flag is used below but was not declared in the original listing; false matches the behavior described in this article
    private static final boolean KICKOUT_AFTER = false;
    // Key prefix of the user information stored in redis
    private static final String USER_KEY_PREFIX = "memInfo_";
    // Key prefix of the user's websocket session queue
    private static final String USER_DEQUE_PREFIX = "memInfo_deque_";
    // Key prefix of the lock held while checking the sessions of one user and kicking out the previous session
    private static final String LOCK_KEY_PREFIX = "memInfo_lock_";

    /**
     * Determine whether the user is logged in on multiple devices, and if so, kick out the previously logged-in session
     * @param sessionId session ID
     * @param memAccount member account
     */
    public void kickOut(String sessionId, String memAccount) {
        log.info("sessionId:{}, memAccount:{}", sessionId, memAccount);
        String userKey = USER_KEY_PREFIX + memAccount + "_" + sessionId;
        RBucket<JSONObject> redisBucket = redissonClient.getBucket(userKey);
        JSONObject currentUser = redisBucket.get();
        if (currentUser == null) {
            // The login record is missing or has expired, so there is nothing to check
            log.warn("No login record found for key {}", userKey);
            return;
        }
        log.info("currentUser={}", JSONObject.toJSONString(currentUser));
        String userDequeKey = USER_DEQUE_PREFIX + currentUser.getString("memAccount");
        // Lock per account; the lease expires automatically after 2 seconds
        String lockKey = LOCK_KEY_PREFIX + memAccount;
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(2L, TimeUnit.SECONDS);
        try {
            RDeque<String> deque = redissonClient.getDeque(userDequeKey);
            // If this sessionId is not in the queue and the user has not been kicked out, add it to the head of the queue
            if (!deque.contains(sessionId) && !currentUser.getBooleanValue("isKickOut")) {
                deque.push(sessionId);
            }
            // If the number of sessionIds in the queue exceeds the maximum number of sessions, start kicking out sessions
            while (deque.size() > MAX_SESSION_SIZE) {
                String kickOutSessionId;
                if (KICKOUT_AFTER) {
                    // Head of the queue: the latest login
                    kickOutSessionId = deque.removeFirst();
                } else {
                    // Tail of the queue: the earliest login
                    kickOutSessionId = deque.removeLast();
                }
                RBucket<JSONObject> kickOutBucket = redissonClient.getBucket(USER_KEY_PREFIX + memAccount + "_" + kickOutSessionId);
                JSONObject kickOutUser = kickOutBucket.get();
                if (kickOutUser != null) {
                    kickOutUser.put("isKickOut", true);
                    log.info("kickOutUser={}", kickOutUser.toJSONString());
                    kickOutBucket.set(kickOutUser);
                    JSONObject wsJson = new JSONObject();
                    wsJson.put("code", 1001); // Response code 1001 means being kicked out of login
                    wsJson.put("msg", "This account has been logged in elsewhere or has been kicked out, if you have any questions, please contact your superior");
                    sendMessage(wsJson.toJSONString(), kickOutSessionId);
                    // Re-read the current user, whose record may have just been marked as kicked out
                    currentUser = redisBucket.get();
                }
            }
            if (currentUser != null && currentUser.getBooleanValue("isKickOut")) {
                JSONObject wsJson = new JSONObject();
                wsJson.put("code", 1001);
                wsJson.put("msg", "This account has been logged in elsewhere or has been kicked out, if you have any questions, please contact your superior");
                // Use the sessionId parameter: kickOut is called on the Spring-managed bean, whose own sessionId field is not set
                sendMessage(wsJson.toJSONString(), sessionId);
            }
        } catch (Exception e) {
            log.error("kickOut error", e);
        } finally {
            // Release the lock if the lease has not already expired
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
                log.info("User: {} unlock", memAccount);
            } else {
                log.info("User: {} already released lock", memAccount);
            }
        }

    }
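
Which session gets kicked depends on the order of the deque: push adds a sessionId at the head, so removeLast takes the earliest login and removeFirst takes the latest one. The sketch below reproduces only that ordering with `java.util.ArrayDeque`, on the assumption that Redisson's `RDeque` follows the same `java.util.Deque` contract; `KickOutOrderDemo` and its `evict` method are hypothetical names used for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical demo of the eviction order used by the kick-out loop. */
class KickOutOrderDemo {

    /** Removes sessions until at most maxSessions remain and returns the removed ones. */
    static List<String> evict(Deque<String> sessions, int maxSessions, boolean kickOutAfter) {
        List<String> kicked = new ArrayList<>();
        while (sessions.size() > maxSessions) {
            // Head = latest login, tail = earliest login
            kicked.add(kickOutAfter ? sessions.removeFirst() : sessions.removeLast());
        }
        return kicked;
    }

    public static void main(String[] args) {
        Deque<String> sessions = new ArrayDeque<>();
        sessions.push("phone");   // first login
        sessions.push("laptop");  // second login, now at the head
        System.out.println(evict(sessions, 1, false)); // prints [phone]
        System.out.println(sessions);                  // prints [laptop]
    }
}
```

With the flag set to false the earlier login (phone) is removed and the new login (laptop) stays, which is the behavior this article aims for. Setting it to true would instead reject the new login and keep the existing one.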

2.5 After the user logs in successfully, asynchronously judge whether the current login user has multiple sessions, and if so, kick the previous session

This asynchronous logic is implemented in the login success handler inside the configure(HttpSecurity http) method of the Security configuration class

There are already many articles on the Internet about implementing user login with the spring-security framework, so the details are not repeated here. Readers can also refer to the author's previous article Spring Security Getting Started (3): Authentication based on custom database queries

    @Resource
    private RedissonClient redissonClient;

    @Resource(name = "asyncThreadPool")
    private ThreadPoolExecutor poolExecutor;

    @Resource
    private WebSocketServer webSocketServer;

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        // Do not intercept websocket communication, because the websocket connection is opened after successful authentication
        http.authorizeRequests().antMatchers("/wsMessage").permitAll()
                .anyRequest().authenticated()
                .and().httpBasic()
                .and().formLogin()
                .loginProcessingUrl("/member/login")
                .successHandler((httpServletRequest, httpServletResponse, authentication) -> {
                    httpServletResponse.setContentType("application/json;charset=utf-8");
                    httpServletResponse.setStatus(HttpStatus.OK.value());
                    PrintWriter printWriter = httpServletResponse.getWriter();
                    // Obtain the authentication information memInfoDTO (representing user information) from the authentication input parameter
                    MemInfoDTO memInfoDTO = (MemInfoDTO) authentication.getPrincipal();
                    // Data returned to the client
                    Map<String, Object> dataMap = new HashMap<>();
                    dataMap.put("memAccount", memInfoDTO.getMemAccount());
                    // Generate a uuid (java.util.UUID) without hyphens as the session id
                    String sessionId = UUID.randomUUID().toString().replace("-", "");
                    dataMap.put("sessionId", sessionId);
                    // Save user information to redis: key is memInfo_ + memAccount + _ + sessionId
                    JSONObject userJson = new JSONObject();
                    userJson.put("memAccount", memInfoDTO.getMemAccount());
                    userJson.put("sessionId", sessionId);
                    userJson.put("isKickOut", false); // Whether the session has been kicked out
                    // Save user account and session information in redis, with an expiration time of 2 hours
                    RBucket<JSONObject> bucket = redissonClient.getBucket("memInfo_" + memInfoDTO.getMemAccount() + "_" + sessionId);
                    bucket.set(userJson, 2 * 60 * 60L, TimeUnit.SECONDS);
                    // Asynchronously determine whether the user has logged in on other devices, and if so, kick out the previous login
                    poolExecutor.execute(() -> webSocketServer.kickOut(sessionId, memInfoDTO.getMemAccount()));
                    // Return user information and session id to the client
                    ResponseResult<Map<String, Object>> responseResult = ResponseResult.success(dataMap, "login success");
                    printWriter.write(JSONObject.toJSONString(responseResult));
                    printWriter.flush();
                    printWriter.close();
                });
    }

poolExecutor is a custom thread pool bean

package com.bonus.bonusbackend.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class TheadPoolConfig {

    @Bean("asyncThreadPool")
    public ThreadPoolExecutor threadPoolExecutor() {
        // Bounded queue: at most 100 tasks wait while all threads are busy
        BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>(100);
        // 5 core threads, up to 10 threads, idle non-core threads are released after 60 seconds
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, blockingDeque);
        return poolExecutor;
    }
}
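
One consequence of this pool configuration is worth keeping in mind: a `ThreadPoolExecutor` built without an explicit rejection handler uses the default abort policy, so once all threads are busy and the bounded queue is full, `execute` throws `RejectedExecutionException`. The sketch below counts how many blocking tasks a pool accepts before that happens; `RejectionDemo` is a hypothetical class written only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: how many tasks fit into a bounded pool before one is rejected. */
class RejectionDemo {

    static int acceptedBeforeRejection(int core, int max, int queueCapacity) {
        // Every task waits on this latch, so no thread becomes free during the test
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(queueCapacity));
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: the pool is saturated
            return accepted;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

A pool with 1 core thread, 2 maximum threads and a queue of 3 accepts 5 blocking tasks (maximum threads plus queue capacity) and rejects the sixth. By the same reasoning the configuration above holds up to 110 unfinished kick-out tasks; if that limit could be reached, passing a handler such as `ThreadPoolExecutor.CallerRunsPolicy` to the constructor is one possible way to avoid the exception in the login success handler.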

2.6 Startup class

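Add the @EnableAsync annotation to the startup class. It enables Spring's @Async support; the kick-out task in section 2.5 is submitted to the thread pool directly and does not depend on it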

@SpringBootApplication
@EnableAsync
public class BonusBackendApplication {
    public static void main(String[] args) {
        SpringApplication.run(BonusBackendApplication.class, args);
    }
}

2.7 Project environment variables

Configuration file shared by all environments:

application.properties

server.servlet.context-path=/bonus
spring.profiles.active=dev
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8

Configuration file for the development environment:

application-dev.properties

server.address=127.0.0.1
server.port=8090

spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.url=jdbc:mysql://localhost:3306/bonus?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true
spring.datasource.druid.username=bonus_user
spring.datasource.druid.password=tiger2022@
spring.datasource.druid.validation-query=select 1 from dual

3 Summary

This article implements the server side of the feature in which logins of the same user on different devices kick each other out, using WebSocket. The next article will introduce the implementation of the client and then test the finished feature.
This article was first published on the author's personal WeChat public account [Afu talks about Web programming]. Readers who want to see the client implementation and its effect as soon as possible can follow the account by searching for it in WeChat. Let's move forward together on the road of technical growth and encourage each other!

4 Reference reading

[1] Spring Boot Hands-on Teaching (18): Based on Redis and Redisson, users can kick each other. A user can only log in with one browser (https://juejin.cn/post/6867157108987527175)

[2] Spring Boot Hands-on Teaching (17): websocket analysis and how to access websocket at the front and back ends (https://juejin.cn/post/6865070438243008520)