WebSocket cluster solution in practice (with diagrams and source code)

I have recently been working on a message push feature in a project. For example, after a customer places an order, a system notification is sent to that customer. This kind of server-initiated push calls for WebSocket, which is full-duplex.

Full duplex means that both the client and the server can send messages to each other at any time. HTTP is not used because it follows a request-response model: only the client can initiate an exchange, and the server can only reply after receiving a request. Once a WebSocket connection is established, both the client and the server can actively send messages to each other.

WebSocket message sending and receiving in stand-alone mode:

After user A and user B each establish a connection with the web server, user A sends a message to the server, and the server pushes it to user B. On a stand-alone system, all users connect to the same server, and all sessions are stored on that one server.

A single server cannot support tens of thousands of simultaneous connections. A distributed or clustered deployment is needed, with a load balancer spreading the connections across different servers. When the sender and the receiver of a message happen to be on the same server, the situation is the same as with a single server and the message is received successfully:


However, the load balancer uses a round-robin algorithm, which cannot guarantee that the message sender and receiver are on the same server. When they are not on the same server, the receiver cannot receive the message.
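The mismatch is easy to reproduce in a few lines. The following is a minimal sketch of round-robin assignment, not the balancer used in the project; the class name `RoundRobinBalancer` and the server names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin balancer: each new connection goes to the next server in turn. */
class RoundRobinBalancer {
    private final List<String> servers;
    private int next = 0;

    RoundRobinBalancer(List<String> servers) {
        this.servers = new ArrayList<>(servers);
    }

    /** Returns the server that will handle the next incoming connection. */
    synchronized String pick() {
        String server = servers.get(next);
        next = (next + 1) % servers.size();
        return server;
    }

    public static void main(String[] args) {
        RoundRobinBalancer lb = new RoundRobinBalancer(List.of("server-1", "server-2"));
        String serverOfA = lb.pick(); // user A connects first
        String serverOfB = lb.pick(); // user B connects second
        // prints "A -> server-1, B -> server-2"
        System.out.println("A -> " + serverOfA + ", B -> " + serverOfB);
        // A's message arrives at server-1, but B's session lives on server-2
        System.out.println("same server: " + serverOfA.equals(serverOfB));
    }
}
```

With two servers, two consecutive connections always end up on different servers, which is exactly the case in which the push fails.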

Ideas for solving the WebSocket cluster problem
Each time a client and a server establish a connection, a stateful session is created, and the server keeps that session to maintain the connection. A client can only be connected to one server of the cluster at a time, and it sends all of its later data to that same server.

To solve the cluster problem, session sharing is the first thing to consider: after a client connects to one server, the other servers should also know about that connection.

Option 1: session sharing (not feasible)
How does HTTP solve the same cluster problem? One solution is to share the session. After the client logs in, the session information is stored in Redis, and when a request reaches another server, that server loads the session from Redis. In other words, storing the session information in Redis is what makes the session shared.

A session can only be shared if it can be serialized, and a WebSocket session cannot be. An HTTP session records request data, while a WebSocket session corresponds to a live connection. A connection to a different server is a different session, so it cannot be serialized and handed to another server.
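The difference can be illustrated with plain Java serialization. This is only an analogy under stated assumptions: `ConnectionBoundSession` is a hypothetical class that holds a `java.net.Socket` to stand in for a WebSocket session, and it is not the container's real session type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.Socket;

/** Hypothetical session that owns a live connection, like a WebSocket session does. */
class ConnectionBoundSession implements Serializable {
    final String id;
    final Socket socket; // java.net.Socket is not Serializable

    ConnectionBoundSession(String id, Socket socket) {
        this.id = id;
        this.socket = socket;
    }

    /** Tries to serialize the object the way a shared session store would have to. */
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Plain data, like the attributes of an HTTP session, serializes fine
        System.out.println(canSerialize("userId=42"));
        // An object tied to a connection does not
        try (Socket socket = new Socket()) {
            System.out.println(canSerialize(new ConnectionBoundSession("s1", socket)));
        }
    }
}
```

Even if the bytes could be written, another server could not use them, because the underlying connection only exists on the server that accepted it.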

Option 2: ip hash (not feasible)
Instead of sharing sessions, HTTP can use the ip_hash algorithm of Nginx load balancing, so that a client's requests always go to the same server. The client's session is saved on that server, later requests reach the same server and find the session there, and there is no distributed session problem.

Unlike HTTP, however, WebSocket lets the server actively push messages to the client. If the server that receives a message is not the server that holds the recipient's connection, it cannot find the recipient's session. In other words, when the two sessions are on different servers, the message cannot be pushed. As shown below:

The fix would be to put every message sender and its receivers on the same server. But who sends to whom is not known in advance, so this is obviously impossible to achieve.
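A small sketch makes the limitation concrete. The router below is hypothetical and uses `String.hashCode`, which is not the hash function Nginx actually applies; it only shows that hashing pins each client to one server without bringing two different clients together.

```java
import java.util.List;

/** Hypothetical IP-hash router: the same client IP always maps to the same server. */
class IpHashRouter {
    private final List<String> servers;

    IpHashRouter(List<String> servers) {
        this.servers = List.copyOf(servers);
    }

    /** Picks a server from the hash of the client IP (floorMod keeps the index non-negative). */
    String route(String clientIp) {
        int index = Math.floorMod(clientIp.hashCode(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        IpHashRouter router = new IpHashRouter(List.of("server-1", "server-2"));
        String senderIp = "10.0.0.1";   // example address of the sender
        String receiverIp = "10.0.0.2"; // example address of the receiver
        // Sticky: the sender reaches the same server on every connection
        System.out.println(router.route(senderIp).equals(router.route(senderIp)));
        // But sender and receiver are still routed independently of each other
        System.out.println(router.route(senderIp) + " vs " + router.route(receiverIp));
    }
}
```

The hash solves "same client, same server", which is enough for HTTP sessions, but a push needs "sender and receiver on the same server", which no per-client hash can promise.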

Option 3: Broadcast mode
A message can only be delivered when the sender and the receiver are on the same server. So change the approach: notify every server of the message by broadcasting it. With the publish-subscribe mode of a message middleware, the message is no longer tied to one server. It is sent to the middleware, which forwards it to every subscribed server. As with a broadcast, every server that has subscribed receives the message notification:

The publisher publishes the message to the message middleware, which then sends it to all subscribers:
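The flow can be sketched without any middleware. In the following in-memory model, `FanoutBroker` and `ServerNode` are hypothetical names standing in for the message middleware and for one WebSocket server; a list of strings plays the role of a client connection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for the middleware: every subscriber gets every message. */
class FanoutBroker {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** One server of the cluster; it only knows its own locally connected users. */
class ServerNode {
    private final FanoutBroker broker;
    private final Map<String, List<String>> inboxByUser = new HashMap<>();

    ServerNode(FanoutBroker broker) {
        this.broker = broker;
        broker.subscribe(this::pushToLocalUsers); // subscribe on startup
    }

    void connect(String user) {
        inboxByUser.put(user, new ArrayList<>());
    }

    /** A message from a local client is published instead of delivered directly. */
    void onClientMessage(String message) {
        broker.publish(message);
    }

    /** Called for every broadcast; pushes to whoever is connected to this server. */
    private void pushToLocalUsers(String message) {
        for (List<String> inbox : inboxByUser.values()) {
            inbox.add(message);
        }
    }

    List<String> inboxOf(String user) {
        return inboxByUser.get(user);
    }

    public static void main(String[] args) {
        FanoutBroker broker = new FanoutBroker();
        ServerNode server1 = new ServerNode(broker);
        ServerNode server2 = new ServerNode(broker);
        server1.connect("A");
        server2.connect("B");
        server1.onClientMessage("hello"); // A sends through server 1
        System.out.println(server2.inboxOf("B")); // prints "[hello]"
    }
}
```

No server needs to know where a receiver is connected; each one simply pushes every broadcast to its own local sessions.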

Implementation of broadcast mode
Build a stand-alone websocket
Following the previous article on building a stand-alone WebSocket service, first build a stand-alone WebSocket that pushes messages.

  1. Add dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  2. Create a bean instance of ServerEndpointExporter
    The ServerEndpointExporter bean automatically registers the WebSocket endpoints declared with the @ServerEndpoint annotation. This configuration is required when the application runs on the embedded Tomcat started by Spring Boot. It is not required when the application is deployed to a standalone Tomcat.
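import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    // Not needed when the application is deployed to a standalone Tomcat
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}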
  3. Create the server endpoint (ServerEndpoint) and the client
    server endpoint
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// javax.websocket on Spring Boot 2.x; the package is jakarta.websocket on Spring Boot 3.x
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {

    // Static, because the container creates one WebSocket instance per connection
    private static final Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

    private Session session;

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.put(this.session.getId(), this);

        log.info("[websocket] new connection, total number: {}", webSocketSet.size());
    }

    @OnClose
    public void onClose() {
        String id = this.session.getId();
        if (id != null) {
            webSocketSet.remove(id);
            log.info("[websocket] connection closed, total number: {}", webSocketSet.size());
        }
    }

    @OnMessage
    public void onMessage(String message) {
        // "ping" is the heartbeat sent by the client and is not forwarded
        if (!"ping".equals(message)) {
            log.info("[websocket] received the message sent by the client, message={}", message);
            sendMessage(message);
        }
    }

    /**
     * Send a message to every client connected to this server.
     * @param message the text to push
     */
    public void sendMessage(String message) {
        for (WebSocket webSocket : webSocketSet.values()) {
            webSocket.session.getAsyncRemote().sendText(message);
        }
        log.info("[websocket] send message, message={}", message);
    }

}

client endpoint

<div>
    <input type="text" name="message" id="message">
    <button id="sendBtn">Send</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="//i2.wp.com/cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };

    ws.onmessage = function(evt) {
        console.log("Received Message: " + evt.data);
        // Use .text() so the message is inserted as text, not parsed as HTML
        var p = $("<p>").text(evt.data);
        $("#content").prepend(p);
        $("#message").val("");
    };

    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };

    $("#sendBtn").click(function() {
        var text = $("#message").val();
        ws.send(text);
    });

</script>

The open, close, and message handlers on the server and on the client correspond one to one.

When the client creates the WebSocket, the two sides shake hands to establish the connection. The server's @OnOpen method is called and stores the client's session, and the client's ws.onopen fires.
The client calls ws.send to send a message, and the method annotated with @OnMessage on the server receives it.
The server calls session.getAsyncRemote().sendText to send a message, and the client's ws.onmessage receives it.

add controller

@GetMapping({"", "index.html"})
public ModelAndView index() {
    // Renders the view named "index", which is the client page above
    return new ModelAndView("index");
}

Show results
Open two clients, one client sends a message, and the other client can also receive the message.

Add RabbitMQ middleware
RabbitMQ, a widely used message middleware, is used here. It supports the publish-subscribe mode:

Add message subscription
The exchange is a fanout exchange, which delivers each message to every queue bound to it. Each server names its queue after its own IP + port, which serves as a unique identifier. When a server starts, it binds its queue to the exchange, and that binding is its message subscription:

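import java.net.SocketException;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
    }

    @Bean
    public Queue psQueue() throws SocketException {
        // ip + port is the queue name (IpUtils is a project helper not shown in this article)
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }

    @Bean
    public Binding routingFirstBinding() throws SocketException {
        // Binding the queue to the fanout exchange subscribes this server to all messages
        return BindingBuilder.bind(psQueue()).to(fanoutExchange());
    }
}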
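The IpUtils helper is not listed in this article. As a rough idea of what the IP lookup and the queue naming could look like, here is a sketch under stated assumptions: the class `IpUtilsSketch` and its method `queueName` are hypothetical, the lookup returns the first non-loopback IPv4 address it finds, and the port is passed in as a parameter instead of being read from the running server.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;

/** Hypothetical stand-in for the IpUtils helper used in RabbitConfig. */
class IpUtilsSketch {

    /** Returns the first non-loopback IPv4 address, or 127.0.0.1 if none is found. */
    static String getServerIp() throws SocketException {
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return "127.0.0.1";
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            if (!nic.isUp() || nic.isLoopback()) {
                continue;
            }
            for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                if (address instanceof Inet4Address && !address.isLoopbackAddress()) {
                    return address.getHostAddress();
                }
            }
        }
        return "127.0.0.1";
    }

    /** Builds the per-server queue name in the same "ps_<ip>_<port>" shape as psQueue(). */
    static String queueName(String ip, int port) {
        return "ps_" + ip + "_" + port;
    }

    public static void main(String[] args) throws SocketException {
        System.out.println(queueName(getServerIp(), 8080));
    }
}
```

Because the name contains both the IP and the port, two instances on the same machine (as in the local test below) still get separate queues, so each of them receives its own copy of every broadcast.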

Modify the service endpoint ServerEndpoint
Add a message receiving method to the WebSocket class and annotate it with @RabbitListener. A fixed queue name could be given as a constant, but the queue name here is dynamic, so use the expression #{psQueue.name}, where psQueue is the bean name of the Queue:

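@RabbitListener(queues = "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
    log.info("[websocket] received broadcast from RabbitMQ, message={}", message);
    // Push the broadcast to every client connected to this server
    sendMessage(message);
}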

Then call the sendMessage method to send it to the connected client.

Modify message sending
In the onMessage method of the WebSocket class, publish the message to RabbitMQ instead of sending it directly:

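// Field declaration assumed here; the original snippet does not show it
private static RabbitTemplate rabbitTemplate;

@OnMessage
public void onMessage(String message) {
    if (!"ping".equals(message)) {
        log.info("[websocket] received the message sent by the client, message={}", message);
        // Endpoint instances are created by the container, not by Spring, so the
        // template is looked up by hand (SpringContextUtil is a helper not shown here)
        if (rabbitTemplate == null) {
            rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
        }
        // A fanout exchange ignores the routing key, so an empty string is enough
        rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", "", message);
    }
}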

The message notification process is as follows:


Start two instances to simulate a cluster environment
Open Edit Configurations in IntelliJ IDEA:

Click Copy in the upper left corner and set the port of the copied configuration with server.port=8081:

Start two services, one on port 8080 and one on port 8081. After starting the service on port 8081, change the front-end connection port to 8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");

Show results