spring-cloud-stream

Directory of series articles

Chapter 1 Application of Java Thread Pool Technology
Chapter 2 Application of CountDownLatch and Semaphore
Chapter 3 Introduction to Spring Cloud
Chapter 4 Spring Cloud Netflix-Eureka
Chapter 5 Spring Cloud Netflix Ribbon
Chapter 6 OpenFeign of Spring Cloud
Chapter 7 GateWay of Spring Cloud
Chapter 8 Hystrix of Spring Cloud Netflix
Chapter 9 Code management gitlab use
Chapter 10 Nacos discovery of SpringCloud Alibaba
Chapter 11 Nacos Config of SpringCloud Alibaba
Chapter 12 Spring Cloud Alibaba Sentinel
Chapter 13 JWT
Chapter 14 RabbitMQ Application
Chapter 15 RabbitMQ Delay Queue
Chapter 16 spring-cloud-stream

Article directory

  • Directory of series articles
  • Foreword
  • 1. Stream design ideas
  • 2. Commonly used annotations for coding
  • 3. Encoding steps
    • 3.1. Add dependencies
    • 3.2. Modify configuration file
    • 3.3. Production
    • 3.4. Consumption
    • 3.5. Delay queue
      • 3.5.1. Modify configuration file
      • 3.5.2. Production side
      • 3.5.3. Message confirmation mechanism on the consumer side
  • Summary

Foreword

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
Spring Cloud Stream is officially defined as a framework for building message-driven microservices. An application talks to a binder object in Spring Cloud Stream through input and output channels. The binding between a channel and its destination is set up in configuration, and the binder is responsible for interacting with the message middleware.

Spring Cloud Stream builds on Spring Integration to connect to message brokers and drive applications with message events. It provides opinionated auto-configuration for several vendors' message middleware products and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
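To make the publish-subscribe and consumer group concepts concrete, here is a minimal plain-Java sketch that uses no Spring classes. `GroupedTopic` and the group name `auditGroup` are hypothetical names chosen for illustration, and the round-robin choice inside a group is a simplifying assumption (a real broker decides which instance receives a message). It shows that every group receives each message, while the instances inside one group share the messages between them. Partitioning is not modeled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one destination with consumer groups (not Spring Cloud Stream API).
class GroupedTopic {
    // group name -> inboxes of the consumer instances in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages already handed to that group
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Adds one consumer instance to a group and returns its inbox.
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(group, 0);
        return inbox;
    }

    // Publish-subscribe across groups, competing consumers inside a group.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> instances = entry.getValue();
            int count = delivered.get(entry.getKey());
            // Assumption: simple round-robin between the instances of one group
            instances.get(count % instances.size()).add(message);
            delivered.put(entry.getKey(), count + 1);
        }
    }
}

class GroupedTopicDemo {
    public static void main(String[] args) {
        GroupedTopic topic = new GroupedTopic();
        List<String> instanceA = topic.subscribe("saveOrderGroup");
        List<String> instanceB = topic.subscribe("saveOrderGroup");
        List<String> audit = topic.subscribe("auditGroup");
        for (String m : new String[] {"m1", "m2", "m3", "m4"}) {
            topic.publish(m);
        }
        System.out.println("saveOrderGroup instance A: " + instanceA);
        System.out.println("saveOrderGroup instance B: " + instanceB);
        System.out.println("auditGroup: " + audit);
    }
}
```

In this model the two `saveOrderGroup` instances split the four messages between them, while the single `auditGroup` instance sees all four.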

With Stream we no longer need to care about the details of a specific MQ. By using a common binding model, an application can switch between MQ products mostly through configuration. In short, Stream shields the differences between the underlying message middleware, reduces the cost of switching, and provides a unified programming model for messaging.

1. Stream design ideas


  • Binder: Connects the application to the middleware and shields the differences between brokers.
  • Channel: An abstraction of a queue. In a messaging system it is the medium for storing and forwarding messages, and queues are configured through channels.
  • Source and Sink: The point of reference is the application built with Spring Cloud Stream. Publishing a message from the application is output (Source), and receiving a message is input (Sink).
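The roles above can be illustrated without any broker. The following is a minimal plain-Java sketch under stated assumptions, not Spring Cloud Stream's actual API: `SketchBinder`, `InMemoryBinder`, and `BinderSketchDemo` are hypothetical names, and a channel is reduced to a simple `Consumer<String>`. It shows that the application code only sees an output and an input, while the binder decides how messages travel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a binder: the only place that knows about the middleware.
interface SketchBinder {
    // Returns an "output channel": calling accept() publishes a payload to the destination.
    Consumer<String> bindProducer(String destination);

    // Registers an "input channel": the handler is called for every message on the destination.
    void bindConsumer(String destination, Consumer<String> handler);
}

// In-memory "middleware". A broker-backed binder would implement the same interface.
class InMemoryBinder implements SketchBinder {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    @Override
    public Consumer<String> bindProducer(String destination) {
        return payload -> {
            // Handlers are looked up at publish time, so binding order does not matter
            for (Consumer<String> handler : handlers.getOrDefault(destination, new ArrayList<>())) {
                handler.accept(payload);
            }
        };
    }

    @Override
    public void bindConsumer(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }
}

class BinderSketchDemo {
    // Application code: depends on the binder abstraction only, never on a concrete broker.
    static List<String> run(SketchBinder binder) {
        List<String> received = new ArrayList<>();
        binder.bindConsumer("exchange-saveOrder", received::add);            // sink (input)
        Consumer<String> output = binder.bindProducer("exchange-saveOrder"); // source (output)
        output.accept("order-1");
        output.accept("order-2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(new InMemoryBinder()));
    }
}
```

Swapping `InMemoryBinder` for another implementation of `SketchBinder` would leave `run` unchanged, which is the point of the binder abstraction.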

2. Commonly used annotations for coding

Component and description:

  • Middleware: The message broker. The Spring Cloud Stream project itself provides binders for RabbitMQ and Kafka.
  • Binder: The layer between the application and the message middleware; binders for Kafka and RabbitMQ are implemented. Through the binder it is easy to connect to the middleware, and the message destination (a topic in Kafka, an exchange in RabbitMQ) can be changed through the configuration file.
  • @Input: Marks an input channel. Messages received through this channel enter the application.
  • @Output: Marks an output channel. Published messages leave the application through this channel.
  • @StreamListener: Marks a method that listens on a channel and receives messages for the consumer.
  • @EnableBinding: Binds the channels declared in the given interface to their destinations (exchanges).

These annotations belong to the annotation-based programming model, which was deprecated in Spring Cloud Stream 3.1 in favor of the functional model and removed in 4.0, so the examples below apply to versions that still include them.

3. Encoding steps

3.1. Add dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2. Modify configuration file

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #Connection information for the RabbitMQ instance(s) to bind to
        defaultRabbit: #Name of this binder definition, referenced from the bindings below
          type: rabbit #Message middleware type
          environment: #RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost #RabbitMQ server address
                port: 5672 #RabbitMQ server port
                username: tiger #RabbitMQ username
                password: tiger #RabbitMQ password
                virtual-host: tiger_vh #Virtual host
      bindings: #Channel-to-destination bindings
        saveOrderOutput: #Producer binding: name of the output channel for saving orders
          destination: exchange-saveOrder #Exchange name (the default exchange type is topic); binds this output channel to the exchange-saveOrder exchange in RabbitMQ
          content-type: application/json #Message content type, JSON here
          binder: defaultRabbit #Binder to use for this binding
          group: saveOrderGroup #Consumer group
        saveOrderInput: #Consumer binding: name of the input channel for saving orders
          destination: exchange-saveOrder #Exchange name (the default exchange type is topic); binds this input channel to the exchange-saveOrder exchange in RabbitMQ
          content-type: application/json #Message content type, JSON here
          binder: defaultRabbit #Binder to use for this binding
          group: saveOrderGroup #Consumer group

3.3. Production

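// OrderOutputChannelProcesor.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

/**
 * Order message output channel processor
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}

// OrderMessageProducer.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    // @Output acts as a qualifier that selects the bound channel by name
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo) {
        // send() returns false if the message could not be handed to the channel
        boolean sent = messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("Message sent: {}, payload: {}", sent, userInfo);
    }
}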

3.4. Consumption

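// OrderInputChannelProcesor.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * Order message input channel processor
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}

// OrderMessageConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    // Invoked for every message that arrives on the saveOrderInput channel
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage) {
        log.info("Received message successfully: {}", userInfoMessage.getPayload());
    }
}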

3.5. Delay queue

Install the delay queue plugin:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
Download the .ez file (pick the release that matches your RabbitMQ version), copy it as-is into the plugins directory of the RabbitMQ installation without unzipping it, and run the following command:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1. Modify configuration file

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #Connection information for the RabbitMQ instance(s) to bind to
        defaultRabbit: #Name of this binder definition, referenced from the bindings below
          type: rabbit #Message middleware type
          environment: #RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost #RabbitMQ server address
                port: 5672 #RabbitMQ server port
                username: tiger #RabbitMQ username
                password: tiger #RabbitMQ password
                virtual-host: tiger_vh #Virtual host
      bindings: #Channel-to-destination bindings
        saveOrderOutput: #Producer binding: name of the output channel for saving orders
          destination: exchange-saveOrder-delay #Exchange name; binds this output channel to the exchange-saveOrder-delay exchange in RabbitMQ
          content-type: application/json #Message content type, JSON here
          binder: defaultRabbit #Binder to use for this binding
          group: saveOrderGroup #Consumer group
        saveOrderInput: #Consumer binding: name of the input channel for saving orders
          destination: exchange-saveOrder-delay #Exchange name; binds this input channel to the exchange-saveOrder-delay exchange in RabbitMQ
          content-type: application/json #Message content type, JSON here
          binder: defaultRabbit #Binder to use for this binding
          group: saveOrderGroup #Consumer group
      rabbit:
        bindings: #RabbitMQ-specific settings per binding
          saveOrderOutput: #Same channel name as in the bindings section above
            producer:
              delayed-exchange: true #Declare the exchange as a delayed message exchange (requires the plugin)
          saveOrderInput:
            consumer:
              delayed-exchange: true #Must match the producer side

3.5.2. Production side

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo) {
        // The x-delay header is the delay in milliseconds: here the message is routed after 5 seconds
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("Message sent successfully: {}", userInfo);
    }
}
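To see what the `x-delay` header means in practice, the following plain-Java sketch models the behavior of a delayed exchange: a message is held until its delay has elapsed and only then routed on. `DelayedExchangeSketch` is a hypothetical class, not the plugin's implementation, and time is passed in explicitly as `nowMs` (an assumption made so that the example is deterministic).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of a delayed exchange (not the RabbitMQ plugin's code).
class DelayedExchangeSketch {
    private static final class Held {
        final long dueAtMs;
        final String payload;

        Held(long dueAtMs, String payload) {
            this.dueAtMs = dueAtMs;
            this.payload = payload;
        }
    }

    // Messages waiting for their delay to elapse, earliest due time first
    private final PriorityQueue<Held> held =
            new PriorityQueue<>(Comparator.comparingLong((Held h) -> h.dueAtMs));

    // Accepts a message published at nowMs with the given x-delay in milliseconds.
    void publish(String payload, long xDelayMs, long nowMs) {
        held.add(new Held(nowMs + xDelayMs, payload));
    }

    // Returns the messages whose delay has elapsed at nowMs, in due-time order.
    List<String> routeDue(long nowMs) {
        List<String> routed = new ArrayList<>();
        while (!held.isEmpty() && held.peek().dueAtMs <= nowMs) {
            routed.add(held.poll().payload);
        }
        return routed;
    }
}

class DelayedExchangeDemo {
    public static void main(String[] args) {
        DelayedExchangeSketch exchange = new DelayedExchangeSketch();
        exchange.publish("slow-order", 5000, 0);
        exchange.publish("fast-order", 1000, 0);
        System.out.println("at 1000 ms: " + exchange.routeDue(1000));
        System.out.println("at 5000 ms: " + exchange.routeDue(5000));
    }
}
```

The consumer only ever sees a message once it has been routed, so from its point of view a delayed message simply arrives later.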

3.5.3. Message confirmation mechanism on the consumer side

Enable manual acknowledgement for the input binding (this block sits under spring.cloud.stream):

rabbit:
  bindings: #RabbitMQ-specific settings per binding
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #Manual confirmation

Then acknowledge each message in the listener:

@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage) {
    log.info("Received message successfully: {}", userInfoMessage.getPayload());
    // Channel is com.rabbitmq.client.Channel; AmqpHeaders is org.springframework.amqp.support.AmqpHeaders
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long deliveryTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * basicAck(deliveryTag, multiple)
     * deliveryTag: The unique identifier of the message delivery on this Channel.
     * multiple: If true, the message with the specified deliveryTag and all earlier
     * unacknowledged messages on the Channel are acknowledged; if false, only the single
     * message with the specified deliveryTag is acknowledged.
     *
     * To reject a message instead, basicNack(deliveryTag, multiple, requeue) takes an extra
     * requeue flag: if true, the message is re-queued; if false, it is discarded or sent
     * to the Dead Letter Exchange.
     */
    try {
        // Acknowledge only this message
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        log.error("Failed to acknowledge message", e);
    }
}
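The effect of the `multiple` flag is easy to get wrong, so here is a small plain-Java sketch of the bookkeeping involved. `UnackedTracker` is a hypothetical class that only models the set of unacknowledged delivery tags on one channel; it does not talk to RabbitMQ.

```java
import java.util.TreeSet;

// Hypothetical model of the unacknowledged deliveries on a single channel.
class UnackedTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1; // delivery tags increase by one per delivery on a channel

    // Records a new delivery and returns its delivery tag.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Mirrors the parameters of basicAck(deliveryTag, multiple).
    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge this tag and every earlier tag that is still outstanding
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only this tag
            unacked.remove(deliveryTag);
        }
    }

    // Returns a copy of the tags that are still waiting for an acknowledgement.
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }
}

class UnackedTrackerDemo {
    public static void main(String[] args) {
        UnackedTracker single = new UnackedTracker();
        single.deliver();
        single.deliver();
        single.deliver();
        single.basicAck(2, false);
        System.out.println("multiple=false leaves " + single.outstanding());

        UnackedTracker batch = new UnackedTracker();
        batch.deliver();
        batch.deliver();
        batch.deliver();
        batch.basicAck(2, true);
        System.out.println("multiple=true leaves " + batch.outstanding());
    }
}
```

With three deliveries outstanding, acknowledging tag 2 with `multiple=false` leaves tags 1 and 3 pending, whereas `multiple=true` leaves only tag 3. Passing `true` by accident can therefore confirm messages that have not been processed yet.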

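Define the exchange type as direct

rabbit: #Under spring.cloud.stream
  bindings: #RabbitMQ-specific settings per binding
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey #Routing key used to bind the queue to the exchange
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: '''orderRoutingKey''' #SpEL expression; the inner quotes make it a literal routing key
        exchangeType: direct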
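The difference between the default topic exchange and a direct exchange comes down to how the routing key of a message is compared with the binding key of a queue. The sketch below re-implements the two matching rules in plain Java for illustration; `RoutingSketch` is a hypothetical helper, not RabbitMQ's code, and the `order.*` style keys are made-up examples.

```java
// Hypothetical re-implementation of exchange matching rules, for illustration only.
class RoutingSketch {
    // direct exchange: the routing key must equal the binding key exactly
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    // topic exchange: keys are dot-separated words; '*' matches exactly one word,
    // '#' matches zero or more words
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: match only if the key is used up too
        }
        if (pattern[i].equals("#")) {
            // Try letting '#' swallow 0, 1, 2, ... of the remaining words
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }
}

class RoutingSketchDemo {
    public static void main(String[] args) {
        System.out.println(RoutingSketch.directMatches("orderRoutingKey", "orderRoutingKey"));
        System.out.println(RoutingSketch.topicMatches("order.*", "order.created"));
        System.out.println(RoutingSketch.topicMatches("order.#", "order.created.eu"));
    }
}
```

With the direct configuration above, only messages published with exactly `orderRoutingKey` reach the queue, whereas a topic binding such as `#` would accept every routing key.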

Summary

Spring Cloud Stream provides binders for RabbitMQ and Kafka and integrates with the rest of Spring Cloud, so producing, consuming, delaying, and acknowledging messages is largely a matter of configuration plus a few annotations.