Directory of series articles
Chapter 1 Application of Java Thread Pool Technology
Chapter 2 Application of CountDownLatch and Semaphore
Chapter 3 Introduction to Spring Cloud
Chapter 4 Spring Cloud Netflix-Eureka
Chapter 5 Spring Cloud Netflix Ribbon
Chapter 6 OpenFeign of Spring Cloud
Chapter 7 GateWay of Spring Cloud
Chapter 8 Hystrix of Spring Cloud Netflix
Chapter 9 Code management gitlab use
Chapter 10 Nacos discovery of SpringCloud Alibaba
Chapter 11 Nacos Config of SpringCloud Alibaba
Chapter 12 Spring Cloud Alibaba Sentinel
Chapter 13 JWT
Chapter 14 RabbitMQ Application
Chapter 15 RabbitMQ Delay Queue
Chapter 16 spring-cloud-stream
Article directory
- Foreword
- 1. Stream design ideas
- 2. Commonly used annotations for coding
- 3. Encoding steps
  - 3.1. Add dependencies
  - 3.2. Modify configuration file
  - 3.3. Production
  - 3.4. Consumption
  - 3.5. Delay queue
    - 3.5.1. Modify configuration file
    - 3.5.2. Production side
    - 3.5.3. Message confirmation mechanism on the consumer side
- Summary
Foreword
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
Spring Cloud Stream is officially described as a framework for building message-driven microservices. An application talks to a binder through input and output channels. The bindings between channels and destinations are declared in configuration, and the binder is responsible for interacting with the message middleware.
Spring Cloud Stream builds on Spring Integration to connect to message brokers and drive applications by message events. It provides opinionated auto-configuration for the middleware products of several vendors and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
With Stream we no longer need to care about the details of a specific MQ. By changing the binder, an application can switch between message queues. In short, Stream shields the differences between the underlying message middleware, lowers the cost of switching, and offers a unified programming model for messaging.
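The publish-subscribe and consumer-group concepts can be sketched in plain Java without any broker. This is only an in-memory illustration, not how Spring Cloud Stream is implemented; `GroupedTopic` and its methods are hypothetical names. Every group receives each message once, while inside a group the members take turns (round robin is assumed here).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory topic that mimics consumer-group delivery. */
final class GroupedTopic {
    // group name -> members of that group, in subscription order
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    // group name -> index of the member that receives the next message
    private final Map<String, Integer> nextMember = new LinkedHashMap<>();

    void subscribe(String group, Consumer<String> member) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(member);
        nextMember.putIfAbsent(group, 0);
    }

    /** Every group gets the message once; members of a group take turns. */
    void publish(String message) {
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            int index = nextMember.get(entry.getKey());
            members.get(index).accept(message);
            nextMember.put(entry.getKey(), (index + 1) % members.size());
        }
    }
}
```

With two members in `groupA` and one in `groupB`, publishing two messages gives each `groupA` member one message and the `groupB` member both. This is why services that scale to several instances usually share one group name.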
1. Stream design ideas
- Binder: connects the application to the middleware and shields the differences between products.
- Channel: an abstraction of a queue. In a messaging system it is the medium that stores and forwards messages; queues are configured through channels.
- Source and Sink: named from the point of view of Spring Cloud Stream itself. Publishing a message from Stream is an output (Source), and receiving a message is an input (Sink).
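The role of the binder can be sketched in plain Java. The names below (`SimpleBinder`, `InMemoryBinder`, `OrderPublisher`) are hypothetical and exist only for illustration; a real binder talks to RabbitMQ or Kafka. The point is that application code depends on the abstraction, so the middleware can be swapped without touching it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a binder: hides which middleware is used. */
interface SimpleBinder {
    void send(String destination, String payload);

    void subscribe(String destination, Consumer<String> handler);
}

/** In-memory binder used in place of a real broker; the label marks which one carried the message. */
final class InMemoryBinder implements SimpleBinder {
    private final String label;
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    InMemoryBinder(String label) {
        this.label = label;
    }

    @Override
    public void subscribe(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    @Override
    public void send(String destination, String payload) {
        for (Consumer<String> handler : handlers.getOrDefault(destination, List.of())) {
            handler.accept(label + ":" + payload);
        }
    }
}

/** Application code: it only knows the binder interface and a destination name. */
final class OrderPublisher {
    private final SimpleBinder binder;

    OrderPublisher(SimpleBinder binder) {
        this.binder = binder;
    }

    void saveOrder(String orderId) {
        binder.send("exchange-saveOrder", orderId);
    }
}
```

Constructing `OrderPublisher` with `new InMemoryBinder("rabbit")` or `new InMemoryBinder("kafka")` changes where the message goes, while `OrderPublisher` itself stays the same.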
2. Commonly used annotations for coding
Composition | Description |
---|---|
Middleware | The message middleware. RabbitMQ and Kafka are the two most widely used binder targets. |
Binder | The layer between the application and the message middleware. Binders are implemented for Kafka and RabbitMQ. Through the binder, the destination (a Kafka topic, a RabbitMQ exchange) can be changed in the configuration file without touching code. |
@Input | Marks an input channel; messages received through it enter the application. |
@Output | Marks an output channel; published messages leave the application through it. |
@StreamListener | Marks a consumer method that listens on a channel and receives the messages from its queue. |
@EnableBinding | Binds the channels declared in an interface to the exchange. |
3. Encoding steps
3.1. Add dependencies
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.2. Modify configuration file
server:
  port: 8088
spring:
  cloud:
    stream:
      binders: # RabbitMQ services that can be bound
        defaultRabbit: # binder name, referenced by the bindings below
          type: rabbit # message middleware type
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost # RabbitMQ server address
                port: 5672 # RabbitMQ server port
                username: tiger # RabbitMQ username
                password: tiger # RabbitMQ password
                virtual-host: tiger_vh # virtual host
      bindings:
        saveOrderOutput: # producer binding: the output channel for saved orders
          destination: exchange-saveOrder # exchange name; the default exchange type is topic
          content-type: application/json # message type, JSON here
          binder: defaultRabbit # the per-binding key is "binder"; "default-binder" belongs directly under spring.cloud.stream
          group: saveOrderGroup # group only takes effect on consumer bindings
        saveOrderInput: # consumer binding: the input channel for saved orders
          destination: exchange-saveOrder # same exchange as the output channel
          content-type: application/json
          binder: defaultRabbit
          group: saveOrderGroup # consumer group
3.3. Production
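import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

/**
 * Order message output channel processor
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {

    // Injects the channel declared as "saveOrderOutput" in the interface above
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo) {
        // send() returns false if the message could not be sent
        boolean sent = messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("Message sent: {}, payload: {}", sent, userInfo);
    }
}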
3.4. Consumption
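import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * Order message input channel processor
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {

    // Called for every message that arrives on the "saveOrderInput" channel
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage) {
        log.info("Received message successfully: {}", userInfoMessage.getPayload());
    }
}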
3.5. Delay queue
Install the delay queue plugin:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
Download the .ez file, copy it into the plugins directory of the RabbitMQ installation, and run the following command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.5.1. Modify configuration file
server:
  port: 8088
spring:
  cloud:
    stream:
      binders: # RabbitMQ services that can be bound
        defaultRabbit: # binder name, referenced by the bindings below
          type: rabbit # message middleware type
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost # RabbitMQ server address
                port: 5672 # RabbitMQ server port
                username: tiger # RabbitMQ username
                password: tiger # RabbitMQ password
                virtual-host: tiger_vh # virtual host
      bindings:
        saveOrderOutput: # producer binding: the output channel for saved orders
          destination: exchange-saveOrder-delay # exchange name; the default exchange type is topic
          content-type: application/json # message type, JSON here
          binder: defaultRabbit # the per-binding key is "binder"
          group: saveOrderGroup # group only takes effect on consumer bindings
        saveOrderInput: # consumer binding: the input channel for saved orders
          destination: exchange-saveOrder-delay # same exchange as the output channel
          content-type: application/json
          binder: defaultRabbit
          group: saveOrderGroup # consumer group
      rabbit: # RabbitMQ-specific binding properties
        bindings:
          saveOrderOutput:
            producer:
              delayed-exchange: true # declare the exchange as a delayed exchange
          saveOrderInput:
            consumer:
              delayed-exchange: true
3.5.2. Production side
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {

    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo) {
        // x-delay is the delay in milliseconds before the exchange routes the message
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("Message sent successfully: {}", userInfo);
    }
}
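The effect of the `x-delay` header can be illustrated without a broker using the JDK's `DelayQueue`. This is only an analogy under stated assumptions: the plugin delays messages inside RabbitMQ, while the sketch below delays them in memory. `DelayedMessage` and `DelayDemo` are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical message that becomes visible only after its delay has passed. */
final class DelayedMessage implements Delayed {
    final String payload;
    private final long readyAtNanos;

    DelayedMessage(String payload, long delayMillis) {
        this.payload = payload;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayDemo {
    /** Returns true if a message with the given delay can be consumed right after publishing. */
    static boolean availableImmediately(long delayMillis) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("order", delayMillis));
        return queue.poll() != null; // poll() returns null while the delay is still running
    }

    /** Publishes one message and returns how long the consumer had to wait, in milliseconds. */
    static long waitedMillis(long delayMillis) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new DelayedMessage("order", delayMillis));
        try {
            queue.take(); // blocks until the delay has elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

A message published with a delay of 5000 is invisible to the consumer for about five seconds, which matches what the consumer log shows when `x-delay` is set to 5000 in the producer.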
3.5.3. Message confirmation mechanism on the consumer side
rabbit: # under spring.cloud.stream
  bindings:
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL # manual confirmation
// Requires: com.rabbitmq.client.Channel, org.springframework.amqp.support.AmqpHeaders, java.io.IOException
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage) {
    log.info("Received message successfully: {}", userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long deliveryTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * basicAck(deliveryTag, multiple)
     * deliveryTag: identifier of this delivery, unique within the channel.
     * multiple: if true, acknowledges this delivery and all earlier unacknowledged deliveries on the channel;
     *           if false, acknowledges only this delivery.
     *
     * To reject a message instead, use basicNack(deliveryTag, multiple, requeue).
     * requeue: if true, the message is put back on the queue;
     *          if false, it is discarded or sent to the dead letter exchange.
     */
    try {
        channel.basicAck(deliveryTag, true);
    } catch (IOException e) {
        log.error("Failed to acknowledge delivery {}", deliveryTag, e);
    }
}
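The meaning of the `multiple` flag is easy to get wrong, so here is a minimal in-memory sketch of it. `UnackedTracker` is a hypothetical class that only models the bookkeeping of unacknowledged delivery tags on one channel; it is not part of the RabbitMQ client.

```java
import java.util.TreeSet;

/** Hypothetical tracker for the unacknowledged delivery tags of one channel. */
final class UnackedTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records that the broker delivered a message with this tag. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors the two parameters of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // acknowledge this tag and every earlier outstanding tag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // acknowledge this tag only
            unacked.remove(deliveryTag);
        }
    }

    /** Returns a copy of the tags that are still waiting for an acknowledgement. */
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }
}
```

With tags 1, 2 and 3 outstanding, `ack(2, false)` leaves 1 and 3 unacknowledged, while `ack(2, true)` leaves only 3.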
Set the exchange type to direct
rabbit: # under spring.cloud.stream
  bindings:
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey # routing key used to bind the queue to the exchange
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: "'orderRoutingKey'" # SpEL expression; the inner quotes make it a literal string
        exchangeType: direct
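What changes when the exchange type is direct can be sketched in memory. In this simplified model, a direct exchange delivers a message only to queues bound with exactly the same routing key, with no wildcard matching as in a topic exchange. `DirectExchange` is a hypothetical class, and queues are modeled as plain lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a direct exchange: exact routing-key match only. */
final class DirectExchange {
    // routing key -> queues bound with that key
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    /** Binds a queue (modeled as a list of payloads) to the exchange with a routing key. */
    void bind(String routingKey, List<String> queue) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Routes the payload to every queue bound with exactly this key; returns how many queues got it. */
    int publish(String routingKey, String payload) {
        List<List<String>> queues = bindings.getOrDefault(routingKey, List.of());
        for (List<String> queue : queues) {
            queue.add(payload);
        }
        return queues.size();
    }
}
```

A queue bound with `orderRoutingKey` receives messages published with `orderRoutingKey`, but not messages published with any other key, so the producer's routing key and the consumer's `bindingRoutingKey` must match.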
Summary
spring-cloud-stream provides binders for RabbitMQ and Kafka and integrates seamlessly with the rest of Spring Cloud, which makes it very convenient to use.