Using SockJS and STOMP in Java (Spring) to implement WebSocket communication

Main configuration

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;



/**
 * @author
 * WebSocket configuration class
 */
@Slf4j
@Configuration
@AllArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {<!-- -->

    private final WebSocketInterceptor webSocketInterceptor;
    private final WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
    public static final String USER_DESTINATION_PREFIX = "/salaryother/";
    public static final String CALL_DEVICE_NOTIFY_PATH = USER_DESTINATION_PREFIX + "CALL_DEVICE_NOTIFY/";

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {<!-- -->
        log.info("WebSocket Server Registration");
        registry.addEndpoint("/ws")
                .addInterceptors(webSocketHandshakeInterceptor)
                .setHandshakeHandler(new WebSocketHandshakeHandler())
                .setAllowedOrigins("*")
                .withSockJS();

        registry.addEndpoint("/wsapp")
                .addInterceptors(webSocketHandshakeInterceptor)
                .setHandshakeHandler(new WebSocketHandshakeHandler())
                .setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {<!-- -->
        log.info("WebSocket server started");
        //Heartbeat detection
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(10);
        threadPoolTaskScheduler.setThreadNamePrefix("wss-heartbeat-thread-");
        threadPoolTaskScheduler.initialize();
        ///Information receiving header
        registry.enableSimpleBroker("/topic", USER_DESTINATION_PREFIX)
                .setHeartbeatValue(new long[]{<!-- -->10000, 10000}).setTaskScheduler(threadPoolTaskScheduler);
        //Receive prefix
        registry.setApplicationDestinationPrefixes("/topic");
        //Request prefix
        registry.setUserDestinationPrefix("/user");
    }

    /**
     * Configure the message parameters for sending and receiving. You can specify the message byte size, cache size, and sending timeout.
     *
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {<!-- -->
        /*
         * 1. setMessageSizeLimit sets the size of the message cache in bytes.
         * 2. setSendBufferSizeLimit sets the cache size in bytes when setting the websocket session
         * 3. setSendTimeLimit sets the message sending session timeout, milliseconds
         */
        registration.setMessageSizeLimit(10240)
                .setSendBufferSizeLimit(10240)
                .setSendTimeLimit(10000);
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {<!-- -->

        /*
         * Configure message thread pool
         * 1. corePoolSize configures the core thread pool. When the number of threads is less than this configuration, new thread processing tasks will be generated regardless of whether there are idle threads in the thread.
         * 2. maxPoolSize configures the maximum number of thread pools. When the number of thread pools is equal to this configuration, no new threads will be generated.
         * 3. keepAliveSeconds The idle time allowed by the thread pool maintenance thread, in seconds
         */
        registration.taskExecutor().corePoolSize(10)
                .maxPoolSize(60)
                .keepAliveSeconds(60);
        registration.interceptors(webSocketInterceptor);
    }

    // This is a bean rewritten to resolve conflicts with scheduled tasks
    @Primary
    @Bean
    public TaskScheduler taskScheduler() {<!-- -->
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        return taskScheduler;
    }


}

Handshake interception. With this approach the front end apparently cannot add a header to the handshake request, so header-based permission verification is not possible at this stage. Instead, the token is appended to the URL as a query parameter: the front end creates the connection with `new SockJS(url + "?Authorization=" + token)`. SockJS performs the handshake over HTTP, so passing the token this way works. This article uses OAuth2 for permission verification.

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.joolun.cloud.common.security.entity.BaseUser;
import com.joolun.cloud.common.security.util.SecurityUtils;
import io.micrometer.core.lang.NonNullApi;
import io.micrometer.core.lang.Nullable;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.RemoteTokenServices;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpServletRequest;
import java.util.Map;

@Slf4j
@Component
@NonNullApi
@AllArgsConstructor
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {<!-- -->
    private WebSocketManager webSocketManager;
    private RemoteTokenServices tokenService;
//SecurityUtils.getUser() code
// SecurityContextHolder.getContext().getAuthentication().getPrincipal()
// principal instanceof BaseUser ? (BaseUser)principal : null;

    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) {<!-- -->
        String token = getToken(serverHttpRequest);
        if (StrUtil.isBlank(token)) return false;
        //Verify token information
        try {<!-- -->
            OAuth2Authentication auth2Authentication = tokenService.loadAuthentication(token);
            if (ObjectUtil.isNull(auth2Authentication)) return false;
            SecurityContextHolder.getContext().setAuthentication(auth2Authentication);
        } catch (Exception e) {<!-- -->
            log.error("Token verification failed");
            return false;
        }
        BaseUser user = SecurityUtils.getUser();
        String userId = user.getId();
        map.put("WebSocket-user", new WebSocketUserAuthentication(userId, user.getUsername(), token));
        webSocketManager.addUser(userId, token);
        log.info("userId:" + userId + "Username:" + user.getUsername() + ":Start establishing connection");
        return true;

    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, @Nullable Exception e) {<!-- -->
        BaseUser user = SecurityUtils.getUser();
        log.info("userId:" + user.getId() + "Username:" + user.getUsername() + ": Connection establishment completed");
    }


    private String getToken(ServerHttpRequest serverHttpRequest) {<!-- -->
        ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) serverHttpRequest;
        HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
        String token = httpServletRequest.getParameter("Authorization");
        return StrUtil.isBlank(token) ? "" : token;
    }

