Spring Boot implements WebSocket long connections (4) – Netty + WebSocket server

Spring Boot implements WebSocket long connections (4)

Demo download address: various WebSocket implementations, some based on spring-websocket and some based on the Netty framework, all of which can be used right away.

The previous blog used spring-websocket to implement the websocket server. Now we use the netty framework to implement it, which is more flexible and more performant. In some complex scenarios, the efficiency can be improved by adjusting parameters.
Previous implementations can refer to:

Spring Boot implements WebSocket (1)
Spring Boot implements WebSocket (2)

First, define the Netty server class that binds and listens on the port.

Note that Netty needs a port of its own. If your project also provides web (HTTP) services, the two ports must be different.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mesh server
 *
 * @author lukou
 * @date 2023/05/17
 */
public class NettyServer {<!-- -->

    private static final Logger log = LoggerFactory. getLogger(NettyServer. class);

    private int port;
    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;
    private ChannelInitializer<SocketChannel> channelInitializer;

    public NettyServer(int port, ChannelInitializer<SocketChannel> channelInitializer) {<!-- -->
        this.port = port;
        this. channelInitializer = channelInitializer;
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
    }

    /**
     * start
     *
     * @throws Exception exception
     */
    public void start() throws Exception {<!-- -->
        try {<!-- -->
            ServerBootstrap sb = new ServerBootstrap();
            //Bind thread pool
            sb.group(bossGroup, workGroup)
                    //Specify the channel to use
                    .channel(NioServerSocketChannel.class)
                    //The maximum length of the queue that temporarily stores the request that has completed the three-way handshake
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //Disable nagle algorithm, don't wait, send immediately
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //When there is no data packet coming over a certain period of time, actively send an ack detection packet
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //Allow shared ports
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    //bind listening port
                    .localAddress(this.port)
                    //Add a custom handler
                    .childHandler(this.channelInitializer);
            //The server creates the binding asynchronously
            ChannelFuture cf = sb.bind().sync();
            channel = cf. channel();
            log.info("netty service started.. is listening: [{}]", channel.localAddress());
            //Close the server channel
            channel. closeFuture(). sync();
        } catch (Exception e) {<!-- -->
            throw new Exception("An exception occurred when starting the netty service, port number: " + this.port, e);
        }
    }

    /**
     * destroy
     *
     * @throws Exception exception
     */
    public void destroy() throws Exception {<!-- -->
        try {<!-- -->
            channel. close(). sync();
            workGroup. shutdown Gracefully(). sync();
            bossGroup.shutdownGracefully().sync();
        } catch (Exception e) {<!-- -->
            throw new Exception("An exception occurred when stopping the netty service, port number: " + this.port, e);
        }

    }

}

Next come the classes that implement the business-processing logic. First define an abstract class and put the common logic in it.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import static io.netty.handler.codec.http.HttpMethod.GET;

/**
 * Basic socket server
 * One layer of abstraction.<br>
 *
 * @author lukou
 * @date 2023/05/17
 */
public abstract class BaseSocketServer extends SimpleChannelInboundHandler<Object> {<!-- -->

    private static final Logger log = LoggerFactory. getLogger(BaseSocketServer. class);

    /**websocket protocol content*/
    public static final String WEBSOCKET = "websocket";
    public static final String UPGRADE = "Upgrade";

    /**
     * Client connection address
     */
    public static final String ENDPOINT = "/example4/ws";

    /**
     * Connect unique id to facilitate link tracking
     */
    protected String taskId;

    /**
     * context
     */
    protected ChannelHandlerContext context;

    /**
     * websocket handshake handler
     */
    private WebSocketServerHandshaker webSocketServerHandshaker;

    /**
     * channel activity
     * Called when the client establishes a connection with the server.<br>
     *
     * @param context context
     */
    @Override
    public abstract void channelActive(ChannelHandlerContext context);

    /**
     * channel inactive
     * Called when the client disconnects from the server.<br>
     *
     * @param context context
     */
    @Override
    public abstract void channelInactive(ChannelHandlerContext context);

