1.Introduce RocketMQ dependencies: First, add RocketMQ dependencies in the pom.xml
file:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> <!-- The version number is adjusted according to the actual situation --> </dependency>
2.Configure RocketMQ connection information: Configure RocketMQ connection information in application.properties
or application.yml
, including Name Server address, etc.:
spring: application: name: ${sn.publish} cloud: stream: rocketmq: binder: name-server: ${rocket-mq.name-server} bindings: output: producer: group: testSocket sync: true bindings: output: destination: test-topic content-type: application/json
3.Message publishing component
@Component public class MqSourceComponent { @Resource Source source; public void publishNotify(SampleNotifyDTO notify) { source.output().send(MessageBuilder.withPayload(notify).build()); } }
4.Message publishing controller
@RestController @Api(tags = "rocketmq") public class MqController { @Resource MqSourceComponent mq; @ApiOperation(value = "Test publishing message") @PostMapping("test-publish") public JsonVO<String> testSend(SampleNotifyDTO notify) { mq.publishNotify(notify); return JsonVO.success("Message has been sent"); } }
Project structure:
Next is the construction of the websocket module
1. Add dependencies
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> <!-- The version number is adjusted according to the actual situation --> </dependency>
2.application.yml configuration file
server: port: ${sp.ws} spring: application: name: ${sn.ws} cloud: stream: rocketmq: binder: name-server: ${rocket-mq.name-server} bindings: input: destination: test-topic content-type: application/json group: testSocket
3. Bind the application to the message broker
@EnableBinding(Sink.class)
: This is an annotation for Spring Cloud Stream, which is used to bind applications to message brokers (such as Kafka, RabbitMQ, etc.). Sink.class
is a predefined input channel provided by Spring Cloud Stream that allows you to receive messages. Through this annotation, your application can listen to the message channel and define message processing logic.
@SpringBootApplication @EnableDiscoveryClient @EnableBinding(Sink.class) public class WsApplication { public static void main(String[] args) { SpringApplication.run(WsApplication.class, args); } }
4. Message subscription component
Listen for messages in the message channel. Once a message arrives, the listenNotify
method will be triggered. This method is responsible for processing the message and sending a response through the chat
service.
@Component @Slf4j public class MqListenComponent { @Resource ChatService chat; @StreamListener(Sink.INPUT) public void listenNotify(SampleNotifyDTO notify) { log.info(notify.toString()); chat.sendMessage(notify.getClientId(), notify); } }
5.Message notification service
package com.zeroone.star.ws.service; import cn.hutool.json.JSONUtil; import lombok.SneakyThrows; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @Component @ServerEndpoint("/chat") public class ChatService { /** * Connection session pool */ private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>(); @OnOpen public void onOpen(Session session) throws IOException { // Determine whether the client object exists if (SESSION_POOL.containsKey(session.getQueryString())) { CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID conflict, connection refused"); session.getUserProperties().put("reason", closeReason); session.close(); return; } // Store the client object into the session pool SESSION_POOL.put(session.getQueryString(), session); System.out.println("Client (" + session.getQueryString() + "): Connection opened"); } @OnMessage public String onMessage(String msg, Session session) throws IOException { // Parse the message ==> ID::Message content String[] msgArr = msg.split("::", 2); // Process group messages, ID==all means group messages if ("all".equalsIgnoreCase(msgArr[0])) { for (Session one : SESSION_POOL.values()) { // exclude self if (one == session) { continue; } // Send a message one.getBasicRemote().sendText(msgArr[1]); } } // Specify sending else { // Get the receiver Session target = SESSION_POOL.get(msgArr[0]); if (target != null) { target.getBasicRemote().sendText(msgArr[1]); } } return session.getQueryString() + ":Message sent successfully"; } @OnClose public void onClose(Session session) { //Connection refused close session Object reason = session.getUserProperties().get("reason"); if (reason instanceof CloseReason) { CloseReason creason = (CloseReason) reason; if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) { System.out.println("Reject client (" + session.getQueryString() + "): Close connection"); return; } } //Remove session from session pool SESSION_POOL.remove(session.getQueryString()); System.out.println("Client (" + session.getQueryString() + "): Close connection"); } @OnError public void onError(Session session, Throwable throwable) { System.out.println("Client (" + session.getQueryString() + ") error message: " + throwable.getMessage()); } @SneakyThrows public void sendMessage(String id, Object message) { // Bulk sending if ("all".equalsIgnoreCase(id)) { for (Session one : SESSION_POOL.values()) { // Send a message one.getBasicRemote().sendText(JSONUtil.toJsonStr(message)); } } // Specify sending else { // Get the receiver Session target = SESSION_POOL.get(id); if (target != null) { target.getBasicRemote().sendText(JSONUtil.toJsonStr(message)); } } } }
Project structure:
The knowledge points of the article match the official knowledge files, and you can further learn related knowledge. Network Skill TreeHomepageOverview 42058 people are learning the system