After the handshake you can inject the user's identity (Principal) into the session. Once this is configured, the identity can be used directly for one-to-one (user-specific) subscriptions.

import io.micrometer.core.lang.NonNullApi;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import java.security.Principal;
import java.util.Map;

@NonNullApi
public class WebSocketHandshakeHandler extends DefaultHandshakeHandler {<!-- -->
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {<!-- -->
        return (Principal) attributes.get("WebSocket-user");
    }


}
import lombok.Data;

import java.security.Principal;

@Data
public class WebSocketUserAuthentication implements Principal {<!-- -->

    private String token;
    private String userId;
    private String userName;

    public WebSocketUserAuthentication(String userId, String userName,String token) {<!-- -->
        this.token = token;
        this.userId = userId;
        this.userName = userName;
    }

    public WebSocketUserAuthentication() {<!-- -->
    }

    @Override
    public String getName() {<!-- -->
        return token;
    }

}

Store user data

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class WebSocketManager {<!-- -->

    Cache<String, String> webSocketUser;

    @PostConstruct
    public void init() {<!-- -->
        webSocketUser = Caffeine.newBuilder().initialCapacity(16).expireAfterWrite(60, TimeUnit.MINUTES).build();
    }

    public boolean isOnline(String userId) {<!-- -->
        return StringUtils.isNotBlank(webSocketUser.getIfPresent(userId));
    }

    public void addUser(String userId, String token) {<!-- -->
        webSocketUser.put(userId, token);
    }

    public String getTokenById(String userId) {<!-- -->
        return webSocketUser.getIfPresent(userId);
    }

    public void deleteUser(String userId) {<!-- -->
        webSocketUser.invalidate(userId);
    }
}

Next comes channel interception. If you do not use handshake interception, you can authenticate here instead: at this point the headers sent by the client after the handshake are available. The front end adds them through the `headers` argument of `connect`:

