Introduction
Most of us are familiar with WebSocket. In a single-instance application it works without trouble, but once a project moves to a microservice architecture with several instances per service, problems can appear.
For example, service A has two instances, A1 and A2, and the front-end WebSocket client C is connected to A1 through the gateway’s load balancing. If A2 then triggers the message-sending logic and needs to send a message to all clients, C never receives it, because C is not connected to A2.
The simplest solution that comes to mind is to forward the message from A2 to A1 and let A1 send it to C, so that C receives the message sent by A2.
Based on this idea I implemented a library in which a single configuration annotation handles everything.
Usage
Next, let’s look at how to use this library.
First, add the @EnableWebSocketLoadBalanceConcept annotation to the startup class:
    @EnableWebSocketLoadBalanceConcept
    @EnableDiscoveryClient
    @SpringBootApplication
    public class AServiceApplication {

        public static void main(String[] args) {
            SpringApplication.run(AServiceApplication.class, args);
        }
    }
Then inject WebSocketLoadBalanceConcept wherever we need to send messages, and we can happily send messages across instances:
    @RestController
    @RequestMapping("/ws")
    public class WsController {

        @Autowired
        private WebSocketLoadBalanceConcept concept;

        @RequestMapping("/send")
        public void send(@RequestParam String msg) {
            // Sends to the clients of this instance and forwards to the other instances
            concept.send(msg);
        }
    }
Simple, isn’t it? Arguably simpler than integrating WebSocket into a single application by hand!
While your colleagues are still struggling to implement manual forwarding, you have the feature working with one configuration annotation and are already making tea.
Your colleagues will surely be impressed (and you can go back to slacking off).
You may already have some ideas about the implementation after reading this far.
Next, I will walk through how the library was built.
Abstract ideas
In fact, I had previously written a module with similar functionality specifically for WebSocket. It was built around the needs of one project, so the implementation was fairly simple, but it was too customized and hard to extend.
One day, while chatting with a former colleague, I learned that they were considering direct connections between devices and services, with the services deployed as multiple instances.
A direct connection between a device and a service is just a long-lived connection such as TCP. One approach is to store the mapping between each connection and its service address in a cache, which is enough to support point-to-point forwarding.
Sound familiar? A light went on in my head: isn’t this the same problem as WebSocket in cluster mode?
So I moved from thinking about WebSocket to thinking about long connections in general, and finally abstracted the problem into: a cluster solution for long connections.
Whether it is WebSocket or TCP, each is just a specific implementation of a long connection.
So we can abstract a top-level interface, Connection, and then implement WebSocketConnection or TCPConnection.
In fact, from an abstract point of view, short connections also fall within the scope of this abstraction. Protocols such as HTTP do not have the problem described above, but nothing prevents you from implementing an HTTPConnection to forward messages, so don’t be bound by preconceptions.
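The abstraction described above can be sketched roughly as follows. This is only an illustration, not the library’s real code: the method names getType and send, and the in-memory lists that stand in for real sockets, are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ConnectionAbstractionSketch {

    /** Top-level abstraction: anything that can carry a message to a peer. */
    interface Connection {
        String getType();           // illustrative label such as "ws" or "tcp"
        void send(String message);  // push one message over the underlying transport
    }

    /** Stand-in for a WebSocket connection; records frames instead of writing to a socket. */
    static class WebSocketConnection implements Connection {
        final List<String> sentFrames = new ArrayList<>();
        public String getType() { return "ws"; }
        public void send(String message) { sentFrames.add(message); }
    }

    /** Stand-in for a TCP connection; records the bytes that would be written. */
    static class TcpConnection implements Connection {
        final List<byte[]> writtenPackets = new ArrayList<>();
        public String getType() { return "tcp"; }
        public void send(String message) {
            writtenPackets.add(message.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Sending code depends only on the interface, never on the transport. */
    static void broadcast(List<? extends Connection> connections, String message) {
        for (Connection connection : connections) {
            connection.send(message);
        }
    }

    public static void main(String[] args) {
        WebSocketConnection ws = new WebSocketConnection();
        TcpConnection tcp = new TcpConnection();
        broadcast(Arrays.<Connection>asList(ws, tcp), "hello");
        System.out.println(ws.sentFrames.size() + " frame, " + tcp.writtenPackets.size() + " packet");
    }
}
```

Because broadcast only sees Connection, adding another transport means adding another implementation and nothing else.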
Forwarding ideas
As mentioned earlier, the main idea of this library is to forward messages to other service instances to achieve a unicast or broadcast effect.
So the design of message forwarding is very important.
First of all, message forwarding needs some technical means of exchanging data between instances,
such as HTTP, MQ, TCP, or WebSocket.
Wait a moment... doesn’t that list include the very protocols we are clustering? They can handle it by themselves (flips the table).
A long connection exists precisely to exchange data, so it can be completely self-sufficient.
So a wonderful idea formed in my mind:
What if each service instance acted as a client and connected to the other service instances?
- In the WebSocket scenario, the current service instance acts as a WebSocket client and connects to the WebSocket server of every other service instance
- In the TCP scenario, the current service instance acts as a TCP client and connects to the TCP server of every other service instance
In this way, other service instances can send messages to these fake clients, and when a fake client on a service instance receives a message, that instance forwards it to the real clients it manages.
Hooray, folks: the system is now a fully closed loop that depends on nothing but itself.
So the first thing we need is for the service instances to connect to each other.
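The self-forwarding idea can be simulated in memory to make the message flow concrete. The sketch below is an assumption-laden toy: Instance, connectTo and deliverLocally are hypothetical names, a real client is modelled as a list acting as its inbox, and no actual sockets are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SelfForwardingSketch {

    static class Instance {
        final String name;
        // Each real client (for example a browser) is modelled as an inbox.
        final List<List<String>> realClients = new ArrayList<>();
        // Fake clients opened by peer instances against this instance's server.
        final List<Consumer<String>> forwardingClients = new ArrayList<>();

        Instance(String name) { this.name = name; }

        /** A real client connects to this instance; returns its inbox. */
        List<String> acceptRealClient() {
            List<String> inbox = new ArrayList<>();
            realClients.add(inbox);
            return inbox;
        }

        /** This instance acts as a client of the peer's server. */
        void connectTo(Instance peer) {
            // Whatever the peer pushes to this fake client is delivered locally only,
            // never forwarded again, so messages cannot loop between instances.
            peer.forwardingClients.add(this::deliverLocally);
        }

        void deliverLocally(String message) {
            for (List<String> inbox : realClients) {
                inbox.add(message);
            }
        }

        /** Called by business code on whichever instance handles the request. */
        void send(String message) {
            deliverLocally(message);
            for (Consumer<String> fakeClient : forwardingClients) {
                fakeClient.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        Instance a1 = new Instance("A1");
        Instance a2 = new Instance("A2");
        a1.connectTo(a2);
        a2.connectTo(a1);

        List<String> c = a1.acceptRealClient(); // client C is connected to A1
        a2.send("hello");                        // the send logic is triggered on A2
        System.out.println(c);                   // prints [hello]
    }
}
```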
Connection process
Let’s take a look at how the mutual connections are designed.
I defined a ConnectionSubscriber interface. You can think of it this way: our service instance subscribes to the messages sent by other service instances.
A default implementation is provided, which connects and sends messages using the connection’s own protocol.
Other approaches are supported as well. You only need to write a custom ConnectionSubscriber: with MQ you could implement an MQConnectionSubscriber, and with HTTP an HTTPConnectionSubscriber.
The advantage of reusing the connection’s own protocol is that no extra library or middleware is required. If you have strict requirements on message loss, you can use MQ as the intermediary for forwarding instead. Generally speaking, ordinary WebSocket scenarios can tolerate a certain loss rate.
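To show what a pluggable subscriber might look like, here is a minimal sketch. The source does not show the real ConnectionSubscriber signature, so the subscribe method below is an assumption, and InMemoryBroker is a hypothetical stand-in for a real message queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SubscriberSketch {

    /** Hypothetical shape: listen for messages published by one other instance. */
    interface ConnectionSubscriber {
        void subscribe(String serverAddress, Consumer<String> onMessage);
    }

    /** Stand-in for an MQ: topic name mapped to its listeners. */
    static class InMemoryBroker {
        private final Map<String, List<Consumer<String>>> topics = new HashMap<>();

        void listen(String topic, Consumer<String> listener) {
            topics.computeIfAbsent(topic, key -> new ArrayList<>()).add(listener);
        }

        void publish(String topic, String message) {
            List<Consumer<String>> listeners =
                    topics.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** MQ-flavoured subscriber: one topic per source instance (naming is illustrative). */
    static class MqConnectionSubscriber implements ConnectionSubscriber {
        private final InMemoryBroker broker;

        MqConnectionSubscriber(InMemoryBroker broker) { this.broker = broker; }

        public void subscribe(String serverAddress, Consumer<String> onMessage) {
            broker.listen("forward." + serverAddress, onMessage);
        }
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        ConnectionSubscriber subscriber = new MqConnectionSubscriber(broker);
        subscriber.subscribe("10.0.0.2:8080", message -> System.out.println("got " + message));
        broker.publish("forward.10.0.0.2:8080", "hello"); // prints got hello
    }
}
```

The caller only depends on ConnectionSubscriber, so swapping the transport used for forwarding does not touch the sending code.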
Get service instance information
So how do we know which instances to connect to?
I defined a ConnectionServerManager interface to manage service instance information.
We could implement one ourselves, for example by listing the service instances in a configuration file.
But there is a more convenient way: rely on the service discovery component of Spring Cloud. Whether the registry is Eureka, Nacos, or something else, it is supported automatically. This is the charm of abstraction.
We can get all instances through DiscoveryClient#getInstances(Registration.getServiceId()); after excluding the current instance itself, what remains are the service instances that need to be connected.
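The filtering step can be sketched without any Spring dependency. In the sketch below, Instance is a hypothetical stand-in for Spring Cloud’s ServiceInstance, and the lookup function stands in for DiscoveryClient#getInstances(serviceId); comparing by host and port is an assumption about how “self” is recognised.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class PeerDiscoverySketch {

    /** Hypothetical stand-in for a registered service instance. */
    static final class Instance {
        final String serviceId;
        final String host;
        final int port;

        Instance(String serviceId, String host, int port) {
            this.serviceId = serviceId;
            this.host = host;
            this.port = port;
        }

        boolean sameEndpoint(Instance other) {
            return host.equals(other.host) && port == other.port;
        }

        @Override
        public String toString() { return host + ":" + port; }
    }

    /** Looks up every instance of our own service and drops the current instance. */
    static List<Instance> peersOf(Instance self, Function<String, List<Instance>> lookup) {
        return lookup.apply(self.serviceId).stream()
                .filter(instance -> !instance.sameEndpoint(self))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Instance> registered = Arrays.asList(
                new Instance("a-service", "10.0.0.1", 8080),
                new Instance("a-service", "10.0.0.2", 8080));
        Instance self = registered.get(0);
        System.out.println(peersOf(self, serviceId -> registered)); // prints [10.0.0.2:8080]
    }
}
```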
When our service instance connects to another service instance, it sends a message containing its own instance information. On receiving that message, the other instance connects back to ours, so the reverse connection is established promptly instead of waiting for the next discovery query. Once both connections exist, the two instances can forward messages to each other.
I also added heartbeat detection and automatic reconnection. If no heartbeat reply is received for a period of time, the connection is closed. In addition, the instance information is re-queried at regular intervals, and any service instance without a corresponding connection is connected again. This gives some fault tolerance when the network is occasionally unstable.
At this point the basic framework is in place: when the services start, the connections between them are established automatically.
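The heartbeat and reconnection bookkeeping might look like the sketch below. It is an illustration under assumptions, not the library’s implementation: PeerLinks and its method names are hypothetical, peers are identified by plain address strings, and the clock is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class HeartbeatSketch {

    static class PeerLinks {
        private final long timeoutMillis;
        // Peer address mapped to the time of its last heartbeat reply.
        private final Map<String, Long> lastReplyAt = new HashMap<>();

        PeerLinks(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

        void onConnected(String peer, long now) { lastReplyAt.put(peer, now); }

        void onHeartbeatReply(String peer, long now) { lastReplyAt.put(peer, now); }

        /** Drops links that have been silent longer than the timeout; returns the dropped peers. */
        List<String> dropExpired(long now) {
            List<String> dropped = new ArrayList<>();
            Iterator<Map.Entry<String, Long>> it = lastReplyAt.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (now - entry.getValue() > timeoutMillis) {
                    dropped.add(entry.getKey());
                    it.remove();
                }
            }
            return dropped;
        }

        /** Given the peers found by a fresh discovery query, returns those that need (re)connecting. */
        List<String> missing(Collection<String> discovered) {
            List<String> result = new ArrayList<>();
            for (String peer : discovered) {
                if (!lastReplyAt.containsKey(peer)) {
                    result.add(peer);
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        PeerLinks links = new PeerLinks(1000);
        links.onConnected("a2", 0);
        links.onConnected("a3", 0);
        links.onHeartbeatReply("a3", 900);
        System.out.println(links.dropExpired(1500));                 // prints [a2]
        System.out.println(links.missing(Arrays.asList("a2", "a3"))); // prints [a2]
    }
}
```

In a running service, dropExpired and missing would be called from a scheduled task, with the reconnect itself performed for every peer that missing returns.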
Connection differentiation and management
Given the design above, we clearly need to distinguish real clients from the clients used for forwarding.
So I classified the connections by type
and then manage them in a unified way.
Through the connection factory ConnectionFactory, any underlying connection can be adapted into a Connection object, which makes message forwarding between different kinds of connections possible.
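A factory of this kind could be sketched as follows. The support/create method pair is an assumption about the interface’s shape, and java.io.Writer and java.util.Queue are used purely as stand-ins for two unrelated underlying connection types.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class ConnectionFactorySketch {

    interface Connection {
        void send(String message);
    }

    /** Hypothetical factory: claims the raw objects it understands and wraps them. */
    interface ConnectionFactory {
        boolean support(Object raw);
        Connection create(Object raw);
    }

    /** Adapts any Writer; each message becomes one line. */
    static class WriterConnectionFactory implements ConnectionFactory {
        public boolean support(Object raw) { return raw instanceof Writer; }

        public Connection create(Object raw) {
            Writer writer = (Writer) raw;
            return message -> {
                try {
                    writer.write(message + "\n");
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };
        }
    }

    /** Adapts any Queue; each message becomes one element. */
    static class QueueConnectionFactory implements ConnectionFactory {
        public boolean support(Object raw) { return raw instanceof Queue; }

        @SuppressWarnings("unchecked")
        public Connection create(Object raw) {
            Queue<String> queue = (Queue<String>) raw;
            return queue::add;
        }
    }

    /** Picks the first factory that supports the raw object. */
    static Connection adapt(Object raw, List<ConnectionFactory> factories) {
        for (ConnectionFactory factory : factories) {
            if (factory.support(raw)) {
                return factory.create(raw);
            }
        }
        throw new IllegalArgumentException("No factory for " + raw.getClass().getName());
    }

    public static void main(String[] args) {
        List<ConnectionFactory> factories =
                Arrays.asList(new WriterConnectionFactory(), new QueueConnectionFactory());
        StringWriter writer = new StringWriter();
        Queue<String> queue = new ArrayDeque<>();
        adapt(writer, factories).send("hi");
        adapt(queue, factories).send("hi");
        System.out.println(writer.toString().trim() + " / " + queue.peek()); // prints hi / hi
    }
}
```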
Each connection is configured with a MessageEncoder and a MessageDecoder for encoding and decoding messages. Different types of connection need different encoders and decoders; for example, a forwarded message is usually quite different from a message sent to a real client. So an additional MessageCodecAdapter is defined to supply the right codec for each connection type, which keeps the codec definitions in one place and easy to manage.
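As a rough illustration of the codec adapter, the sketch below hands out a different encoder and decoder per connection type. The ConnectionType values, the encoderFor/decoderFor method names, and the "FWD|" envelope are all invented for the example; the library’s real types and wire format are not shown in this article.

```java
class CodecSketch {

    /** Hypothetical connection types: a real client versus a forwarding link. */
    enum ConnectionType { CLIENT, FORWARDING }

    interface MessageEncoder {
        String encode(String payload);
    }

    interface MessageDecoder {
        String decode(String wire);
    }

    /** Central place that decides which codec each connection type gets. */
    static class MessageCodecAdapter {
        private static final String FORWARD_PREFIX = "FWD|";

        MessageEncoder encoderFor(ConnectionType type) {
            if (type == ConnectionType.FORWARDING) {
                // Forwarded messages are wrapped in a tiny envelope.
                return payload -> FORWARD_PREFIX + payload;
            }
            // Real clients receive the payload unchanged.
            return payload -> payload;
        }

        MessageDecoder decoderFor(ConnectionType type) {
            if (type == ConnectionType.FORWARDING) {
                return wire -> wire.startsWith(FORWARD_PREFIX)
                        ? wire.substring(FORWARD_PREFIX.length())
                        : wire;
            }
            return wire -> wire;
        }
    }

    public static void main(String[] args) {
        MessageCodecAdapter adapter = new MessageCodecAdapter();
        String wire = adapter.encoderFor(ConnectionType.FORWARDING).encode("hello");
        System.out.println(wire);                                                      // prints FWD|hello
        System.out.println(adapter.decoderFor(ConnectionType.FORWARDING).decode(wire)); // prints hello
    }
}
```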
Message sending
Now when we send a message, it is forwarded to the other service instances, so every client can receive it.
But wait: in some cases we don’t want every client to receive it. Can we choose who receives it?
Fine, troublesome as that is: here are all the connections, pick the ones you want.
Connection selection
When a message is sent, we need to determine which connections it should go to.
So I defined a connection selector, ConnectionSelector.
Every time a message is sent, a connection selector is matched to it, and that selector returns the connections the message should be sent to. With a custom connection selector we can target our messages precisely.
This is actually the reason behind the name WebSocketLoadBalanceConcept. Why is it called LoadBalance?
Ribbon selects a Server through an IRule.
I select a collection of Connection objects through a ConnectionSelector.
Similar, isn’t it?
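The matching step described above can be sketched as a chain of selectors in which the first one that supports the message wins. The support/select method names and the RoomSelector example are assumptions for illustration; only the ConnectionSelector concept itself comes from the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SelectorDispatchSketch {

    static class Connection {
        final String id;
        final Map<String, String> metadata = new HashMap<>();
        Connection(String id) { this.id = id; }
    }

    static class Message {
        final String payload;
        final Map<String, String> headers = new HashMap<>();
        Message(String payload) { this.payload = payload; }
    }

    /** Hypothetical shape of a selector. */
    interface ConnectionSelector {
        boolean support(Message message);
        List<Connection> select(Message message, List<Connection> candidates);
    }

    /** Fallback selector: every connection receives the message. */
    static class AllSelector implements ConnectionSelector {
        public boolean support(Message message) { return true; }
        public List<Connection> select(Message message, List<Connection> candidates) {
            return candidates;
        }
    }

    /** Example custom selector: the "room" header must equal the connection's "room" metadata. */
    static class RoomSelector implements ConnectionSelector {
        public boolean support(Message message) { return message.headers.containsKey("room"); }
        public List<Connection> select(Message message, List<Connection> candidates) {
            List<Connection> selected = new ArrayList<>();
            for (Connection connection : candidates) {
                if (message.headers.get("room").equals(connection.metadata.get("room"))) {
                    selected.add(connection);
                }
            }
            return selected;
        }
    }

    /** The first selector that supports the message decides who receives it. */
    static List<Connection> resolve(Message message, List<Connection> all,
                                    List<ConnectionSelector> selectors) {
        for (ConnectionSelector selector : selectors) {
            if (selector.support(message)) {
                return selector.select(message, all);
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        Connection c1 = new Connection("c1");
        c1.metadata.put("room", "lobby");
        Connection c2 = new Connection("c2");
        c2.metadata.put("room", "game");
        Message message = new Message("hi");
        message.headers.put("room", "lobby");
        List<ConnectionSelector> selectors = Arrays.asList(new RoomSelector(), new AllSelector());
        System.out.println(resolve(message, Arrays.asList(c1, c2), selectors).size()); // prints 1
    }
}
```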
Moving on to custom selectors
Preparation:
- Our Connection has a metadata field for storing custom attributes
- Our Message has a headers field for storing message headers
Send a message to the specified user
In many scenarios we need to send a message to a specific user.
First, when the client connects, it passes its userId to the server, either as a connection parameter or by actively sending a message, and the server stores that userId in the metadata of the Connection.
Then we add a header to the Message to be sent, using the target userId as the header value.
With that in place we can write a custom connection selector whose matching condition is whether the Message carries a userId header. When the header is present, the selector compares it with the userId in each Connection’s metadata and keeps the connections that should receive the message.
Since the userId is unique, once a matching client is found among the connections to our own instance there is no need to forward the message. If there is no local match, the message is forwarded through the forwarding clients of the other service instances.
The corresponding UserSelector and UserMessage are already implemented in the library. They can be enabled through configuration, and the user is identified by adding a userId parameter to the connection path.
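The user-matching rule, including the “no need to forward after a local match” shortcut, can be sketched as below. This is not the library’s UserSelector: the forwarding flag on Connection and the selectForUser method are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class UserSelectorSketch {

    static class Connection {
        final String id;
        final boolean forwarding; // true for the links to other service instances
        final Map<String, String> metadata = new HashMap<>();

        Connection(String id, boolean forwarding) {
            this.id = id;
            this.forwarding = forwarding;
        }
    }

    static class Message {
        final String payload;
        final Map<String, String> headers = new HashMap<>();
        Message(String payload) { this.payload = payload; }
    }

    /**
     * Returns the user's local connections if there are any;
     * otherwise returns the forwarding connections so other instances can try.
     */
    static List<Connection> selectForUser(Message message, List<Connection> all) {
        String userId = Objects.requireNonNull(message.headers.get("userId"), "userId header");
        List<Connection> local = new ArrayList<>();
        List<Connection> forwarders = new ArrayList<>();
        for (Connection connection : all) {
            if (connection.forwarding) {
                forwarders.add(connection);
            } else if (userId.equals(connection.metadata.get("userId"))) {
                local.add(connection);
            }
        }
        // userId is unique, so a local hit means nobody else needs the message.
        return local.isEmpty() ? forwarders : local;
    }

    public static void main(String[] args) {
        Connection u1 = new Connection("client-1", false);
        u1.metadata.put("userId", "u1");
        Connection link = new Connection("link-to-A2", true);

        Message toU1 = new Message("hi");
        toU1.headers.put("userId", "u1");
        Message toU9 = new Message("hi");
        toU9.headers.put("userId", "u9");

        System.out.println(selectForUser(toU1, Arrays.asList(u1, link)).get(0).id); // prints client-1
        System.out.println(selectForUser(toU9, Arrays.asList(u1, link)).get(0).id); // prints link-to-A2
    }
}
```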
Of course, we can also use a cache to decide precisely whether to forward and which service instances to forward to. Store unique data such as the userId together with the instanceId of the service instance in Redis. When sending a message to a user, look up in Redis the instanceId (or other unique data) of the service instance that holds the user’s connection. If it is the current instance, send the message directly; if it is another instance, forward the message to that instance only.
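The cache-based routing decision can be sketched with a shared map standing in for Redis. Router, Route and the method names are hypothetical; with a real Redis client the two map operations would become the corresponding set and get calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class UserRouteCacheSketch {

    enum Route { LOCAL, FORWARD, UNKNOWN }

    static class Router {
        private final String localInstanceId;
        // Stands in for Redis: userId mapped to the instanceId holding that user's connection.
        private final Map<String, String> userToInstance;

        Router(String localInstanceId, Map<String, String> sharedCache) {
            this.localInstanceId = localInstanceId;
            this.userToInstance = sharedCache;
        }

        void onUserConnected(String userId) {
            userToInstance.put(userId, localInstanceId);
        }

        void onUserDisconnected(String userId) {
            // Remove only if the entry still points at this instance.
            userToInstance.remove(userId, localInstanceId);
        }

        /** Decides whether to send directly, forward to one instance, or give up. */
        Route routeFor(String userId) {
            String owner = userToInstance.get(userId);
            if (owner == null) {
                return Route.UNKNOWN;
            }
            return owner.equals(localInstanceId) ? Route.LOCAL : Route.FORWARD;
        }

        /** The instance to forward to, or null if the user is not connected anywhere. */
        String ownerOf(String userId) {
            return userToInstance.get(userId);
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        Router a1 = new Router("A1", cache);
        Router a2 = new Router("A2", cache);
        a1.onUserConnected("u1");
        System.out.println(a1.routeFor("u1")); // prints LOCAL
        System.out.println(a2.routeFor("u1")); // prints FORWARD
        System.out.println(a2.ownerOf("u1"));  // prints A1
    }
}
```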
Send a message to the specified path
Another common scenario resembles topic subscription, for example subscribing to device status updates: a message has to be sent to every connection opened on the corresponding path.
We can use different paths to represent different topics, and then write a custom connection selector that matches each connection’s path against the path specified in the message header.
The corresponding PathSelector and PathMessage are already implemented in the library and can be enabled through configuration.
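A path-based selection might be sketched as follows. It is not the library’s PathSelector. One assumption worth calling out: unlike a userId, a path can have subscribers on any instance, so this sketch always includes the forwarding connections; whether the library does the same is not stated in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PathSelectorSketch {

    static class Connection {
        final String id;
        final boolean forwarding; // true for the links to other service instances
        final Map<String, String> metadata = new HashMap<>();

        Connection(String id, boolean forwarding) {
            this.id = id;
            this.forwarding = forwarding;
        }
    }

    /** Keeps local connections opened on the given path, plus every forwarding link. */
    static List<Connection> selectByPath(String path, List<Connection> all) {
        List<Connection> selected = new ArrayList<>();
        for (Connection connection : all) {
            if (connection.forwarding || path.equals(connection.metadata.get("path"))) {
                selected.add(connection);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Connection status = new Connection("c1", false);
        status.metadata.put("path", "/device/status");
        Connection chat = new Connection("c2", false);
        chat.metadata.put("path", "/chat");
        Connection link = new Connection("link-to-A2", true);

        for (Connection connection : selectByPath("/device/status", Arrays.asList(status, chat, link))) {
            System.out.println(connection.id); // prints c1, then link-to-A2
        }
    }
}
```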
End
Finally, allow me to share a humble opinion on abstraction.
Abstraction is much like “Tao begets one, one begets two, two begets three, and three begets all things”. Starting from your top-level interface (that is, the core function), you keep expanding outwards. Your top-level interface is the Tao (in a narrow sense).
Take this library as an example. ConnectionLoadBalanceConcept is the Tao of this library. Its core function is to send messages; how they are sent and to whom is left undetermined, like a state of chaos.
So what are one, two, and three? We need carriers for sending a message, so we have Connection and Message; we need to manage Connection objects, so there is ConnectionRepository; we need to forward messages, so there is ConnectionSubscriber; and so on.
“All things” are the concrete implementations, such as DiscoveryConnectionServerManager, the connection manager based on Spring Cloud service discovery; PathSelector, the path-based connection selector; and ReactiveWebSocketConnection, the Reactive-based WebSocket connection.
It is like a world you have created: rules keep being derived, and they complement one another so that your world runs smoothly.
Of course, your world may contain bugs (just kidding).
Thanks for reading, I hope it can help you 🙂
Source: blog.csdn.net/m0_64360721/article/details/125125467