Implement WebSocket and user identity verification with Netty based on Springboot

Speaking at the beginning, the text mainly refers to:
SpringBoot + WebSocket + Netty to achieve message push
Life cycle of Netty-11-channelHandler
springboot integration netty guide north

First you need to understand the life cycle of channel establishment
The order of ChannelHandler is as follows:

handlerAdded() –> channelRegistered() –> channelActive() –> channelRead() –> channelReadComplete()

Note that the focus of this implementation is: it is the key to verify user identity by obtaining user information (such as jwt token, etc.) from the request header header or the first message dialogue when establishing websocket

1. NettyConfig

Define a singleton that manages all channel groups

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author sixiaojie
 * @date 2020-03-28-15:07
 */
public class NettyConfig {<!-- -->
    /**
     * Define a channel group to manage all channels
     * GlobalEventExecutor.INSTANCE is a global event executor and a singleton
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor. INSTANCE);

    /**
     * Store the corresponding information between users and Chanel, which is used to send messages to specified users
     */
    private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

    private NettyConfig() {<!-- -->}

    /**
     * Get channel group
     * @return
     */
    public static ChannelGroup getChannelGroup() {<!-- -->
        return channelGroup;
    }

    /**
     * Get user channel map
     * @return
     */
    public static ConcurrentHashMap<String,Channel> getUserChannelMap(){<!-- -->
        return userChannelMap;
    }
}

2. ChannelInboundHandlerAdapter

Inherit Channel processor and implement channelRead as connection authentication

As we all know WebSocket cannot customize the header, so how to add token to the header?
Check websocket in js and find

new WebSocket(url [protocols]);

