One annotation gives you a WebSocket cluster solution, and it’s fun to play with!

Introduction

Everyone should be familiar with WebSocket. In a single-instance application it works without any trouble, but once a project moves to a microservice architecture, problems can appear.

For example, service A has two instances, A1 and A2, and the front-end WebSocket client C is connected to A1 through the gateway’s load balancing. If A2 then triggers the message-sending logic and needs to send a message to all clients, C will not receive it, because C’s connection lives on A1.

The simplest solution that comes to mind is to forward the message from A2 to A1 and let A1 send it to C, so that C receives the message sent by A2.

2a11f68385ca883e1d2938f9a7f980e3.png

Based on this idea, I implemented a library in which a single configuration annotation handles everything.

Usage

Next, let’s look at how to use this library.

First, add the @EnableWebSocketLoadBalanceConcept annotation to the startup class:

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
public class AServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(AServiceApplication.class, args);
    }
}

Then inject WebSocketLoadBalanceConcept wherever you need to send messages, and you can send messages across instances:

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);
    }
}

Isn’t that simple? It is arguably simpler than integrating WebSocket into a single application by hand!

While your colleagues are still struggling with manual forwarding, you have implemented the feature with one configuration annotation and gone off to make tea.

Your colleagues will surely be impressed (and you can go back to slacking off).

You may already have some ideas about how this is implemented.

Next, I will walk through the implementation of this library.

Abstract ideas

I had actually written a module with similar functionality specifically for WebSocket before. It was built around the scenarios of one project, so the implementation was simple, but it was too customized and hard to extend.

One day, while chatting with a former colleague, I learned that his team was considering connecting devices directly to services, and that the services needed to be deployed as multiple instances.

A direct connection between a device and a service is just a long-lived connection such as TCP. A cache can store the mapping between each connection and its service address, which is enough to support point-to-point forwarding.

Does that sound familiar? A light went on in my head: isn’t this the same problem as WebSocket in cluster mode?

So I shifted from thinking about WebSocket to thinking about long-lived connections in general, and finally abstracted the problem into a cluster solution for long-lived connections.

Whether it is WebSocket or TCP, each is a specific implementation of a long-lived connection.

So we can abstract a top-level interface, Connection, and then implement WebSocketConnection or TCPConnection.
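To make the abstraction concrete, here is a minimal sketch. The method names (getId, send) and the Broadcaster helper are my own assumptions for illustration, not the library's actual API, and the two implementations are in-memory stand-ins that record what would be written instead of touching a real socket.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical top-level abstraction; the method names are assumptions for illustration.
interface Connection {
    String getId();

    void send(byte[] payload);
}

// Stand-in for a WebSocket-backed connection: records text frames instead of writing to a session.
class WebSocketConnection implements Connection {
    private final String id;
    final List<String> sentFrames = new ArrayList<>();

    WebSocketConnection(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void send(byte[] payload) {
        sentFrames.add(new String(payload, StandardCharsets.UTF_8));
    }
}

// Stand-in for a raw TCP connection: counts bytes instead of writing to a socket.
class TCPConnection implements Connection {
    private final String id;
    int bytesWritten;

    TCPConnection(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void send(byte[] payload) {
        bytesWritten += payload.length;
    }
}

// Hypothetical helper: callers depend only on Connection, so the transport can vary freely.
final class Broadcaster {
    static void broadcast(List<? extends Connection> connections, String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        for (Connection connection : connections) {
            connection.send(payload);
        }
    }
}
```

The point of the sketch is only that the sending code never needs to know which protocol sits behind a Connection.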

In fact, from an abstract point of view, short-lived connections also fall within the scope of this abstraction, not only long-lived ones. Protocols such as HTTP do not have the problem described above, but nothing prevents you from implementing an HTTPConnection to forward messages, so don’t be bound by preconceptions.

Forwarding ideas

As mentioned earlier, the main idea of this library is to forward messages to other service instances to achieve a unicast or broadcast effect

So the design of message forwarding is very important

First of all, message forwarding needs some technical means of exchanging data, such as HTTP, MQ, TCP, or WebSocket.

Wait a moment... can’t the long-lived connection handle this by itself? (flips the table)

A long-lived connection exists precisely to exchange data, so it can be completely self-sufficient.

So a wonderful idea formed in my mind:

What if each service instance used itself as a client to connect to other services?

  • In the WebSocket scenario, we use the current service instance as a WebSocket client to connect to the WebSocket server of other service instances

  • In the TCP scenario, we use the current service instance as a TCP client to connect to the TCP server of other service instances

In this way, other service instances can send messages to these fake clients, and when the fake clients on the service instance receive the messages, they can forward them to the real clients managed by themselves
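The idea can be sketched with a small in-memory model. Everything here (Node, Client, link, onForwarded) is hypothetical and only mimics the message flow; there is no real networking, and the library's actual classes are different.

```java
import java.util.ArrayList;
import java.util.List;

// A real front-end client; its inbox records what it received.
class Client {
    final List<String> inbox = new ArrayList<>();
}

// In-memory model of one service instance. All names are hypothetical.
class Node {
    final String name;
    final List<Client> realClients = new ArrayList<>();
    // Peers that have connected to this node as "fake clients".
    final List<Node> peers = new ArrayList<>();

    Node(String name) {
        this.name = name;
    }

    // Each node connects to the other as a client, so the link is mutual.
    static void link(Node a, Node b) {
        a.peers.add(b);
        b.peers.add(a);
    }

    // Entry point for business code: deliver locally, then forward to every peer.
    void send(String message) {
        deliverLocally(message);
        for (Node peer : peers) {
            peer.onForwarded(message);
        }
    }

    // A forwarded message is delivered to local real clients only.
    // It is never forwarded again, which prevents message loops.
    void onForwarded(String message) {
        deliverLocally(message);
    }

    private void deliverLocally(String message) {
        for (Client client : realClients) {
            client.inbox.add(message);
        }
    }
}
```

With A1 and A2 linked and client C attached to A1, calling send on A2 puts the message into C's inbox, which is exactly the case from the introduction.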

Time to celebrate, folks: this really is a self-contained closed loop.

So we first need to let the service instances connect to each other

Connection process

Let’s take a look at how the connection to each other is designed

da3e0396b98adc308c59c67e8274d88d.png

I defined a ConnectionSubscriber interface. You can think of it as our service instance subscribing to the messages sent by other service instances.

A default implementation is also provided, which connects and sends messages over the connection’s own protocol.

It can also flexibly support other approaches: you only need to write a custom ConnectionSubscriber. With MQ you can implement an MQConnectionSubscriber, and with HTTP an HTTPConnectionSubscriber.

The advantage of using the connection’s own protocol is that you don’t depend on other libraries or middleware. If you have strict requirements on message loss, you can use MQ as the intermediary for forwarding instead. Generally speaking, ordinary WebSocket scenarios can tolerate a certain loss rate.

Get service instance information

So how do we know which instances to connect to?

I defined a ConnectionServerManager interface to manage service information

We could implement one ourselves, for example by listing service instances in a configuration file.

But there is a more convenient way: rely on the service discovery component of Spring Cloud. Whether the registry is Eureka, Nacos, or something else, it is supported out of the box. This is the charm of abstraction.

We can get all instances through DiscoveryClient#getInstances(Registration.getServiceId()). After excluding the current instance itself, the rest are the service instances we need to connect to.
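The "exclude yourself" step can be sketched without any Spring dependency. InstanceInfo is a simplified stand-in I made up for Spring Cloud's ServiceInstance (host and port only), and PeerResolver is a hypothetical helper; in a real application the list would come from DiscoveryClient#getInstances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Simplified, hypothetical stand-in for a discovered service instance.
final class InstanceInfo {
    final String host;
    final int port;

    InstanceInfo(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof InstanceInfo)) {
            return false;
        }
        InstanceInfo that = (InstanceInfo) other;
        return port == that.port && host.equals(that.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port);
    }
}

final class PeerResolver {
    // Returns every discovered instance except the current one.
    static List<InstanceInfo> peersOf(InstanceInfo self, List<InstanceInfo> all) {
        List<InstanceInfo> peers = new ArrayList<>();
        for (InstanceInfo instance : all) {
            if (!instance.equals(self)) {
                peers.add(instance);
            }
        }
        return peers;
    }
}
```

Comparing by host and port is an assumption of this sketch; any identifier that is unique per instance would work equally well.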

When our service instance connects to another instance, it sends a message containing its own instance information. On receiving that message, the other instance connects back to ours, which keeps connection setup timely. Once both connections are established, the two instances can forward messages to each other.

I also added heartbeat detection and automatic reconnection. When no heartbeat reply is received for a period of time, the connection is closed. The instance information is also re-queried periodically, and any service instance without a corresponding connection is reconnected, which gives some fault tolerance when the network is occasionally bad.
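The bookkeeping behind heartbeat detection and reconnection might look roughly like this. PeerLinkMonitor and its methods are hypothetical names of mine, and timestamps are passed in explicitly so the logic can be followed without a real clock or scheduler.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical tracker for links to other service instances.
final class PeerLinkMonitor {
    private final long timeoutMillis;
    // Instance id -> timestamp (ms) of the last heartbeat reply.
    private final Map<String, Long> lastReply = new HashMap<>();

    PeerLinkMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onConnected(String instanceId, long now) {
        lastReply.put(instanceId, now);
    }

    void onHeartbeatReply(String instanceId, long now) {
        // Ignore replies for links that were already dropped.
        if (lastReply.containsKey(instanceId)) {
            lastReply.put(instanceId, now);
        }
    }

    // Drops links whose last reply is older than the timeout and returns their ids.
    List<String> dropStale(long now) {
        List<String> dropped = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastReply.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() > timeoutMillis) {
                dropped.add(entry.getKey());
                it.remove();
            }
        }
        return dropped;
    }

    // Discovered instances that currently have no link and should be reconnected.
    List<String> toReconnect(Collection<String> discovered) {
        List<String> missing = new ArrayList<>();
        for (String instanceId : discovered) {
            if (!lastReply.containsKey(instanceId)) {
                missing.add(instanceId);
            }
        }
        return missing;
    }
}
```

A scheduled task would call dropStale on every heartbeat tick and toReconnect after each periodic discovery query.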

So far, our basic framework has been established. When we start the service, the connection between the services will be automatically established

Connection differentiation and management

Based on the above ideas, we definitely need to distinguish between the real client and the client used for forwarding

So I made a classification of these connections

d350d39a266e47bf487796002375bcd7.png

Then perform a unified management of these connections

3d5fdc29243afc9c8041afb15c67cc05.png

Through the connection factory ConnectionFactory, we can adapt any connection into a Connection object, and realize message forwarding between various connections

Each connection is configured with a MessageEncoder and a MessageDecoder for encoding and decoding messages. Different types of connections need different encoders and decoders: for example, a forwarded message looks quite different from a message sent to a real client. So an additional MessageCodecAdapter is defined to adapt the codecs for the different connection types, which also makes them easier to manage when you define them.
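Here is one way the adapter idea could be sketched. The interface names come from the article, but their signatures, the ConnectionType enum, and the "FWD:" prefix are assumptions I invented to show how a forwarded message can be encoded differently from a client message.

```java
import java.nio.charset.StandardCharsets;

// Assumed signatures; the library's real interfaces may differ.
interface MessageEncoder {
    byte[] encode(String message);
}

interface MessageDecoder {
    String decode(byte[] bytes);
}

// Hypothetical classification: real clients versus links used for forwarding.
enum ConnectionType { CLIENT, FORWARD }

final class MessageCodecAdapter {
    // Invented marker so a receiving instance can recognise a forwarded message.
    static final String FORWARD_PREFIX = "FWD:";

    MessageEncoder encoderFor(ConnectionType type) {
        if (type == ConnectionType.FORWARD) {
            return message -> (FORWARD_PREFIX + message).getBytes(StandardCharsets.UTF_8);
        }
        return message -> message.getBytes(StandardCharsets.UTF_8);
    }

    MessageDecoder decoderFor(ConnectionType type) {
        if (type == ConnectionType.FORWARD) {
            return bytes -> {
                String text = new String(bytes, StandardCharsets.UTF_8);
                if (!text.startsWith(FORWARD_PREFIX)) {
                    throw new IllegalArgumentException("not a forwarded message");
                }
                return text.substring(FORWARD_PREFIX.length());
            };
        }
        return bytes -> new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Keeping the choice of codec in one adapter means a new connection type only needs one new branch (or one new adapter) rather than changes scattered across every connection.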

Message sending

Now when we send a message, it is forwarded to the other service instances, and all clients can receive it.

But wait: in some cases we don’t want every client to receive it. Can we choose who receives the message?

What a hassle. Fine, I’ll hand you all the connections and you can choose for yourself.

Connection selection

We need to determine which connections to send to when a message is sent

46532984ae83864840b6de052de122de.png

So I defined a connection selector ConnectionSelector

Every time a message is sent, a connection selector is matched against it, and that selector returns the connections the message should go to. With a custom connection selector we can target our messages precisely.

This is actually why I named it WebSocketLoadBalanceConcept. Why LoadBalance?

Ribbon selects a Server through IRule

I use ConnectionSelector to select a Connection collection

Aren’t they similar?
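A rough sketch of the "match a selector, then select connections" flow follows. The support/select method names, the SelectorChain class, and the "group" header used as an example are all assumptions of mine; only the ConnectionSelector, Connection, and Message names come from the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Message {
    final Map<String, String> headers = new HashMap<>();
}

class Connection {
    final String id;
    final Map<String, String> metadata = new HashMap<>();

    Connection(String id) {
        this.id = id;
    }
}

// Assumed shape of a selector: first decide whether it applies, then pick connections.
interface ConnectionSelector {
    boolean support(Message message);

    List<Connection> select(Message message, List<Connection> candidates);
}

// Example custom selector: matches the hypothetical "group" header against connection metadata.
class GroupSelector implements ConnectionSelector {
    @Override
    public boolean support(Message message) {
        return message.headers.get("group") != null;
    }

    @Override
    public List<Connection> select(Message message, List<Connection> candidates) {
        String group = message.headers.get("group");
        List<Connection> selected = new ArrayList<>();
        for (Connection connection : candidates) {
            if (group.equals(connection.metadata.get("group"))) {
                selected.add(connection);
            }
        }
        return selected;
    }
}

// Fallback selector: every connection receives the message.
class AllSelector implements ConnectionSelector {
    @Override
    public boolean support(Message message) {
        return true;
    }

    @Override
    public List<Connection> select(Message message, List<Connection> candidates) {
        return new ArrayList<>(candidates);
    }
}

// The first selector that supports the message decides who receives it.
final class SelectorChain {
    private final List<ConnectionSelector> selectors;

    SelectorChain(List<ConnectionSelector> selectors) {
        this.selectors = selectors;
    }

    List<Connection> select(Message message, List<Connection> candidates) {
        for (ConnectionSelector selector : selectors) {
            if (selector.support(message)) {
                return selector.select(message, candidates);
            }
        }
        return Collections.emptyList();
    }
}
```

Order matters in this sketch: specific selectors go first and the catch-all goes last.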

Moving on to custom selectors

Preparation:

  • Our Connection has a metadata field for storing custom attributes

  • Our Message has a headers field for storing message headers

Send a message to the specified user

In many scenarios, we need to send messages to specified users

First, when the client connects, it can pass the userId to the server as a parameter or by actively sending a message, and the server stores the userId in the metadata of the Connection.

Then we add a header to the Message to be sent, with the target userId as its value.

With that in place, we can write a custom connection selector whose matching condition is whether the Message contains the userId header. When the header is present, the selector compares it with the userId in each Connection’s metadata and keeps only the connections that should receive the message.

Since the userId is unique, once a client connected to our own service instance has matched, there is no need to forward the message. If nothing matches locally, the message is forwarded to the other service instances through the forwarding clients.
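The "send locally if matched, otherwise forward" rule can be sketched as below. This is not the library's UserSelector: the class name UserSelectorSketch, the forwarding flag on Connection, and the method signatures are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Message {
    final Map<String, String> headers = new HashMap<>();
}

class Connection {
    final String id;
    // true for a link to another service instance, false for a real client.
    final boolean forwarding;
    final Map<String, String> metadata = new HashMap<>();

    Connection(String id, boolean forwarding) {
        this.id = id;
        this.forwarding = forwarding;
    }
}

// Hypothetical user-targeted selector.
final class UserSelectorSketch {
    static final String USER_ID = "userId";

    // Applies only to messages that carry a userId header.
    boolean support(Message message) {
        return message.headers.get(USER_ID) != null;
    }

    List<Connection> select(Message message, List<Connection> all) {
        String userId = message.headers.get(USER_ID);
        List<Connection> localMatches = new ArrayList<>();
        List<Connection> forwardLinks = new ArrayList<>();
        for (Connection connection : all) {
            if (connection.forwarding) {
                forwardLinks.add(connection);
            } else if (userId.equals(connection.metadata.get(USER_ID))) {
                localMatches.add(connection);
            }
        }
        // The userId is unique, so a local match means no forwarding is needed.
        return localMatches.isEmpty() ? forwardLinks : localMatches;
    }
}
```

When the user is connected to another instance, that instance runs the same selection on the forwarded message and finds the match among its own real clients.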

The library already implements the corresponding UserSelector and UserMessage. You can enable them through configuration and identify the user by adding a userId parameter to the connection path.

We can also use a cache to decide precisely whether to forward and to which service instance. Store unique data such as the userId and the service’s instanceId in Redis. When sending a message to a user, look up in Redis the instanceId (or other unique data) of the service instance that holds the user’s connection. If it is the current instance, send directly; if it is another instance, forward the message only to that instance.
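The cache-based routing decision might be sketched as follows. A plain shared Map stands in for Redis here, and UserRouteTable, Decision, and their methods are hypothetical names; a real version would also need expiry or cleanup for instances that crash.

```java
import java.util.Map;

// Possible outcomes when routing a message to one user.
enum Decision { SEND_LOCALLY, FORWARD, USER_OFFLINE }

// Hypothetical routing table; the shared map is a stand-in for Redis.
final class UserRouteTable {
    private final Map<String, String> userToInstance;
    private final String selfInstanceId;

    UserRouteTable(Map<String, String> sharedStore, String selfInstanceId) {
        this.userToInstance = sharedStore;
        this.selfInstanceId = selfInstanceId;
    }

    // Record that the user's connection lives on this instance.
    void onUserConnected(String userId) {
        userToInstance.put(userId, selfInstanceId);
    }

    // Remove the mapping only if it still points at this instance.
    void onUserDisconnected(String userId) {
        userToInstance.remove(userId, selfInstanceId);
    }

    // The instance that holds the user's connection, or null if unknown.
    String ownerOf(String userId) {
        return userToInstance.get(userId);
    }

    Decision decide(String userId) {
        String owner = ownerOf(userId);
        if (owner == null) {
            return Decision.USER_OFFLINE;
        }
        return owner.equals(selfInstanceId) ? Decision.SEND_LOCALLY : Decision.FORWARD;
    }
}
```

When decide returns FORWARD, ownerOf tells the sender which single instance to forward to, so the message no longer has to be broadcast to every peer.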

Send a message to the specified path

Another common scenario resembles topic subscription. For example, when clients subscribe to device status updates, a message needs to be sent to every connection on the corresponding path.

We can use different paths to represent different topics, and then customize a connection selector to match the connection path against the path specified in the message header.

The library already implements the corresponding PathSelector and PathMessage, which can be enabled through configuration.

End

Finally, please allow me to express a little bit of my humble opinion on abstraction

Abstraction is much like “Tao begets one, one begets two, two begets three, and three begets all things”. Starting from your top-level interface (that is, the core function), you keep expanding outwards. Your top-level interface is the Tao (in a narrow sense).

Take this library as an example. ConnectionLoadBalanceConcept is the Tao of this library. Its core function is to send messages; how to send them and whom to send them to is still undetermined, like a state of chaos.

So what are one, two, and three? We need a carrier for sending a message, so we have Connection and Message. We need to manage Connection objects, so there is ConnectionRepository. We need to forward messages, so there is ConnectionSubscriber, and so on.

“All things” are the concrete implementations: the connection server manager DiscoveryConnectionServerManager based on Spring Cloud service discovery, the path-based connection selector PathSelector, and the Reactive-based WebSocket connection ReactiveWebSocketConnection.

It is like a world you created: rules keep being derived, and these rules complement each other to make your world run smoothly.

Of course, there may be bugs in your world (doge).

Thanks for reading, I hope it can help you 🙂

Source: blog.csdn.net/m0_64360721/article/details/125125467