    /**
     * channel read complete
     * Called after the server ends receiving the data sent by the client.<br>
     *
     * @param context context
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext context) {<!-- -->
        context. flush();
    }

    /**
     * exception
     * Called when an exception occurs in the project.<br>
     *
     * @param context context
     * @param throwable throwable
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable throwable) {<!-- -->
        context. close();
        log.info("An error occurred in taskId: [{}], reason: [{}]", this.taskId, throwable.toString(), throwable);
    }

    /**
     * Channel read0
     * Connection and frame information.<br>
     *
     * @param ctx ctx
     * @param msg MSG
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {<!-- -->
        if (msg instanceof WebSocketFrame) {<!-- -->
            this.handWebSocketFrame(ctx, (WebSocketFrame) msg);
            return;
        }
        if (msg instanceof FullHttpRequest) {<!-- -->
            log.info("taskId:[{}] starts to process websocket handshake request...", taskId);
            this.httpRequestHandler(ctx, (FullHttpRequest) msg);
            log.info("taskId:[{}] processing websocket handshake request ends..", taskId);
        }
    }

    /**
     * User event trigger
     * A read timeout event is set here, you can refer to the settings in {@link Example4WebSocketChannelHandler}
     *
     * @param ctx ctx
     * @param evt evt
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {<!-- -->
        if (IdleStateEvent. class. isAssignableFrom(evt. getClass())) {<!-- -->
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event. state() == IdleState. READER_IDLE) {<!-- -->
                ctx. close();
                log.info("taskId: [{}] read operation timed out.. Disconnected..", this.taskId);
            }
        }
    }

    /**
     * Handle websocket business between client and server.<br>
     *
     * @param context context
     * @param webSocketFrame web socket frame
     */
    public void handWebSocketFrame(ChannelHandlerContext context, WebSocketFrame webSocketFrame) {<!-- -->
        //Determine whether it is an instruction to close websocket
        if (webSocketFrame instanceof CloseWebSocketFrame) {<!-- -->
            webSocketServerHandshaker.close(context.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
            log.info("taskId:[{}] received close frame.. Disconnected..", this.taskId);
            return;
        }
        //Determine whether it is a ping message
        if (webSocketFrame instanceof PingWebSocketFrame) {<!-- -->
            context.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));
            log.info("taskId:[{}] received heartbeat frame...", this.taskId);
            return;
        }
        //Determine whether it is a binary message
        if (webSocketFrame instanceof TextWebSocketFrame) {<!-- -->
            this.handTextWebSocketFrame(context, webSocketFrame);
        }
    }

    /**
     * http request handler
     * http handshake request verification.<br>
     *
     * @param context context
     * @param fullHttpRequest full http request
     */
    private void httpRequestHandler(ChannelHandlerContext context, FullHttpRequest fullHttpRequest) {<!-- -->
        / / Determine whether the http handshake request
        if (!fullHttpRequest.decoderResult().isSuccess() || !(WEBSOCKET.equals(fullHttpRequest.headers().get(UPGRADE)))
                || !GET.equals(fullHttpRequest.method())) {<!-- -->
            sendHttpResponse(context, new DefaultFullHttpResponse(fullHttpRequest.protocolVersion(), HttpResponseStatus.BAD_REQUEST));
            log.error("taskId:{<!-- -->{}}websocket handshake content is incorrect.. Respond and close..", taskId);
            return;
        }
        String uri = fullHttpRequest. uri();
        log.info("taskId:{<!-- -->{}}websocket handshake uri[{}]", taskId, uri);
        if (!ENDPOINT.equals(getBasePath(uri))) {<!-- -->
            sendHttpResponse(context, new DefaultFullHttpResponse(fullHttpRequest.protocolVersion(), HttpResponseStatus.NOT_FOUND));
            log.info("taskId:[{}]websocket handshake protocol is incorrect.. Respond and close..", taskId);
            return;
        }
        WebSocketServerHandshakerFactory webSocketServerHandshakerFactory = new WebSocketServerHandshakerFactory("", null, false);
        webSocketServerHandshaker = webSocketServerHandshakerFactory. newHandshaker(fullHttpRequest);
        if (webSocketServerHandshaker == null) {<!-- -->
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel());
            log.info("taskId:[{}]websocket handshake protocol version is incorrect.. Respond and close..", taskId);
            return;
        }
        webSocketServerHandshaker.handshake(context.channel(), fullHttpRequest);
        this.checkOpenInfo(context, fullHttpRequest);
    }

    /**
     * Get the base path
     *
     * @param url url
     * @return {@link String}
     */
    public static String getBasePath(String url) {<!-- -->
        if (StringUtils.isEmpty(url)) {<!-- -->
            return null;
        }
        int idx = url. indexOf("?");
        if (idx == -1) {<!-- -->
            return url;
        }
        return url.substring(0, idx);
    }

    /**
     * Send http response
     * The server sends a response message.<br>
     *
     * @param context context
     * @param defaultFullHttpResponse default full http response
     */
    private void sendHttpResponse(ChannelHandlerContext context, DefaultFullHttpResponse defaultFullHttpResponse) {<!-- -->
        if (defaultFullHttpResponse.status().code() != 200) {<!-- -->
            ByteBuf buf = Unpooled.copiedBuffer(defaultFullHttpResponse.status().toString(), CharsetUtil.UTF_8);
            defaultFullHttpResponse.content().writeBytes(buf);
            buf. release();
        }
        // server sends data to client
        ChannelFuture future = context.channel().writeAndFlush(defaultFullHttpResponse);
        if (defaultFullHttpResponse.status().code() != 200) {<!-- -->
            future. addListener(ChannelFutureListener. CLOSE);
        }
    }

    /**
     * Reply message to client.<br>
     *
     * @param message message
     * @return {@link ChannelFuture}
     */
    protected ChannelFuture reply( String message) {<!-- -->
        ChannelFuture channelFuture = context.writeAndFlush(new TextWebSocketFrame(message));
        log.info("taskId:[{}] reply to the client with a message completed: [{}]", this.taskId, message);
        return channelFuture;
    }

    /**
     * Check open information
     * Check the message when the connection is opened.<br>
     *
     * @param context context
     * @param fullHttpRequest full http request
     */
    protected abstract void checkOpenInfo(ChannelHandlerContext context, FullHttpRequest fullHttpRequest);


    /**
     * hand text frame web socket
     * Text frame processing.<br>
     *
     * @param context context
     * @param webSocketFrame web socket frame
     */
    protected abstract void handTextWebSocketFrame(ChannelHandlerContext context, WebSocketFrame webSocketFrame);

}