this.stompClient.connect(
        headers,
        () => {
        .....
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.joolun.cloud.common.security.entity.BaseUser;
import io.micrometer.core.lang.NonNullApi;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.RemoteTokenServices;
import org.springframework.stereotype.Component;

import java.util.Optional;


@Slf4j
@NonNullApi
@Component
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
@AllArgsConstructor
public class WebSocketInterceptor implements ChannelInterceptor {<!-- -->

    private WebSocketManager webSocketManager;
    private RemoteTokenServices tokenService;


    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {<!-- -->
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        if (ObjectUtil.isNull(accessor)) throw new MessagingException("Failed to obtain data");
        StompCommand stompCommand = accessor.getCommand();
        // Determine whether it is a connection or disconnection request
        if (StompCommand.CONNECT != stompCommand & amp; & amp; StompCommand.DISCONNECT != stompCommand) return message;
        //Get user information
        Optional<WebSocketUserAuthentication> user = getUser(accessor);
        if (!user.isPresent()) throw new MessagingException("Failed to obtain user");
        WebSocketUserAuthentication baseUser = user.get();
        String userId = baseUser.getUserId();
        // Offline request
        if (StompCommand.DISCONNECT == stompCommand) {<!-- -->
            String userName = baseUser.getUserName();
            webSocketManager.deleteUser(userId);
            log.info("userId:" + userId + "Username:" + userName + ":Offline");
        }
        return message;
    }


    /**
     * Called immediately after the message is sent, the boolean value parameter indicates the return value of the call
     */
    @Override
    public void postSend(Message<?> message, MessageChannel messageChannel, boolean sent) {<!-- -->
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        //Ignore non-STOMP messages such as heartbeat messages
        StompCommand command = accessor.getCommand();
        if (command == null) return;
        switch (command) {<!-- -->
            case CONNECT:
                log.info("Online");
                break;
            case CONNECTED:
                log.info("Connected");
                break;
            case SUBSCRIBE:
                log.info("Subscription:" + accessor.getDestination());
                break;
            case UNSUBSCRIBE:
                log.info("Unsubscribe:" + accessor.getDestination());
                break;
            case DISCONNECT:
                log.info("Offline");
                break;
            default:
                log.info("Does not match the above conditions");
                break;
        }
    }

    private boolean isUserOAuth2Authentication(StompHeaderAccessor accessor) {<!-- -->
        String token = getToken(accessor);
        if (StrUtil.isBlank(token)) return false;
        try {<!-- -->
            //Verify token information
            OAuth2Authentication auth2Authentication = tokenService.loadAuthentication(token);
            if (ObjectUtil.isNull(auth2Authentication)) return false;
            SecurityContextHolder.getContext().setAuthentication(auth2Authentication);
        } catch (Exception e) {<!-- -->
            log.error("Token verification failed");
            return false;
        }
        return true;
    }

    private Optional<WebSocketUserAuthentication> getUser(StompHeaderAccessor accessor) {<!-- -->
        return accessor.getUser() instanceof WebSocketUserAuthentication ?
                Optional.of((WebSocketUserAuthentication) accessor.getUser()) :
                getSystemUserToWebSocketUserAuthentication(accessor)
                ;


    }

    private Optional<WebSocketUserAuthentication> getSystemUserToWebSocketUserAuthentication(StompHeaderAccessor accessor) {<!-- -->
        Authentication authentication = getAuthentication();
        if (ObjectUtil.isNull(authentication)) {<!-- -->
            if (isUserOAuth2Authentication(accessor)) {<!-- -->
                authentication = getAuthentication();
            } else {<!-- -->
                return Optional.empty();
            }
        }
        Object principal = authentication.getPrincipal();
        if (ObjectUtil.isNull(principal)) return Optional.empty();
        BaseUser user = principal instanceof BaseUser ? (BaseUser) principal : null;
        if (ObjectUtil.isNotNull(user)) {<!-- -->
            WebSocketUserAuthentication webSocketUserAuthentication = new WebSocketUserAuthentication(user.getId(), user.getUsername(), getToken(accessor));
            accessor.setUser(webSocketUserAuthentication);
            return Optional.of(webSocketUserAuthentication);
        }
        return Optional.empty();

    }

    private String getToken(StompHeaderAccessor accessor) {<!-- -->
        String tokens = accessor.getFirstNativeHeader("Authorization");
        if (StrUtil.isBlank(tokens)) return "";
        return tokens.split(" ")[1];
    }

    private Authentication getAuthentication() {<!-- -->
        return SecurityContextHolder.getContext().getAuthentication();
    }

}

Once the configuration is complete, messages can be sent. A full example is not given here; the snippets below use SimpMessageSendingOperations.

//Introduction
import org.springframework.messaging.simp.SimpMessageSendingOperations;

private final SimpMessageSendingOperations simpMessageSendingOperations;
private final WebSocketManager webSocketManager;

If the handshake interceptor and handshake handler are configured, a message can be sent to a single user through that user's Principal:

Subscription address:/user/salaryother/activistIncoming
Sending address:/salaryother/activistIncoming
Send a message:
 simpMessageSendingOperations.convertAndSendToUser
                    (webSocketManager.getTokenById(user id), sending address, message);

If they are not configured, a few more prefixes have to be added to the destinations. For details, see:
"Spring / Spring Boot implements WebSocket communication - 2"

WeChat applet connects to websocket