SpringCloudStream simply integrates RocketMQ and transaction messages

Spring Cloud Stream integrated messaging system

Introduction

? Spring Cloud Stream is a framework that unifies the message middleware programming model, shields the differences in the underlying message middleware, reduces learning costs and switching costs, and its core is to further encapsulate the message middleware. The official definition of Spring Cloud Stream is a framework for building message-based microservice applications.

? The Binder object concept of Spring Cloud Stream is very important, and the implementation of Binder is different for different message middleware products. For example, the implementation of Kafka is KafkaMessageChannelBinder, the implementation of RabbitMQ is RabbitMessageChannelBinder, and the implementation of RocketMQ is RocketMQMessageChannelBinder. (From the official website https://spring.io/projects/spring-cloud-stream), currently supported message middleware products:

There is another important concept, Binding, which is divided into Input Binding and Output Binding. Binding is used to bind message producers and message consumers to build a communication bridge. The underlying layer uses Binder objects to interact with message middleware.

So why use Spring Cloud Stream?

  • Decoupling business code and message middleware: You don’t need to pay attention to which message middleware it is (it depends on whether Spring Cloud Stream supports it), you only need to comply with the programming specifications of Spring Cloud Stream. If you change the message middleware, you only need to modify the relevant configuration in the configuration file, and Binder will automatically switch for you.
  • Low learning cost: integrate a Spring Cloud Stream, and other middleware can be quickly integrated and used even if you have not learned it before

Commonly used annotations

  • @Input: Marked as input channel, consume messages
  • @Output: Marked as output channel, production message
  • @StreamListener: Listen to a queue and receive messages
  • @EnableBinding: binding channel

Integrate RocketMQ

Since you want to use Spring Cloud Stream, you need to choose a message middleware, here RocketMQ is used.

RocketMQ was formerly known as Metaq. When Metaq released version 3.0, it was renamed RocketMQ. It has experienced the test of Taobao Double Eleven traffic and is trustworthy. RocketMQ is a distributed messaging middleware with the following advantages:

  • High performance, high reliability, high real-time
  • Ensure strict message ordering
  • Support message pull mode
  • support affairs
  • Billion-level message accumulation capability
  • Efficient subscriber horizontal scalability
  • Can be deployed in clusters

Download and install

Or download according to the corresponding version. The version of spring cloud alibaba used by the author is 2021.0.4.0, then select 4.9.4 for the RocketMQ version. That is Can

The download address is posted here https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

The download speed is too slow. Here are the network disk resources:

Link: https://pan.baidu.com/s/1cwpVD2to-vkyNMpJ25kEbg
Extraction code: 9609

Install

After the download is complete, unzip it. If it is a Windows system, set the environment variables.

When starting rocketmq under windows, just enter the bin directory and click mqnamesrv.cmd and mqbroker.cmd to start the Name Server and Broker services respectively.

If it is a Linux system, as long as there is a JDK environment, no additional configuration is required. It can be run directly in the RocketMQ directory.

nohup sh bin/mqnamesrv & amp;
nohup sh bin/mqbroker -n localhost:9876 & amp;

Starting RocketMQ requires a larger memory, such as a Linux virtual machine with 512MB of memory, which generally cannot be started. You can modify the JAVA_OPT parameters of the bin/runbroker.sh file; if the windows system cannot be started, go to bin/runbroker.cmd to modify the JAVA_OPT parameters.

Spring Cloud Stream + RocketMQ producer code

  • Add RocketMQ dependency and health monitoring dependency in the pom.xml file
<!-- Service registration needs to be introduced for service discovery -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

<!--Health Monitoring-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!--rocketmq-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

<!--binder dependencies-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
    <version>0.9.0.RELEASE</version>
</dependency>
  • Declare the Source binding channel, which is actually the message output.
public interface CustomSource {<!-- -->
    @Output("output1")
    MessageChannel output1();
}

In the startup class or other related configuration classes to bind the channel, add the @EnableBinding annotation

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(CustomSource.class)
public class StreamProduceApplication {<!-- -->

    public static void main(String[] args) {<!-- -->
        SpringApplication.run(StreamProduceApplication.class, args);
    }
}
  • Add the configuration of the RocketMQ service and output1 output channel and expose the Spring Cloud Stream monitoring endpoint in the yml configuration file
server:
  port: 8081
spring:
  application:
    name: produce # application name

  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # nacos service address
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876 # rocketmq service address
group: group
      bindings:
        output1:
          destination: test-topic #Message topic
          content-type: application/json # Data type

management:
  endpoints:
    web:
      exposure:
        include: '*' # Expose all endpoints, SpringCloudStream's endpoints: /actuator/bindings, /actuator/channels, /actuator/health

  endpoint:
    health:
      show-details: always # Show service information details
  • Create SendMessageService class to produce messages
@Service
public class SendMessageService {<!-- -->
    
    @Resource
    private CustomSource customSource;
    public String sendMessage() {<!-- -->
        String payload = "Send a simple string test" + RandomUtils.nextInt(0, 500);
        customSource.output1().send(MessageBuilder.withPayload(payload).build()); // Send message
        return payload;
    }
}
  • Write an interface to send messages
@RestController
public class TestController {<!-- -->
    
    @Resource
    SendMessageService messageService;
    
    @RequestMapping("/sendMessage")
    public String sendMessage() {<!-- -->
        return messageService.sendMessage();
    }
    
}

Spring Cloud Stream + RocketMQ consumer

  • In the same way, first import RocketMQ’s dependencies, which I won’t go into details here.
  • Custom Sink is the input of the message
public interface CustomSink {<!-- -->

    @Input("input1")
    SubscribableChannel input1();
}

And bind it in the startup class

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(CustomSink.class)
public class StreamConsumerApplication {<!-- -->

    public static void main(String[] args) {<!-- -->
        SpringApplication.run(StreamConsumerApplication.class, args);
    }
}
  • In the yml configuration file
server:
  port: 8082
spring:
  application:
    name: consumer # application name

  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # nacos service address
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876 # rocketmq service address
group: group
      bindings:
        input1:
          destination: test-topic #Message topic
          content-type: application/json # Data type
          group: test-group # Consumer group to prevent repeated consumption by multiple instances. Only one instance of the same group can receive it.

management:
  endpoints:
    web:
      exposure:
        include: '*' # Expose all endpoints, SpringCloudStream's endpoints: /actuator/bindings, /actuator/channels, /actuator/health

  endpoint:
    health:
      show-details: always # Show service information details
  • Define message listeners and process message data. Two types are defined here, and you can choose different receiving methods.
@Component
public class ConsumerListener {<!-- -->
    //Receive and process messages, receive strings
    @StreamListener("input1")
    public void input1Consumer(String message) {<!-- -->
        System.out.println("input1Consumer received " + message);
    }
    // Receive and process messages, receive the most original Message
    @StreamListener("input1")
    public void input1ConsumerMessage(Message<String> message) {<!-- -->
        String payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();

        System.out.println("input1Consumer Message - message content " + payload + " message header " + headers );
    }
}

Both input1Consumer() and input1ConsumerMessage() can consume information. The difference is that the data they receive is different.

input1ConsumerMessage() can get more information, such as header information, etc.

Verify message production and consumption

Start the producer and consumer, and test the interface with cmd

curl localhost:8081/sendMessage
Send simple string test 13

I saw the consumer log and got the news.

Send object message

The producer creates an object class

public class User {<!-- -->
    
    private Integer id;
    private String name;

    public Integer getId() {<!-- -->
        return id;
    }

    public void setId(Integer id) {<!-- -->
        this.id = id;
    }

    public String getName() {<!-- -->
        return name;
    }

    public void setName(String name) {<!-- -->
        this.name = name;
    }
}

SendMessageService class adds sendObjectMessage method

public String sendObjectMessage() {<!-- -->
    User user = new User();
    user.setId(RandomUtils.nextInt(0, 500));
    user.setName("ZhangSan");
    customSource.output1().send(MessageBuilder.withPayload(user).build()); // Send message
    return "User id" + user.getId() + "User name:" + user.getName();
}
  • Added interface /sendObjectMessage
@RequestMapping("/sendObjectMessage")
public String sendObjectMessage() {<!-- -->
    return messageService.sendObjectMessage();
}
  • Under test
curl localhost:8081/sendObjectMessage
User id221 Username: ZhangSan

log

Repeated consumption problem

If repeated consumption problems occur, it is generally caused by the following two situations:

  • There are two instances of consumers, but their spring.cloud.stream.bindings..group groups are different, then no one of them will consume once, resulting in multiple consumptions, Solution: Place them in the same group
  • After the consumer receives the message, an exception is thrown during business processing. There is a retry mechanism, which will cause multiple repeated consumptions. Exception capture records can be saved for subsequent business processing or subsequent manual compensation of data to achieve final data consistency

Message filtering

There are two options for message filtering

  • @StreamListener annotates the condition attribute
  • Set tags filtering

@StreamListener annotates the condition attribute

In the message producer SendMessageService class, add the sendConsitionMessage method to send messages with custom header Custom-header

public String sendConditionMessage() {<!-- -->
    String payload = "Send request header string test" + RandomUtils.nextInt(0, 500);
    customSource.output1().send(MessageBuilder
                                .withPayload(payload)
                                .setHeader("custom-header", "customHeader") // Set header information
                                .build()); // Send message
    return payload;
}
  • Add interface
@RequestMapping("/sendConditionMessage")
public String sendConditionMessage() {<!-- -->
    return messageService.sendConditionMessage();
}
  • When the consumer listens to the message, the filter condition (I only leave one listening method here), then this method only listens, the header information exists custom-header and the value is the information of customHeader
@StreamListener(value = "input1", condition = "headers['custom-header']=='customHeader'")
public void input1Consumer(String message) {<!-- -->
    System.out.println("input1Consumer received " + message);
}
  • test
curl localhost:8081/sendConditionMessage
Send request header string test 344

Message filtering – tags

Producer SendMessageService adds sendTagsMessage method

public String sendTagsMessage() {<!-- -->
    String payload = "Send test with tags" + RandomUtils.nextInt(0, 500);
    customSource.output1().send(MessageBuilder
                                .withPayload(payload)
                                .setHeader(RocketMQConst.Headers.TAGS, "test") //Set header information
                                .build()); // Send message
    return payload;
}
  • Add interface /sendTagsMessage
@RequestMapping("/sendTagsMessage")
public String sendTagsMessage() {<!-- -->
    return messageService.sendTagsMessage();
}
  • Filtering in consumer yml configuration class
server:
  port: 8082
spring:
  application:
    name: consumer # application name

  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # nacos service address
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876 # rocketmq service address
          group: group
        bindings:
          input1:
            consumer:
              tags: test # Specify input1 to consume messages with tags as test. If multiple messages are separated by ||, such as test1 || test2, it means consuming test1 or test2.
      bindings:
        input1:
          destination: test-topic #Message topic
          content-type: application/json # Data type
          group: test-group # Consumer group to prevent repeated consumption by multiple instances. Only one instance of the same group can receive it.
          
management:
  endpoints:
    web:
      exposure:
        include: '*' # Expose all endpoints, SpringCloudStream's endpoints: /actuator/bindings, /actuator/channels, /actuator/health

  endpoint:
    health:
      show-details: always # Show service information details
  • test
curl localhost:8081/sendTagsMessage
Send test with tags 323

Exception handling

Message-unified exception handling is divided into local exception handling and global exception handling for a certain topic.

local

On the consumer side, create an exception handling class

@Component
public class HandleConsumerError {<!-- -->

    // Handle exceptions in test-group under test-topic
    @ServiceActivator(inputChannel = "test-topic.test-group.errors")
    public void handleError(ErrorMessage message) {<!-- -->
        Throwable payload = message.getPayload();
        System.out.println("Exception intercepted: " + payload.getMessage());

        System.out.println("Original message:" + new String((byte[])
                ((MessagingException) payload).getFailedMessage().getPayload()));
    }

}
  • Consumer error reporting
@StreamListener("input1")
public void input1Consumer(String message) {<!-- -->
    System.out.println("input1Consumer received " + message);
    int i = 1/0;
}
  • Under test

It can be seen from the log that it consumes three times and then catches the exception, so there is a two-retry mechanism. After retrying, if an error is still reported, this exception will be caught

How to configure it globally?

@StreamListener("errorChannel")
public void errorChannel(ErrorMessage message) {<!-- -->
    Throwable payload = message.getPayload();
    System.out.println("Exception intercepted: " + payload.getMessage());

    System.out.println("Original message:" + new String((byte[])
                                            ((MessagingException) payload).getFailedMessage().getPayload()));
}

Globally, you need to listen to the channel of errorChannel.

Then you need to pay attention: when both local and global configurations are configured, the local one is taken first. After the local capture, the global one will not be taken again

Transaction message

Producer

In the yml configuration file in the producer, add the output channel of transaction messages, output2, transaction subscription topic and transaction grouping, etc.

server:
  port: 8081
spring:
  application:
    name: produce # application name

  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # nacos service address
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876 # rocketmq service address
          group: group
      bindings:
        output1:
          destination: test-topic #Message topic
          content-type: application/json # Data type
        output2:
          destination: transaction-topic #Message topic
          content-type: application/json # Data type

management:
  endpoints:
    web:
      exposure:
        include: '*' # Expose all endpoints, SpringCloudStream's endpoints: /actuator/bindings, /actuator/channels, /actuator/health

  endpoint:
    health:
      show-details: always # Show service information details
  • CustomSource adds channel output
@Output("output2")
MessageChannel output2();
  • Producer adds transaction method for sending messages
@Autowired
private RocketMQTemplate rocketMQTemplate;
public String sendTransactionalMessage() {<!-- -->
    String uuid = UUID.randomUUID().toString();
    String payload = "Send transaction test message" + uuid;
    Message<String> build = MessageBuilder
        .withPayload(payload)
        .setHeader(RocketMQHeaders.TRANSACTION_ID, uuid) //Set header information
        .build();
    rocketMQTemplate.sendMessageInTransaction("myTxProducerGroup", "transaction-topic", build, uuid);
    return payload;
}
  • Add interface /sendTransactionalMessage
@RequestMapping("/sendTransactionalMessage")
public String sendTransactionalMessage() {<!-- -->
    return messageService.sendTransactionalMessage();
}
  • Important steps: The producer adds the TransactionalListener class to perform local transactions and check local transactions
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10) // Transaction grouping configured in the configuration file
public class TransactionalListener implements RocketMQLocalTransactionListener {<!-- -->
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- --> // Execute local transaction
        try {<!-- -->
            String transactionalId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

            System.out.println("executeLocalTransaction transactionalId:" + transactionalId + " date = " + new Date());

            //Do business processing logic here
            // Return RocketMQLocalTransactionState.COMMIT on success
            // Return RocketMQLocalTransactionState.ROLLBACK on failure
            
            // submit
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {<!-- -->
            // Rollback on error
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {<!-- --> // Check local transactions
        // After a certain period of time, if there are still messages sent for confirmation, RocketMQ will actively call the sender and let the caller decide whether the message should be sent. This method determines whether the message should be submitted or rolled back.
        String transactionalId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

        System.out.println("checkLocalTransaction transactionalId:" + transactionalId + " date = " + new Date());
        
        // todo Here you can check whether the database is successfully entered into the database, and return commit if successful, otherwise rollback
        
        // submit
        return RocketMQLocalTransactionState.COMMIT;
    }
}

executeLocalTransaction() method: After sending the preliminary message, first execute the local transaction in this method. If successful, commit the transaction, otherwise roll back the transaction. Specific steps:

  • Call the service layer to execute the operation logic of the database
  • Determine whether the data is successfully stored in the database, and return submission (RocketMQLocalTransactionState.COMMIT) if successful, and rollback (RocketMQLocalTransactionState.ROLLBACK) if failed.

checkLocalTransaction() method: If there are still unconfirmed messages after a certain period of time, RocketMQ will actively call the sender, and finally call the checkLocalTransaction() method to let the caller decide whether to submit or rollback the message. Specific steps:

  • Query the database through transactionId or other related attributes to see whether executeLocalTransaction is successful.
  • Submit successfully, rollback if failed

Consumer

In the consumer yml configuration

server:
  port: 8082
spring:
  application:
    name: consumer # application name

  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # nacos service address
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876 # rocketmq service address
          group: group
        bindings:
          input1:
            consumer:
              tags: test # Specify input1 to consume messages with tags as test. If multiple messages are separated by ||, such as test1 || test2, it means consuming test1 or test2.
      bindings:
        input1:
          destination: test-topic #Message topic
          content-type: application/json # Data type
          group: test-group # Consumer group to prevent repeated consumption by multiple instances. Only one instance of the same group can receive it.
        input2:
          destination: transaction-topic #Message topic
          content-type: text/plain # Data type
          group: transaction-group # Consumer group to prevent repeated consumption by multiple instances. Only one instance of the same group can receive it.


management:
  endpoints:
    web:
      exposure:
        include: '*' # Expose all endpoints, SpringCloudStream's endpoints: /actuator/bindings, /actuator/channels, /actuator/health

  endpoint:
    health:
      show-details: always # Show service information details
  • Consumer CustomSink adds channel input2
@Input("input2")
SubscribableChannel input2();
  • The ConsumerListener class adds a method for consuming transaction messages receiveTransactionalMsg
@StreamListener("input2")
public void receiveTransactionalMsg(String message) {<!-- -->
    try {<!-- -->
        System.out.println("receiveTransactionalMsg received " + message);
    } catch (Exception e) {<!-- -->
        // When handling error reports, message data can be recorded and manual intervention can be used to achieve data consistency.
        // Or when you want to roll back the upstream transaction, feed this message data back to the upstream, and the upstream deletes this data.
    }

}
  • test
curl localhost:8081/sendTransactionalMessage
Send transaction test message 67d2785d-9f07-4489-84ad-516bd1764633

producer log

When executing a local transaction, if an abnormal rollback occurs, the consumer will not receive this message, and the upstream (producer) can roll back its own data.