Then write the concrete implementation

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Service segment instantiation
 *
 * @author lukou
 * @date 2023/05/17
 */
@Component
public class MyWebSocketServer extends BaseSocketServer {<!-- -->

    private static final Logger log = LoggerFactory. getLogger(MyWebSocketServer. class);

    @Override
    public void channelActive(ChannelHandlerContext context) {<!-- -->
        this.taskId = UUID.randomUUID().toString().replaceAll("-", "");
        this.context = context;
        log.info("taskId: [{}] has a new request coming in.. Start initializing the context...", this.taskId);
    }

    @Override
    public void channelInactive(ChannelHandlerContext context) {<!-- -->
        log.info("taskId:[{}] identifies the service triggering shutdown event.", this.taskId);
        // Here you can finish processing
    }

    @Override
    protected void checkOpenInfo(ChannelHandlerContext context, FullHttpRequest fullHttpRequest) {<!-- -->
        log.info("taskId:[{}] The websocket handshake protocol in the identification service is correct.. Start to verify others..", this.taskId);
    }

    @Override
    protected void handTextWebSocketFrame(ChannelHandlerContext context, WebSocketFrame webSocketFrame) {<!-- -->
        String text = ((TextWebSocketFrame) webSocketFrame). text();
        this.reply(this.taskId + " : " + text + System.currentTimeMillis());
    }
}

After that, bind the business processing layer to the channel of netty

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.concurrent.TimeUnit;

/**
 * example4 web socket channel handler
 *
 * @author lukou
 * @date 2023/05/17
 */
public class Example4WebSocketChannelHandler extends ChannelInitializer<SocketChannel> {<!-- -->

    private static final EventExecutorGroup EVENT_EXECUTOR_GROUP = new DefaultEventExecutorGroup(100);

    @Override
    protected void initChannel(SocketChannel ch) {<!-- -->
        // If no data is read for 30 seconds, a READER_IDLE event is triggered.
        ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
        // The websocket protocol itself is based on the http protocol, so the http codec is also used here
        ch.pipeline().addLast(new HttpServerCodec());
        // Write handlers in blocks
        ch.pipeline().addLast(new ChunkedWriteHandler());
        // netty is based on segmented requests. The function of HttpObjectAggregator is to segment the request and then aggregate it. The parameter is the maximum length of aggregated bytes
        ch.pipeline().addLast(new HttpObjectAggregator(8192));
        // Add our own implementation method for receiving data in the pipeline
        ch.pipeline().addLast(EVENT_EXECUTOR_GROUP, new MyWebSocketServer());
        ch.pipeline().addLast(new WebSocketServerProtocolHandler(BaseSocketServer.ENDPOINT, null, true, 65536 * 10));
    }

}

Finally, put it to use. Here the Netty server is started as soon as the project starts, and it is registered in the Spring container (this is a personal choice; it does not have to be managed by Spring, and creating it directly with new works just as well).