The sub-protocol of websocket can be an array. Whether this value is in HTTP or in WebSocket, it is a “contract” between the front and back ends, and both ends must abide by it. Then put the user authentication information in the The first subscript position in the array, the backend can get it normally
Curve to save the country (not

//The Feishu applet used here
//websocket
    let socket = tt.connectSocket({<!-- -->
        url: "ws://localhost:58080/webSocket",
        header:{<!-- -->},
        protocols:[token],
        success() {<!-- -->
            console.log('Build WebSocketTask success');
        },
        fail(err) {<!-- -->
            console.error('Build WebSocketTask failed', err);
        }
    });
import com.feishu.estate.netty.nettypush.config.NettyConfig;
import io.jsonwebtoken.Claims;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AttributeKey;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * FullHttpRequest
 */

@ChannelHandler. Sharable
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AuthHandler extends ChannelInboundHandlerAdapter {<!-- -->


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {<!-- -->
        if (msg instanceof FullHttpRequest) {<!-- -->
            // Get the authentication token in the request header
            FullHttpRequest request = (FullHttpRequest) msg;
            HttpHeaders headers = request. headers();
            if (headers. size() < 1) {<!-- -->
                ctx.channel().close();
                return;
            }
            String uid = headers. get("Sec-WebSocket-Protocol");
            log.debug("Authentication success.uid: {}", uid);
            // Add the user ID to the channel as a custom attribute, so that the user ID can be obtained from the channel at any time
            AttributeKey<String> key = AttributeKey. valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // associated channel
            NettyConfig.getUserChannelMap().put(uid, ctx.channel());

            ctx.pipeline().remove(this);
            // Propagate the event until the WebSocket connection is completed.
            ctx.fireChannelRead(msg);
        } else {<!-- -->
            ctx.channel().close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {<!-- -->
        cause. printStackTrace();

        ctx.channel().close();
    }
}

Be lazy here, after getting the sub-protocol from the header normally, convert it into an array to get the first element, you can do some analysis by yourself, such as the jwt token as follows

 /**
     * Get the data claim from the token
     *
     * @param token token
     * @return data declaration
     */
    private Claims parseToken(String token)
    {<!-- -->
        return Jwts. parser()
                .setSigningKey(secret)
                .parseClaimsJws(token)
                .getBody();
    }

3. SimpleChannelInboundHandler

Handler used to process text frames

The handlerAdded is rewritten here. Referring to the instructions at the beginning of this article, it will be executed first, and then the verification of step 2 will be performed. During the test, it is found that some h5 cannot pass sub-protocols when establishing a websocket connection, so you can put channelRead0 handles in the first message send.

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.feishu.estate.netty.nettypush.config.NettyConfig;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * TextWebSocketFrame type, representing a text frame
 * @author sixiaojie
 * @date 2020-03-28-13:47
 */
@Component
@ChannelHandler. Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {<!-- -->
    private static final Logger log = LoggerFactory. getLogger(NettyServer. class);
    // Token secret key
    @Value("${token.secret}")
    private String secret;

    /**
     * Once connected, the first one is executed
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {<!-- -->
        log.info("handlerAdded was called" + ctx.channel().id().asLongText());
        // Add to channelGroup channel group
        NettyConfig.getChannelGroup().add(ctx.channel());
    }


    /**
     * read data
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {<!-- -->
        log.info("The server received the message: {}",msg.text());

// // get user ID
// JSONObject jsonObject = JSONUtil. parseObj(msg. text());
// String uid = jsonObject.getStr("uid");
//
// // Add the user ID to the channel as a custom attribute, so that the user ID can be obtained from the channel at any time
// AttributeKey<String> key = AttributeKey. valueOf("userId");
// ctx.channel().attr(key).setIfAbsent(uid);
//
// // associated channel
// NettyConfig.getUserChannelMap().put(uid,ctx.channel());

        // Reply message
        ctx.channel().writeAndFlush(new TextWebSocketFrame("The server connection is successful!"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {<!-- -->
        log.info("handlerRemoved was called" + ctx.channel().id().asLongText());
        // delete channel
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {<!-- -->
        log.info("Exception: {}", cause.getMessage());
        // delete channel
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx. close();
    }

    /**
     * Delete the corresponding relationship between users and channels
     * @param ctx
     */
    private void removeUserId(ChannelHandlerContext ctx){<!-- -->
        AttributeKey<String> key = AttributeKey. valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getUserChannelMap().remove(userId);
    }
}

Front-end code for local testing

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    // Determine whether the current browser supports webSocket
    if(window.WebSocket){<!-- -->
        socket = new WebSocket('ws://localhost:58080/webSocket','custome-protocol')
        // Equivalent to the read event of the channel, ev receives the message sent back by the server
        socket.onmessage = function (ev) {<!-- -->
            var rt = document. getElementById("responseText");
            rt.value = rt.value + "\\
" + ev.data;
        }
        // Equivalent to connection open
        socket.onopen = function (ev) {<!-- -->
            // set request header
            var rt = document. getElementById("responseText");
            rt.value = "Connection opened..."
            socket. send(
                JSON.stringify({<!-- -->
                    // If the connection is successful, the user ID will be passed to the server
                    uid: "123456"
                })
            );
        }
        // Equivalent to connection close
        socket.onclose = function (ev) {<!-- -->
            var rt = document. getElementById("responseText");
            rt.value = rt.value + "\\
" + "The connection is closed...";
        }
        socket.addEventListener('beforeSend', function(event) {<!-- -->
            event.target.setRequestHeader('Authorization', 'Bearer' + '1111');
            event.target.setRequestHeader('Custom-Header', 'value');
        });
    }else{<!-- -->
        alert("The current browser does not support webSocket")
    }


</script>
    <form onsubmit="return false">
        <textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
        <input type="button" value="Clear content" onclick="document.getElementById('responseText').value=''">
    </form>
</body>
</html>

4. Add a heartbeat processing

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * If there are no read events for 2 consecutive times, close the client channel
 * @author sixiaojie
 * @date 2020-08-21-16:14
 */

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {<!-- -->

    private int lossConnectCount = 0;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {<!-- -->
        if (evt instanceof IdleStateEvent){<!-- -->
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event. state()== IdleState. READER_IDLE){<!-- -->
                lossConnectCount++;
                if (lossConnectCount > 2){<!-- -->
                    ctx.channel().close();
                }
            }
        } else {<!-- -->
            super.userEventTriggered(ctx,evt);
        }
    }
}

5. NettyServer implements WebSocket

Throw in the above custom implemented processors

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

/**
 * @author sixiaojie
 * @date 2020-03-28-13:44
 */

@Component
public class NettyServer{<!-- -->
    private static final Logger log = LoggerFactory. getLogger(NettyServer. class);
    /**
     * webSocket protocol name
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * The port number
     */
    @Value("${webSocket.netty.port:58080}")
    private int port;

    /**
     * webSocket path
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Autowired
    private AuthHandler authHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * start up
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {<!-- -->
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup assists the client's tcp connection request, and workGroup is responsible for the previous read and write operations with the client
        bootstrap.group(bossGroup, workGroup);
        // Set the channel of NIO type
        bootstrap. channel(NioServerSocketChannel. class);
        // Set the listening port
        bootstrap.localAddress(new InetSocketAddress(port));
        // A channel is created when a connection arrives
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {<!-- -->

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {<!-- -->
                // The handler (Handler) in the pipeline management channel is used to process business
                // The webSocket protocol itself is based on the http protocol, so the http codec should also be used here
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // Handler written in blocks
                ch.pipeline().addLast(new ChunkedWriteHandler());
                /*
                illustrate:
                1. HTTP data is segmented during transmission, and HttpObjectAggregator can aggregate multiple segments
                2. This is why, when the browser sends a large amount of data, it will send multiple http requests
                 */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                // custom handler to handle authentication
                ch.pipeline().addLast(authHandler);
                //For the client, if there is no read event within 10s, the heartbeat processing method HeartBeatHandler#userEventTriggered will be triggered
                ch.pipeline().addLast(new IdleStateHandler(10 , 0 , 0));
                //Custom idle state detection (custom heartbeat detection handler)
                ch.pipeline().addLast(new HeartBeatHandler());
                /*
                illustrate:
                1. Corresponding to webSocket, its data is transmitted in the form of frame
                2. When the browser requests ws://localhost:58080/xxx indicates the requested uri
                3. The core function is to upgrade the http protocol to the ws protocol to maintain long connections
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                // Custom handler to handle business logic
                ch.pipeline().addLast(webSocketHandler);

            }
        });
        // The configuration is complete, start binding the server, and block until the binding is successful by calling the sync synchronization method
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
        // Listen to the closed channel
        channelFuture. channel(). closeFuture(). sync();
    }

    /**
     * Release resources
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {<!-- -->
        if(bossGroup != null){<!-- -->
            bossGroup.shutdownGracefully().sync();
        }
        if(workGroup != null){<!-- -->
            workGroup. shutdown Gracefully(). sync();
        }
    }
    @PostConstruct()
    public void init() {<!-- -->
        //Need to open a new thread to execute the netty server server
        new Thread(() -> {<!-- -->
            try {<!-- -->
                start();
            } catch (InterruptedException e) {<!-- -->
                e.printStackTrace();
            }
        }).start();

    }


}

6. Make up the sending message Service

How to deal with message sending when this article does not implement distributed, students who need it can refer to the first link of the article

/**
 * @author sixiaojie
 * @date 2020-03-30-17:06
 */
public interface PushService {<!-- -->
    /**
     * Push to designated users
     * @param userId
     * @param msg
     */
    void pushMsgToOne(String userId,String msg);

    /**
     * push to all users
     * @param msg
     */
    void pushMsgToAll(String msg);
}

import com.feishu.estate.netty.nettypush.config.NettyConfig;
import com.feishu.estate.netty.nettypush.constant.BaseConstant;
import com.feishu.estate.netty.nettypush.pubsub.NettyPushMessageBody;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author sixiaojie
 * @date 2020-03-30-20:10
 */
@Service
public class PushServiceImpl implements PushService {<!-- -->
// @Autowired
// private RedisTemplate redisTemplate;

    @Override
    public void pushMsgToOne(String userId, String msg){<!-- -->
        ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
        Channel channel = userChannelMap. get(userId);
        if(!Objects.isNull(channel)){<!-- -->
            // If the user's client is a channel established with this server, push the message directly
            channel.writeAndFlush(new TextWebSocketFrame(msg));
        } else {<!-- -->
            // Publish and consume to other servers
// NettyPushMessageBody pushMessageBody = new NettyPushMessageBody();
// pushMessageBody.setUserId(userId);
// pushMessageBody.setMessage(msg);
// redisTemplate. convertAndSend(BaseConstant. PUSH_MESSAGE_TO_ONE, pushMessageBody);
        }

    }
    @Override
    public void pushMsgToAll(String msg){<!-- -->
        // Publish and consume to other servers
// redisTemplate.convertAndSend(BaseConstant.PUSH_MESSAGE_TO_ALL,msg);
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

ok, this is about the completion of building websocket with netty