Main configuration
import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; /** * @author * WebSocket configuration class */ @Slf4j @Configuration @AllArgsConstructor @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {<!-- --> private final WebSocketInterceptor webSocketInterceptor; private final WebSocketHandshakeInterceptor webSocketHandshakeInterceptor; public static final String USER_DESTINATION_PREFIX = "/salaryother/"; public static final String CALL_DEVICE_NOTIFY_PATH = USER_DESTINATION_PREFIX + "CALL_DEVICE_NOTIFY/"; @Override public void registerStompEndpoints(StompEndpointRegistry registry) {<!-- --> log.info("WebSocket Server Registration"); registry.addEndpoint("/ws") .addInterceptors(webSocketHandshakeInterceptor) .setHandshakeHandler(new WebSocketHandshakeHandler()) .setAllowedOrigins("*") .withSockJS(); registry.addEndpoint("/wsapp") .addInterceptors(webSocketHandshakeInterceptor) .setHandshakeHandler(new WebSocketHandshakeHandler()) .setAllowedOrigins("*"); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) {<!-- --> log.info("WebSocket server started"); //Heartbeat detection ThreadPoolTaskScheduler 
threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); threadPoolTaskScheduler.setPoolSize(10); threadPoolTaskScheduler.setThreadNamePrefix("wss-heartbeat-thread-"); threadPoolTaskScheduler.initialize(); ///Information receiving header registry.enableSimpleBroker("/topic", USER_DESTINATION_PREFIX) .setHeartbeatValue(new long[]{<!-- -->10000, 10000}).setTaskScheduler(threadPoolTaskScheduler); //Receive prefix registry.setApplicationDestinationPrefixes("/topic"); //Request prefix registry.setUserDestinationPrefix("/user"); } /** * Configure the message parameters for sending and receiving. You can specify the message byte size, cache size, and sending timeout. * * @param registration */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) {<!-- --> /* * 1. setMessageSizeLimit sets the size of the message cache in bytes. * 2. setSendBufferSizeLimit sets the cache size in bytes when setting the websocket session * 3. setSendTimeLimit sets the message sending session timeout, milliseconds */ registration.setMessageSizeLimit(10240) .setSendBufferSizeLimit(10240) .setSendTimeLimit(10000); } @Override public void configureClientInboundChannel(ChannelRegistration registration) {<!-- --> /* * Configure message thread pool * 1. corePoolSize configures the core thread pool. When the number of threads is less than this configuration, new thread processing tasks will be generated regardless of whether there are idle threads in the thread. * 2. maxPoolSize configures the maximum number of thread pools. When the number of thread pools is equal to this configuration, no new threads will be generated. * 3. 
keepAliveSeconds The idle time allowed by the thread pool maintenance thread, in seconds */ registration.taskExecutor().corePoolSize(10) .maxPoolSize(60) .keepAliveSeconds(60); registration.interceptors(webSocketInterceptor); } // This is a bean rewritten to resolve conflicts with scheduled tasks @Primary @Bean public TaskScheduler taskScheduler() {<!-- --> ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); return taskScheduler; } }
Handshake interception. With this scheme the front end does not seem to be able to add a header to the handshake request, so header-based permission verification is not possible here. Instead, the token is appended to the URL as a query parameter (after a question mark), and the front end creates the connection with new SockJS(url + "?Authorization=" + token). SockJS performs the handshake over HTTP, so this works without problems. This article uses OAuth2 for permission verification.
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.joolun.cloud.common.security.entity.BaseUser; import com.joolun.cloud.common.security.util.SecurityUtils; import io.micrometer.core.lang.NonNullApi; import io.micrometer.core.lang.Nullable; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.oauth2.provider.OAuth2Authentication; import org.springframework.security.oauth2.provider.token.RemoteTokenServices; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import javax.servlet.http.HttpServletRequest; import java.util.Map; @Slf4j @Component @NonNullApi @AllArgsConstructor public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {<!-- --> private WebSocketManager webSocketManager; private RemoteTokenServices tokenService; //SecurityUtils.getUser() code // SecurityContextHolder.getContext().getAuthentication().getPrincipal() // principal instanceof BaseUser ? 
(BaseUser)principal : null; @Override public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) {<!-- --> String token = getToken(serverHttpRequest); if (StrUtil.isBlank(token)) return false; //Verify token information try {<!-- --> OAuth2Authentication auth2Authentication = tokenService.loadAuthentication(token); if (ObjectUtil.isNull(auth2Authentication)) return false; SecurityContextHolder.getContext().setAuthentication(auth2Authentication); } catch (Exception e) {<!-- --> log.error("Token verification failed"); return false; } BaseUser user = SecurityUtils.getUser(); String userId = user.getId(); map.put("WebSocket-user", new WebSocketUserAuthentication(userId, user.getUsername(), token)); webSocketManager.addUser(userId, token); log.info("userId:" + userId + "Username:" + user.getUsername() + ":Start establishing connection"); return true; } @Override public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, @Nullable Exception e) {<!-- --> BaseUser user = SecurityUtils.getUser(); log.info("userId:" + user.getId() + "Username:" + user.getUsername() + ": Connection establishment completed"); } private String getToken(ServerHttpRequest serverHttpRequest) {<!-- --> ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) serverHttpRequest; HttpServletRequest httpServletRequest = servletRequest.getServletRequest(); String token = httpServletRequest.getParameter("Authorization"); return StrUtil.isBlank(token) ? "" : token; }
Next, set up a handshake handler that injects the user's identity (the Principal) once the handshake has been accepted. After this is configured, the identity can be used directly for one-to-one (per-user) subscriptions.
import io.micrometer.core.lang.NonNullApi; import org.springframework.http.server.ServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; import java.security.Principal; import java.util.Map; @NonNullApi public class WebSocketHandshakeHandler extends DefaultHandshakeHandler {<!-- --> @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {<!-- --> return (Principal) attributes.get("WebSocket-user"); } }
import lombok.Data; import java.security.Principal; @Data public class WebSocketUserAuthentication implements Principal {<!-- --> private String token; private String userId; private String userName; public WebSocketUserAuthentication(String userId, String userName,String token) {<!-- --> this.token = token; this.userId = userId; this.userName = userName; } public WebSocketUserAuthentication() {<!-- --> } @Override public String getName() {<!-- --> return token; } }
Store user data
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.TimeUnit; @Slf4j @Component public class WebSocketManager {<!-- --> Cache<String, String> webSocketUser; @PostConstruct public void init() {<!-- --> webSocketUser = Caffeine.newBuilder().initialCapacity(16).expireAfterWrite(60, TimeUnit.MINUTES).build(); } public boolean isOnline(String userId) {<!-- --> return StringUtils.isNotBlank(webSocketUser.getIfPresent(userId)); } public void addUser(String userId, String token) {<!-- --> webSocketUser.put(userId, token); } public String getTokenById(String userId) {<!-- --> return webSocketUser.getIfPresent(userId); } public void deleteUser(String userId) {<!-- --> webSocketUser.invalidate(userId); } }
Next comes the channel interceptor. If you do not use handshake interception, you can authenticate here instead: at this point you can read the headers that the client sends in the STOMP CONNECT frame after the handshake. The front end adds the token to those headers:
this.stompClient.connect(headers, () => { .....
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.joolun.cloud.common.security.entity.BaseUser; import io.micrometer.core.lang.NonNullApi; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.oauth2.provider.OAuth2Authentication; import org.springframework.security.oauth2.provider.token.RemoteTokenServices; import org.springframework.stereotype.Component; import java.util.Optional; @Slf4j @NonNullApi @Component @Order(Ordered.HIGHEST_PRECEDENCE + 99) @AllArgsConstructor public class WebSocketInterceptor implements ChannelInterceptor {<!-- --> private WebSocketManager webSocketManager; private RemoteTokenServices tokenService; @Override public Message<?> preSend(Message<?> message, MessageChannel channel) {<!-- --> StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); if (ObjectUtil.isNull(accessor)) throw new MessagingException("Failed to obtain data"); StompCommand stompCommand = accessor.getCommand(); // Determine whether it is a connection or disconnection request if (StompCommand.CONNECT != stompCommand & amp; & amp; StompCommand.DISCONNECT != stompCommand) return message; //Get user information Optional<WebSocketUserAuthentication> user = getUser(accessor); if (!user.isPresent()) throw new MessagingException("Failed to obtain user"); WebSocketUserAuthentication baseUser = user.get(); String userId = 
baseUser.getUserId(); // Offline request if (StompCommand.DISCONNECT == stompCommand) {<!-- --> String userName = baseUser.getUserName(); webSocketManager.deleteUser(userId); log.info("userId:" + userId + "Username:" + userName + ":Offline"); } return message; } /** * Called immediately after the message is sent, the boolean value parameter indicates the return value of the call */ @Override public void postSend(Message<?> message, MessageChannel messageChannel, boolean sent) {<!-- --> StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); //Ignore non-STOMP messages such as heartbeat messages StompCommand command = accessor.getCommand(); if (command == null) return; switch (command) {<!-- --> case CONNECT: log.info("Online"); break; case CONNECTED: log.info("Connected"); break; case SUBSCRIBE: log.info("Subscription:" + accessor.getDestination()); break; case UNSUBSCRIBE: log.info("Unsubscribe:" + accessor.getDestination()); break; case DISCONNECT: log.info("Offline"); break; default: log.info("Does not match the above conditions"); break; } } private boolean isUserOAuth2Authentication(StompHeaderAccessor accessor) {<!-- --> String token = getToken(accessor); if (StrUtil.isBlank(token)) return false; try {<!-- --> //Verify token information OAuth2Authentication auth2Authentication = tokenService.loadAuthentication(token); if (ObjectUtil.isNull(auth2Authentication)) return false; SecurityContextHolder.getContext().setAuthentication(auth2Authentication); } catch (Exception e) {<!-- --> log.error("Token verification failed"); return false; } return true; } private Optional<WebSocketUserAuthentication> getUser(StompHeaderAccessor accessor) {<!-- --> return accessor.getUser() instanceof WebSocketUserAuthentication ? 
Optional.of((WebSocketUserAuthentication) accessor.getUser()) : getSystemUserToWebSocketUserAuthentication(accessor) ; } private Optional<WebSocketUserAuthentication> getSystemUserToWebSocketUserAuthentication(StompHeaderAccessor accessor) {<!-- --> Authentication authentication = getAuthentication(); if (ObjectUtil.isNull(authentication)) {<!-- --> if (isUserOAuth2Authentication(accessor)) {<!-- --> authentication = getAuthentication(); } else {<!-- --> return Optional.empty(); } } Object principal = authentication.getPrincipal(); if (ObjectUtil.isNull(principal)) return Optional.empty(); BaseUser user = principal instanceof BaseUser ? (BaseUser) principal : null; if (ObjectUtil.isNotNull(user)) {<!-- --> WebSocketUserAuthentication webSocketUserAuthentication = new WebSocketUserAuthentication(user.getId(), user.getUsername(), getToken(accessor)); accessor.setUser(webSocketUserAuthentication); return Optional.of(webSocketUserAuthentication); } return Optional.empty(); } private String getToken(StompHeaderAccessor accessor) {<!-- --> String tokens = accessor.getFirstNativeHeader("Authorization"); if (StrUtil.isBlank(tokens)) return ""; return tokens.split(" ")[1]; } private Authentication getAuthentication() {<!-- --> return SecurityContextHolder.getContext().getAuthentication(); } }
After the configuration is completed, the message is sent. I will not give a detailed example here, but take SimpMessageSendingOperations as an example.
// Import
import org.springframework.messaging.simp.SimpMessageSendingOperations;

// Injected fields
private final SimpMessageSendingOperations simpMessageSendingOperations;
private final WebSocketManager webSocketManager;
If the handshake interceptor is configured, personal (one-to-one) messages are delivered to the session's Principal:
Subscription address: /user/salaryother/activistIncoming. Sending address: /salaryother/activistIncoming. Send a message: simpMessageSendingOperations.convertAndSendToUser(webSocketManager.getTokenById(userId), sendingAddress, message);
If it is not configured, you need to add a few more prefixes. For details, see:
Spring Springboot implements websocket communication-2 this details
WeChat applet connects to websocket