A note up front: this article mainly draws on the following references:
SpringBoot + WebSocket + Netty to achieve message push
Netty-11: Life cycle of ChannelHandler
A guide to integrating Netty with Spring Boot
First, you need to understand the life cycle of a channel as it is established.
The ChannelHandler callbacks fire in the following order:
handlerAdded() –> channelRegistered() –> channelActive() –> channelRead() –> channelReadComplete()
Note that the key point of this implementation is verifying the user's identity: the user information (such as a JWT token) is obtained either from the request headers or from the first message sent when the WebSocket connection is established.
1. NettyConfig
Define a holder class with a single shared channel group that manages all channels.
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author sixiaojie
 * @date 2020-03-28-15:07
 */
public class NettyConfig {

    /**
     * Channel group that manages all channels.
     * GlobalEventExecutor.INSTANCE is a global event executor and a singleton.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Mapping between users and channels, used to send messages to a specific user.
     */
    private static final ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();

    private NettyConfig() {}

    /**
     * Get the channel group.
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * Get the user-to-channel map.
     */
    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
        return userChannelMap;
    }
}
2. ChannelInboundHandlerAdapter
Extend the channel inbound handler adapter and implement channelRead to authenticate the connection.
As is well known, the browser WebSocket API cannot set custom headers, so how can the token be added to the request headers?
Looking at the WebSocket constructor in JS, we find:
new WebSocket(url [, protocols]);
The WebSocket sub-protocol argument can be an array. Whether you look at it at the HTTP level or at the WebSocket level, this value is a "contract" between the front end and the back end, and both sides must honor it. So put the user's authentication information at the first position in the array; it reaches the server in the Sec-WebSocket-Protocol request header, where the back end can read it normally.
A roundabout workaround (well, sort of):
// The Feishu mini program API is used here
// websocket
let socket = tt.connectSocket({
    url: "ws://localhost:58080/webSocket",
    header: {},
    // The token travels as the first sub-protocol
    protocols: [token],
    success() {
        console.log('Build WebSocketTask success');
    },
    fail(err) {
        console.error('Build WebSocketTask failed', err);
    }
});
import com.feishu.estate.netty.nettypush.config.NettyConfig;
import io.jsonwebtoken.Claims;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Authenticates the connection on the initial FullHttpRequest (the WebSocket handshake).
 */
@ChannelHandler.Sharable
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AuthHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // Get the authentication token from the request headers
            FullHttpRequest request = (FullHttpRequest) msg;
            HttpHeaders headers = request.headers();
            String uid = headers.get("Sec-WebSocket-Protocol");
            if (uid == null || uid.isEmpty()) {
                // No credentials: release the message (it is not forwarded) and close.
                // A null key would also make ConcurrentHashMap.put throw.
                ReferenceCountUtil.release(msg);
                ctx.channel().close();
                return;
            }
            // Note: the raw header value is used as the user ID here;
            // verify and parse the token before trusting it.
            log.debug("Authentication success. uid: {}", uid);

            // Store the user ID on the channel as a custom attribute,
            // so that it can be read from the channel at any time
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);

            // Associate the user with the channel
            NettyConfig.getUserChannelMap().put(uid, ctx.channel());

            // Authentication is only needed once per connection
            ctx.pipeline().remove(this);

            // Propagate the request so that the WebSocket handshake can complete
            ctx.fireChannelRead(msg);
        } else {
            // Anything other than the handshake request is rejected
            ReferenceCountUtil.release(msg);
            ctx.channel().close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.channel().close();
    }
}
I was lazy here: after reading the sub-protocol from the header, you should split it into an array and take the first element, then parse it however you need. For a JWT token, for example:
/**
 * Get the claims from the token.
 *
 * @param token the JWT token
 * @return the claims carried by the token
 */
private Claims parseToken(String token) {
    return Jwts.parser()
            .setSigningKey(secret)
            .parseClaimsJws(token)
            .getBody();
}
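The "split it and take the first element" step mentioned above can be sketched in plain Java. This is only an illustration under stated assumptions: `SubProtocols` and `first` are hypothetical names that do not appear in the original code, and the sketch relies only on the fact that the Sec-WebSocket-Protocol header value is a comma-separated list.

```java
import java.util.Optional;

/** Hypothetical helper, not part of the original article's code. */
final class SubProtocols {

    private SubProtocols() {}

    /**
     * Returns the first non-empty entry of a Sec-WebSocket-Protocol header
     * value, which is a comma-separated list such as "token, chat".
     * Returns an empty Optional when the header is missing or blank.
     */
    static Optional<String> first(String headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        for (String part : headerValue.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                return Optional.of(trimmed);
            }
        }
        return Optional.empty();
    }
}
```

In the handler, the returned value would then be handed to something like parseToken before the user is associated with the channel; a missing value would lead to closing the connection.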
3. SimpleChannelInboundHandler
A handler for processing text frames.
handlerAdded is overridden here. As described at the beginning of this article, it runs first, and the verification in step 2 happens afterwards. During testing I found that some H5 clients cannot pass sub-protocols when establishing a WebSocket connection; in that case, you can do the authentication in channelRead0 when the first message arrives.
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.feishu.estate.netty.nettypush.config.NettyConfig;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Handles TextWebSocketFrame, which represents a text frame.
 * @author sixiaojie
 * @date 2020-03-28-13:47
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    // Token secret key
    @Value("${token.secret}")
    private String secret;

    /**
     * Executed first, as soon as the handler is added for a new connection.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded was called: {}", ctx.channel().id().asLongText());
        // Add the channel to the channel group
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * Read data.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("The server received the message: {}", msg.text());

        // // Get the user ID
        // JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        // String uid = jsonObject.getStr("uid");
        //
        // // Store the user ID on the channel as a custom attribute,
        // // so that it can be read from the channel at any time
        // AttributeKey<String> key = AttributeKey.valueOf("userId");
        // ctx.channel().attr(key).setIfAbsent(uid);
        //
        // // Associate the user with the channel
        // NettyConfig.getUserChannelMap().put(uid, ctx.channel());

        // Reply to the client
        ctx.channel().writeAndFlush(new TextWebSocketFrame("The server connection is successful!"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved was called: {}", ctx.channel().id().asLongText());
        // Remove the channel
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("Exception: {}", cause.getMessage());
        // Remove the channel
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * Remove the mapping between the user and the channel.
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        // The attribute is unset if the channel never authenticated;
        // ConcurrentHashMap.remove(null) would throw a NullPointerException
        if (userId != null) {
            NettyConfig.getUserChannelMap().remove(userId);
        }
    }
}
Front-end code for local testing
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    // Determine whether the current browser supports WebSocket
    if (window.WebSocket) {
        // The second argument is the sub-protocol; this is where the token goes.
        // The browser WebSocket API offers no way to set request headers.
        socket = new WebSocket('ws://localhost:58080/webSocket', 'custome-protocol');
        // Equivalent to the channel's read event; ev carries the message sent back by the server
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + ev.data;
        };
        // Equivalent to connection open
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "Connection opened...";
            socket.send(
                JSON.stringify({
                    // Once the connection succeeds, pass the user ID to the server
                    uid: "123456"
                })
            );
        };
        // Equivalent to connection close
        socket.onclose = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + "The connection is closed...";
        };
    } else {
        alert("The current browser does not support WebSocket");
    }
</script>
<form onsubmit="return false">
    <textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
    <input type="button" value="Clear content" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
4. Add heartbeat handling
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Closes the client channel after more than 2 consecutive read-idle events.
 * @author sixiaojie
 * @date 2020-08-21-16:14
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    // One handler instance is created per channel, so a plain field is enough
    private int lossConnectCount = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Data arrived, so the idle streak is broken
        lossConnectCount = 0;
        super.channelRead(ctx, msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                lossConnectCount++;
                if (lossConnectCount > 2) {
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
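The counting idea behind the heartbeat handler can be isolated from Netty and checked on its own. The sketch below is a minimal illustration, assuming the counter should be cleared whenever data arrives so that only consecutive idle events count; `MissedHeartbeatCounter` and its method names are hypothetical and not part of the original code.

```java
/** Hypothetical helper showing only the counting logic; no Netty types involved. */
final class MissedHeartbeatCounter {

    private final int maxMisses;
    private int misses;

    MissedHeartbeatCounter(int maxMisses) {
        this.maxMisses = maxMisses;
    }

    /**
     * Call when a read-idle event fires.
     * Returns true once the number of consecutive misses exceeds maxMisses,
     * meaning the connection should be closed.
     */
    boolean onReaderIdle() {
        misses++;
        return misses > maxMisses;
    }

    /** Call whenever data arrives from the client; clears the streak. */
    void onRead() {
        misses = 0;
    }
}
```

With maxMisses set to 2 and a 10-second reader idle time, a silent client would be dropped on the third idle event, that is, after roughly 30 seconds without any inbound data.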
5. NettyServer implements WebSocket
Add the custom handlers implemented above to the pipeline.
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

/**
 * @author sixiaojie
 * @date 2020-03-28-13:44
 */
@Component
public class NettyServer {

    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * WebSocket protocol name
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * Port number
     */
    @Value("${webSocket.netty.port:58080}")
    private int port;

    /**
     * WebSocket path
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Autowired
    private AuthHandler authHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * Start the server.
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup accepts client TCP connections; workGroup handles the subsequent reads and writes
        bootstrap.group(bossGroup, workGroup);
        // Use an NIO channel
        bootstrap.channel(NioServerSocketChannel.class);
        // Set the listening port
        bootstrap.localAddress(new InetSocketAddress(port));
        // A channel is created for each incoming connection
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // The pipeline manages the handlers of the channel, which process the business logic
                // The WebSocket protocol starts from HTTP, so the HTTP codec is needed here as well
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // Handler for writing data in chunks
                ch.pipeline().addLast(new ChunkedWriteHandler());
                /*
                Notes:
                1. HTTP data is split into parts during transmission; HttpObjectAggregator aggregates the parts into one full message
                2. This is why a large request from the browser arrives as several HTTP content parts
                 */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                // Custom handler that performs authentication
                ch.pipeline().addLast(authHandler);
                // If nothing is read from the client within 10s, HeartBeatHandler#userEventTriggered is triggered
                ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
                // Custom idle state detection (custom heartbeat handler)
                ch.pipeline().addLast(new HeartBeatHandler());
                /*
                Notes:
                1. WebSocket data is transmitted in the form of frames
                2. When the browser requests ws://localhost:58080/xxx, xxx is the requested URI
                3. The core function is to upgrade the HTTP protocol to the ws protocol and keep a long-lived connection
                 */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                // Custom handler that processes the business logic
                ch.pipeline().addLast(webSocketHandler);
            }
        });
        // Configuration is complete: bind the server, and block with sync() until the binding succeeds
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
        // Block until the server channel is closed
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * Release resources.
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }

    @PostConstruct
    public void init() {
        // start() blocks, so the Netty server must run on its own thread
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
6. Add the message-sending Service
This article does not implement message sending for a distributed deployment; readers who need it can refer to the first reference listed at the top of the article.
/**
 * @author sixiaojie
 * @date 2020-03-30-17:06
 */
public interface PushService {

    /**
     * Push to a specific user.
     * @param userId
     * @param msg
     */
    void pushMsgToOne(String userId, String msg);

    /**
     * Push to all users.
     * @param msg
     */
    void pushMsgToAll(String msg);
}
import com.feishu.estate.netty.nettypush.config.NettyConfig;
import com.feishu.estate.netty.nettypush.constant.BaseConstant;
import com.feishu.estate.netty.nettypush.pubsub.NettyPushMessageBody;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author sixiaojie
 * @date 2020-03-30-20:10
 */
@Service
public class PushServiceImpl implements PushService {

    // @Autowired
    // private RedisTemplate redisTemplate;

    @Override
    public void pushMsgToOne(String userId, String msg) {
        ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
        Channel channel = userChannelMap.get(userId);
        if (!Objects.isNull(channel)) {
            // The user's client is connected to this server, so push the message directly
            channel.writeAndFlush(new TextWebSocketFrame(msg));
        } else {
            // Publish the message so that other servers can consume it
            // NettyPushMessageBody pushMessageBody = new NettyPushMessageBody();
            // pushMessageBody.setUserId(userId);
            // pushMessageBody.setMessage(msg);
            // redisTemplate.convertAndSend(BaseConstant.PUSH_MESSAGE_TO_ONE, pushMessageBody);
        }
    }

    @Override
    public void pushMsgToAll(String msg) {
        // Publish the message so that other servers can consume it
        // redisTemplate.convertAndSend(BaseConstant.PUSH_MESSAGE_TO_ALL, msg);
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}
OK, that roughly completes building a WebSocket server with Netty.
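One detail worth considering with a user-to-channel map like the one used by pushMsgToOne: if the same user reconnects, the new channel overwrites the old entry, and when the old channel is later torn down, removing by key alone would delete the new mapping. The sketch below shows one possible way around this using the two-argument ConcurrentHashMap.remove(key, value). `UserRegistry` and its methods are hypothetical names, and the generic type C stands in for Netty's Channel so that the example runs without Netty.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry, not part of the original article's code. */
final class UserRegistry<C> {

    private final ConcurrentMap<String, C> byUser = new ConcurrentHashMap<>();

    /** Binds the user to a connection; returns the previous connection, or null. */
    C bind(String userId, C connection) {
        return byUser.put(userId, connection);
    }

    /**
     * Removes the mapping only if the user is still bound to this exact
     * connection, so a stale disconnect cannot evict a newer connection.
     */
    boolean unbind(String userId, C connection) {
        return userId != null && byUser.remove(userId, connection);
    }

    /** Returns the connection currently bound to the user, or null. */
    C lookup(String userId) {
        return userId == null ? null : byUser.get(userId);
    }
}
```

Applied to the code in this article, the same idea would amount to calling remove(userId, ctx.channel()) on the map when a channel goes away.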