import com.example.wsdemo.websocketserver.example4.netty.Example4WebSocketChannelHandler;
import com.example.wsdemo.websocketserver.example4.netty.NettyServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ServerConfig {<!-- -->

    @Value("${netty.websocket.port:8081}")
    private int port;


    @Bean("example4WebSocketChannelHandler")
    public Example4WebSocketChannelHandler example4WebSocketChannelHandler() {<!-- -->
        return new Example4WebSocketChannelHandler();
    }

    @Bean("nettyServer")
    public NettyServer nettyServer(Example4WebSocketChannelHandler example4WebSocketChannelHandler) {<!-- -->
        return new NettyServer(this.port, example4WebSocketChannelHandler);
    }
}
import com.example.wsdemo.websocketserver.example4.netty.NettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;

/**
 * example4 network socket server startup initialization
 *
 * @author lukou
 * @date 2023/05/17
 */
@Component
public class Example4WebSocketStartInit implements CommandLineRunner {<!-- -->

    private static final Logger log = LoggerFactory.getLogger(Example4WebSocketStartInit.class);

    @Resource
    private NettyServer nettyServer;

    /**
     * Need to start asynchronously, otherwise it will block the main thread
     * Customize a thread startup here, or add annotation @Async to the method, the same effect
     *
     * @param args arg game
     */
    @Override
    public void run(String... args) {<!-- -->
        new Thread(() -> {<!-- -->
            try {<!-- -->
                nettyServer.start();
            } catch (Exception e) {<!-- -->
                log.error("The netty service in the identification service starts an error!", e);
            }
        }).start();
    }

    @PreDestroy
    public void destroy() {<!-- -->
        if (nettyServer != null) {<!-- -->
            try {<!-- -->
                nettyServer.destroy();
            } catch (Exception e) {<!-- -->
                log.error("An exception occurred when stopping the netty service!", e);
            }
        }
        log.info("netty identification service has been destroyed..");
    }
}

Start the test, if the following appears, it means that the service has started successfully.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.5.RELEASE)

2023-05-18 14:36:06.423 INFO 3624 --- [ main] c.e.w.w.WebsocketServerApplication : Starting WebsocketServerApplication on Qianpeng with PID 3624 (D:\projects\websocket-max\websocket-server\target\classes started by D:\projects\websocket-max)
2023-05-18 14:36:06.425 INFO 3624 --- [ main] c.e.w.w.WebsocketServerApplication : No active profile set, falling back to default profiles: default
2023-05-18 14:36:06.957 INFO 3624 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9000 (http)
2023-05-18 14:36:06.963 INFO 3624 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2023-05-18 14:36:06.963 INFO 3624 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.39]
2023-05-18 14:36:07.012 INFO 3624 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2023-05-18 14:36:07.012 INFO 3624 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 560 ms
2023-05-18 14:36:07.078 INFO 3624 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'clientInboundChannelExecutor'
2023-05-18 14:36:07.079 INFO 3624 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'clientOutboundChannelExecutor'
2023-05-18 14:36:07.164 INFO 3624 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'defaultSockJsTaskScheduler'
2023-05-18 14:36:07.181 INFO 3624 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'messageBrokerTaskScheduler'
2023-05-18 14:36:07.186 INFO 3624 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'brokerChannelExecutor'
2023-05-18 14:36:07.319 INFO 3624 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9000 (http) with context path ''
2023-05-18 14:36:07.320 INFO 3624 --- [ main] o.s.m.s.b. SimpleBrokerMessageHandler : Starting...
2023-05-18 14:36:07.320 INFO 3624 --- [ main] o.s.m.s.b.SimpleBrokerMessageHandler : BrokerAvailabilityEvent[available=true, SimpleBrokerMessageHandler[DefaultSubscriptionRegistry[cache[0 destination(s)], registry[ 0 sessions]]]]
2023-05-18 14:36:07.320 INFO 3624 --- [ main] o.s.m.s.b. SimpleBrokerMessageHandler : Started.
2023-05-18 14:36:07.325 INFO 3624 --- [ main] c.e.w.w.WebsocketServerApplication : Started WebsocketServerApplication in 1.11 seconds (JVM running for 1.619)
2023-05-18 14:36:07.679 INFO 3624 --- [ Thread-4] c.e.w.w.example4.netty.NettyServer : netty service started. . Listening:[/0:0:0:0:0:0:0:0:8081]

The access address of websocket is: ws://localhost:8081/example4/ws

The port is no longer the project’s port.

In addition, the Netty server must be started on a separate thread rather than directly on the main thread; otherwise the main thread will block there.

Welcome to correct me!