Sprint Cloud Stream integrates RocketMq and websocket to implement message publishing and subscription

1.Introduce RocketMQ dependencies: First, add RocketMQ dependencies in the pom.xml file:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- The version number is adjusted according to the actual situation -->
</dependency>

2.Configure RocketMQ connection information: Configure RocketMQ connection information in application.properties or application.yml, including Name Server address, etc.:

spring:
  application:
    name: ${sn.publish}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
        bindings:
          output:
            producer:
              group: testSocket
              sync: true
      bindings:
        output:
          destination: test-topic
          content-type: application/json

3.Message publishing component

@Component
public class MqSourceComponent {
    @Resource
    Source source;

    public void publishNotify(SampleNotifyDTO notify) {
        source.output().send(MessageBuilder.withPayload(notify).build());
    }
}

4.Message publishing controller

@RestController
@Api(tags = "rocketmq")
public class MqController {
    @Resource
    MqSourceComponent mq;

    @ApiOperation(value = "Test publishing message")
    @PostMapping("test-publish")
    public JsonVO<String> testSend(SampleNotifyDTO notify) {
        mq.publishNotify(notify);
        return JsonVO.success("Message has been sent");
    }
}

Project structure:

Next is the construction of the websocket module

1. Add dependencies

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- The version number is adjusted according to the actual situation -->
</dependency>

2.application.yml configuration file

server:
  port: ${sp.ws}
spring:
  application:
    name: ${sn.ws}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
      bindings:
        input:
          destination: test-topic
          content-type: application/json
          group: testSocket

3. Bind the application to the message broker

@EnableBinding(Sink.class): This is an annotation for Spring Cloud Stream, which is used to bind applications to message brokers (such as Kafka, RabbitMQ, etc.). Sink.class is a predefined input channel provided by Spring Cloud Stream that allows you to receive messages. Through this annotation, your application can listen to the message channel and define message processing logic.

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {

    public static void main(String[] args) {
        SpringApplication.run(WsApplication.class, args);
    }

}

4. Message subscription component

Listen for messages in the message channel. Once a message arrives, the listenNotify method will be triggered. This method is responsible for processing the message and sending a response through the chat service.

@Component
@Slf4j
public class MqListenComponent {
    @Resource
    ChatService chat;

    @StreamListener(Sink.INPUT)
    public void listenNotify(SampleNotifyDTO notify) {
        log.info(notify.toString());
        chat.sendMessage(notify.getClientId(), notify);
    }
}

5.Message notification service

package com.zeroone.star.ws.service;

import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


@Component
@ServerEndpoint("/chat")
public class ChatService {
    /**
     * Connection session pool
     */
    private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session) throws IOException {
        // Determine whether the client object exists
        if (SESSION_POOL.containsKey(session.getQueryString())) {
            CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID conflict, connection refused");
            session.getUserProperties().put("reason", closeReason);
            session.close();
            return;
        }
        // Store the client object into the session pool
        SESSION_POOL.put(session.getQueryString(), session);
        System.out.println("Client (" + session.getQueryString() + "): Connection opened");
    }

    @OnMessage
    public String onMessage(String msg, Session session) throws IOException {
        // Parse the message ==> ID::Message content
        String[] msgArr = msg.split("::", 2);
        // Process group messages, ID==all means group messages
        if ("all".equalsIgnoreCase(msgArr[0])) {
            for (Session one : SESSION_POOL.values()) {
                // exclude self
                if (one == session) {
                    continue;
                }
                // Send a message
                one.getBasicRemote().sendText(msgArr[1]);
            }
        }
        // Specify sending
        else {
            // Get the receiver
            Session target = SESSION_POOL.get(msgArr[0]);
            if (target != null) {
                target.getBasicRemote().sendText(msgArr[1]);
            }
        }
        return session.getQueryString() + ":Message sent successfully";
    }

    @OnClose
    public void onClose(Session session) {
        //Connection refused close session
        Object reason = session.getUserProperties().get("reason");
        if (reason instanceof CloseReason) {
            CloseReason creason = (CloseReason) reason;
            if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {
                System.out.println("Reject client (" + session.getQueryString() + "): Close connection");
                return;
            }
        }
        //Remove session from session pool
        SESSION_POOL.remove(session.getQueryString());
        System.out.println("Client (" + session.getQueryString() + "): Close connection");
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        System.out.println("Client (" + session.getQueryString() + ") error message: " + throwable.getMessage());
    }

    @SneakyThrows
    public void sendMessage(String id, Object message) {
        // Bulk sending
        if ("all".equalsIgnoreCase(id)) {
            for (Session one : SESSION_POOL.values()) {
                // Send a message
                one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
        // Specify sending
        else {
            // Get the receiver
            Session target = SESSION_POOL.get(id);
            if (target != null) {
                target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
    }
}

Project structure:

The knowledge points of the article match the official knowledge files, and you can further learn related knowledge. Network Skill TreeHomepageOverview 42058 people are learning the system