I have a friend~
I made a small broken website, and now I want to implement a function of pushing web messages in the station. Yes, it is the little red dot in the picture below, a very commonly used function.
But he hasn’t figured out how to do it yet. Here I helped him sort out several solutions and implemented them simply.
What is message push (push)
There are many push scenarios. For example, if someone pays attention to my official account, I will receive a push message to attract me to click to open the app.
Message push (push
) usually refers to the active message push of the user’s current web page or mobile device APP by the operator of the website through some tool.
Message push is generally divided into web message push
and mobile message push
.
The one above belongs to the mobile message push, and the common message push on the web, such as internal messages, the number of unread emails, the number of monitoring alarms, etc., is also widely used.
Before the specific implementation, let’s analyze the previous requirements again. In fact, the function is very simple. As long as an event is triggered (actively sharing resources or actively pushing messages in the background), the notification red dot on the web page will be displayed in real time + 1
will do.
Usually there are several message push tables on the server side, which are used to record different types of messages pushed by users triggering different events, and the front end actively queries (pulls) or passively receives (pushes) the number of all unread messages from users.
Message push is nothing more than push (push
) and pull (pull
), let’s understand one by one below.
Short Polling
Polling (polling
) should be the easiest way to implement message push schemes. Here we divide polling into short polling
and long polling
.
Short polling is easy to understand. At a specified time interval, the browser sends a HTTP
request to the server, and the server returns unread message data to the client in real time, and the browser renders and displays it.
A simple JS timer can do it, request the unread message count interface every second, and display the returned data.
setInterval(() => { // method request messageCount().then((res) => { if (res.code === 200) { this.messageCount = res.data } }) }, 1000);
The effect is still possible. Although the implementation of short polling is simple, the disadvantages are also obvious. Since the push data does not change frequently, no matter whether there is a new message at the back end at this time, the client will make a request, which will inevitably cause great damage to the server. High stress, waste of bandwidth and server resources.
Long polling
Long polling is an improved version of the above short polling. While minimizing the waste of server resources, it ensures the relative real-time performance of messages. Long polling is widely used in middleware, such as Nacos
and apollo
configuration center, message queue kafka
, RocketMQ
.
This time I used the apollo
configuration center to implement long polling, and applied a class DeferredResult
, which passed Spring after servelet3.0
An asynchronous request mechanism provided by encapsulation means to delay the result.
DeferredResult
can allow the container thread to quickly release the occupied resources without blocking the request thread, so as to accept more requests to improve the throughput of the system, and then start an asynchronous worker thread to process the real business logic and complete the call DeferredResult.setResult(200)
submits the response result.
Below we use long polling to implement message push.
Because an ID may be monitored by multiple long polling requests, I use the Multimap
structure provided by the guava
package to store long polling. One key can correspond to multiple values . Once a key change is detected, all corresponding long polls will respond. The front end gets the status code of non-request timeout, knows the data change, actively queries the unread message count interface, and updates the page data.
@Controller @RequestMapping("/polling") public class PollingController { // Store the long polling collection that monitors a certain Id // thread synchronization structure public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create()); /** * Official account: Programmer Xiaofu * Set up monitoring */ @GetMapping(path = "watch/{id}") @ResponseBody public DeferredResult<String> watch(@PathVariable String id) { // delay object setting timeout DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT); // Remove the key when the asynchronous request is completed to prevent memory overflow deferredResult.onCompletion(() -> { watchRequests. remove(id, deferredResult); }); // register long polling request watchRequests. put(id, deferredResult); return deferredResult; } /** * Official account: Programmer Xiaofu * change data */ @GetMapping(path = "publish/{id}") @ResponseBody public String publish(@PathVariable String id) { // Data change Take out all the long polling requests of the monitoring ID and respond to them one by one if (watchRequests. containsKey(id)) { Collection<DeferredResult<String>> deferredResults = watchRequests. get(id); for (DeferredResult<String> deferredResult : deferredResults) { deferredResult.setResult("I updated" + new Date()); } } return "success"; }
When the request exceeds the set timeout period, an AsyncRequestTimeoutException
exception will be thrown. Here, you can directly use @ControllerAdvice
to capture and return it globally. The front end will start again after obtaining the agreed status code Long polling request, repeated calls like this.
@ControllerAdvice public class AsyncRequestTimeoutHandler { @ResponseStatus(HttpStatus.NOT_MODIFIED) @ResponseBody @ExceptionHandler(AsyncRequestTimeoutException. class) public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) { System.out.println("Asynchronous request timed out"); return "304"; } }
Let’s test it. First, the page initiates a long polling request /polling/watch/10086
to monitor the message change, the request is suspended, and the data is not changed until it times out, and the long polling request is initiated again; Then manually change the data /polling/publish/10086
, the long polling gets a response, and the front-end processing business logic is completed and the request is initiated again, and so on.
Compared with short polling, long polling has improved a lot in performance, but it still generates more requests, which is its imperfection.
iframe stream
The iframe flow is to insert a hidden tag in the page, and request the API interface of the number of messages in
src
, thereby creating an A long connection, the server continues to transmit data to the iframe
.
”
The transmitted data is usually
HTML
, or an embeddedjavascript
script to achieve the effect of updating the page in real time.
This method is simple to implement, and the front end only needs one tag to get it done
<iframe src="/iframe/message" style="display:none"></iframe>
The server directly assembles html and js script data and writes them to response
@Controller @RequestMapping("/iframe") public class IframeController { @GetMapping(path = "message") public void message(HttpServletResponse response) throws IOException, InterruptedException { while (true) { response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache, no-store"); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().print("<script type="text/javascript">\ " + "parent.document.getElementById('clock').innerHTML = "" + count.get() + "";" + "parent.document.getElementById('count').innerHTML = "" + count.get() + "";" + "</script>"); } } }
But I personally don’t recommend it, because it will show that the request has not been loaded on the browser, and the icon will keep rotating, which is simply a killer of obsessive-compulsive disorder.
SSE (My Way)
Many people may not know that in addition to the familiar mechanism of WebSocket
, there is also a server-sent event (Server-sent events
>), or SSE
for short.
SSE
is based on the HTTP
protocol. We know that the HTTP protocol in the general sense cannot enable the server to actively push messages to the client, but SSE is an exception. It Changed a way of thinking.
SSE opens a one-way channel between the server and the client. The server responds no longer with a one-time data packet but with a text/event-stream
type of data flow information. When there is a data change while streaming from the server to the client.
The overall implementation idea is a bit similar to online video playback. The video stream will be continuously pushed to the browser. You can also understand that it takes a long time for the client to complete a download (the network is not smooth).
SSE
is similar to WebSocket
, both of which can establish communication between the server and the browser to push messages from the server to the client, but there are still some differences:
-
SSE is based on the HTTP protocol, and they do not require a special protocol or server implementation to work;
WebSocket
requires a separate server to handle the protocol. -
SSE one-way communication, only one-way communication from the server to the client; webSocket full-duplex communication, that is, both parties to the communication can send and receive information at the same time.
-
SSE is easy to implement and has low development costs, without the need to introduce other components; WebSocket transmission data needs to be analyzed twice, and the development threshold is higher.
-
SSE supports disconnection and reconnection by default; WebSocket needs to be implemented by itself.
-
SSE can only transmit text messages, and binary data needs to be encoded before transmission; WebSocket supports the transmission of binary data by default.
How to choose between SSE and WebSocket?
”
Technology is not good or bad, only which one is more suitable
SSE seems to have been unknown to everyone, partly because of the emergence of WebSockets, which provides a richer protocol to perform two-way, full-duplex communication. For gaming, instant messaging, and scenarios that require bi-directional near real-time updates, having bi-directional channels is more attractive.
However, in some cases, there is no need to send data from the client. And you just need some updates for server operations. For example: in-site messages, number of unread messages, status update, stock quotes, monitoring quantity and other scenarios, SEE
has more advantages in terms of implementation difficulty and cost. Additionally, SSE has several features that WebSockets
lack by design, such as: automatic reconnect
, event ID
, and send arbitrary events
capabilities.
The front end only needs to make an HTTP request, bring a unique ID, open the event stream, and listen to the events pushed by the server.
<script> let source = null; let userId = 7777 if (window. EventSource) { // establish connection source = new EventSource('http://localhost:7777/sse/sub/' + userId); setMessageInnerHTML("connect user=" + userId); /** * Once the connection is established, the open event will be triggered * Another way of writing: source.onopen = function (event) {} */ source. addEventListener('open', function (e) { setMessageInnerHTML("Establish a connection..."); }, false); /** * The client receives the data sent by the server * Another way of writing: source.onmessage = function (event) {} */ source. addEventListener('message', function (e) { setMessageInnerHTML(e.data); }); } else { setMessageInnerHTML("Your browser does not support SSE"); } </script>
The implementation of the server is simpler, create a SseEmitter
object and put it into sseEmitterMap
for management
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); /** * create connection * * @date: 2022/7/12 14:51 * @auther: Official Account: Programmer Xiaofu */ public static SseEmitter connect(String userId) { try { // Set the timeout period, 0 means no expiration. 30 seconds by default SseEmitter sseEmitter = new SseEmitter(0L); // register callback sseEmitter.onCompletion(completionCallBack(userId)); sseEmitter.onError(errorCallBack(userId)); sseEmitter.onTimeout(timeoutCallBack(userId)); sseEmitterMap.put(userId, sseEmitter); count. getAndIncrement(); return sseEmitter; } catch (Exception e) { log.info("Creating a new sse connection exception, current user: {}", userId); } return null; } /** * Send a message to a specified user * * @date: 2022/7/12 14:51 * @auther: Official Account: Programmer Xiaofu */ public static void sendMessage(String userId, String message) { if (sseEmitterMap. containsKey(userId)) { try { sseEmitterMap.get(userId).send(message); } catch (IOException e) { log.error("User [{}] push exception: {}", userId, e.getMessage()); removeUser(userId); } } }
We simulated the push message from the server and saw that the client received the message, which was consistent with our expected effect.
Note: SSE does not support IE
browser, and it does a good job of compatibility with other mainstream browsers.
MQTT
What is the MQTT protocol?
MQTT
full name (Message Queue Telemetry Transport): a lightweight
Internet of Thing
), obtains messages by subscribing to corresponding topics.
The protocol separates the message publisher (publisher
) from the subscriber (subscriber
), so it can provide reliable The messaging service is somewhat similar to traditional MQ.
The TCP
protocol is located at the transport layer, and the MQTT
protocol is located at the application layer. The MQTT
protocol is built on the TCP/IP
protocol. That is to say, as long as the TCP/IP
protocol stack is supported, the MQTT
protocol can be used.
Why use the MQTT protocol?
Why is the MQTT
protocol so preferred in the Internet of Things (IOT)? Instead of other protocols, such as the more familiar HTTP
protocol?
-
First of all, the
HTTP
protocol is a synchronous protocol. After the client requests, it needs to wait for the server's response. In the Internet of Things (IOT) environment, devices will be subject to environmental influences, such as low bandwidth, high network latency, unstable network communication, etc. Obviously, the asynchronous message protocol is more suitable forIOT
applications . -
HTTP
is one-way. If you want to get a message, the client must initiate a connection. In Internet of Things (IOT) applications, devices or sensors are often clients, which means they cannot passively Receive commands from the network. -
Usually a command or message needs to be sent to all devices on the network.
HTTP
to achieve such a function is not only very difficult, but also extremely expensive.
The specific introduction and practice of the MQTT protocol, I will not repeat it here, you can refer to my previous two articles, which are also very detailed.
Introduction to MQTT protocol
I didn't expect springboot + rabbitmq to be a smart home, it would be so simple
MQTT implements message push
Unread messages (small red dots), front-end and RabbitMQ real-time message push practice, very simple~
Websockets
websocket
should be a method that everyone is familiar with to implement message push. We also compared it with websocket when we talked about SSE.
WebSocket is a protocol for full-duplex communication over a TCP
connection, establishing a communication channel between a client and a server. The browser and the server only need one handshake, and a persistent connection can be created directly between the two, and two-way data transmission can be carried out.
springboot integrates websocket, and first introduces websocket
related toolkits, which have additional development costs compared with SSE.
<!-- introduce websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
The server uses the @ServerEndpoint
annotation to mark the current class as a websocket server, and the client can connect to the WebSocket server through ws://localhost:7777/webSocket/10086
.
@Component @Slf4j @ServerEndpoint("/websocket/{userId}") public class WebSocketServer { //A connection session with a client needs to be used to send data to the client private Session session; private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>(); // Used to store the number of line connections private static final Map<String, Session> sessionPool = new HashMap<String, Session>(); /** * Official account: Programmer Xiaofu * The method that the link calls successfully */ @OnOpen public void onOpen(Session session, @PathParam(value = "userId") String userId) { try { this.session = session; webSockets. add(this); sessionPool. put(userId, session); log.info("websocket message: There are new connections, the total number is: " + webSockets.size()); } catch (Exception e) { } } /** * Official account: Programmer Xiaofu * The method called after receiving the client message */ @OnMessage public void onMessage(String message) { log.info("websocket message: received client message: " + message); } /** * Official account: Programmer Xiaofu * This is a single message */ public void sendOneMessage(String userId, String message) { Session session = sessionPool. get(userId); if (session != null & amp; & amp; session.isOpen()) { try { log.info("websocket message: single message:" + message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } }
The front-end initializes and opens the WebSocket connection, and monitors the connection status, receives data from the server or sends data to the server.
<script> var ws = new WebSocket('ws://localhost:7777/webSocket/10086'); // get connection status console.log('ws connection status:' + ws.readyState); //Monitor whether the connection is successful ws.onopen = function () { console.log('ws connection status:' + ws.readyState); //Send a data if the connection is successful ws.send('test1'); } // Listen to the information sent back by the server and process the display ws.onmessage = function (data) { console.log('Received message from server:'); console. log(data); //Close the WebSocket connection after completing the communication ws. close(); } // listen for connection close event ws.onclose = function () { // Monitor the status of websocket during the whole process console.log('ws connection status:' + ws.readyState); } // Listen and handle error events ws.onerror = function (error) { console. log(error); } function sendMessage() { var content = $("#message"). val(); $.ajax({ url: '/socket/publish?userId=10086 & amp;message=' + content, type: 'GET', data: { "id": "7777", "content": content }, success: function (data) { console. log(data) } }) } </script>
The page is initialized to establish a websocket connection, and then two-way communication can be performed, and the effect is not bad
Custom push
Above we gave me the principles and code implementations of 6 schemes, but in the actual business development process, you can’t use them blindly, but you should choose the appropriate scheme based on your own system’s business characteristics and actual scenarios.
The most direct way to push is to use the third push platform. After all, The demand that money can solve is not a problem. It can be used directly without complicated development and maintenance, saving time, effort, and worry, like goEasy , Jiguang push are very good three-party service providers.
Generally, large companies have self-developed message push platforms. For example, the web site message we implemented this time is just a touch point on the platform. SMS, email, WeChat official account, and applets can all be accessed by channels that can reach users. Come in.
The interior of the message push system is quite complicated, such as the maintenance and review of message content, the delineation of push groups, reach filtering and interception (push rule frequency, time period, quantity, black and white lists, keywords, etc.), and many modules for push failure compensation , There are also many scenarios involving large amounts of data and high concurrency technically. So our implementation today is just a small fight in front of this huge system.