Spring Cloud Stream + RocketMQ transactional message configuration

Versions used in this article

spring-cloud-stream 3.2.6

rocketmq-client 4.9.4

spring-cloud-starter-stream-rocketmq 2021.0.5.0

1. Dependency import

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

With this version, the rocketmq-spring-boot-starter dependency no longer needs to be added.

The @EnableBinding annotation is deprecated in this version of Spring Cloud Stream, so this article uses the functional style instead. This version of the RocketMQ binder also does not support the txProducerGroup parameter.

2. Writing producers

1. Write configuration

Add the following configuration to application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          addBounsChannel-out-0:
            producer:
              producerType: Trans
              transactionListener: addBounsStreamTransactionListener
      bindings:
        ## In the new version, binding names use the fixed format <channel name>-{out/in}-{index}
        addBounsChannel-out-0:
          destination: add-boons
          group: bouns-producer-group

For details on the transactional message configuration, see the official documentation: https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc

Note: A channel under bindings can have only one in and one out. Do not configure multiple ins or multiple outs for it, otherwise the configuration becomes ambiguous.

Note: The transaction configuration differs between the old and new versions.

The old version is

spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true

The new version is

spring.cloud.stream.rocketmq.bindings.output2.producer.producerType=Trans

If you are not sure which form your version expects, check the properties of the following class: com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties
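As a rough illustration of the change, the following plain-Java sketch rewrites the old key into the new one for a single binding. Only the two property names come from the text above; the TransactionPropertyMigration class and its migrate method are hypothetical and exist only for this example.

```java
import java.util.Properties;

// Hypothetical migration helper: it rewrites the old "transactional" flag into
// the new "producerType" key for one binding. Only the two property names are
// taken from the article; everything else is illustrative.
final class TransactionPropertyMigration {

    private static final String PREFIX = "spring.cloud.stream.rocketmq.bindings.";

    private TransactionPropertyMigration() {
    }

    // Returns a migrated copy and leaves the source properties untouched.
    static Properties migrate(Properties source, String bindingName) {
        Properties result = new Properties();
        result.putAll(source);

        String oldKey = PREFIX + bindingName + ".producer.transactional";
        String newKey = PREFIX + bindingName + ".producer.producerType";

        // Drop the old flag; only "true" maps to a transactional producer.
        String oldValue = (String) result.remove(oldKey);
        if (Boolean.parseBoolean(oldValue)) {
            result.setProperty(newKey, "Trans");
        }
        return result;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty(PREFIX + "output2.producer.transactional", "true");
        migrate(old, "output2").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The new style also needs a transactionListener entry, as in the yml above, which a key rename like this cannot supply.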

2. Write code

Send code

@Autowired
private StreamBridge streamBridge;

// Inside the business method: send the transactional message.
String transactionId = UUID.randomUUID().toString();

streamBridge.send("addBounsChannel-out-0",
        MessageBuilder.withPayload(
                UserAddBonusMsgDTO.builder()
                        .userId(share.getUserId())
                        .bonus(50)
                        .build()
        )
                // The headers are read back in the transaction listener.
                .setHeader("TRANSACTION_ID", transactionId)
                .setHeader("share_id", id)
                .setHeader("dto", JSON.toJSONString(auditDTO))
                .build()
);

The first argument of send must match the binding name configured in yml.

Transaction listener code

package com.itmuch.contentcenter.rocketmq;

import com.alibaba.fastjson.JSON;
import com.itmuch.contentcenter.dao.content.RocketmqTransactionLogMapper;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.entity.content.RocketmqTransactionLog;
import com.itmuch.contentcenter.service.content.ShareService;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;

@Component
public class AddBounsStreamTransactionListener implements TransactionListener {

    @Autowired
    private ShareService shareService;

    @Resource
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    // Runs the local transaction for the message that was just sent.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // The headers set by the sender are read from the message properties.
        Map<String, String> headers = msg.getProperties();
        String transactionId = headers.get("TRANSACTION_ID");
        Integer shareId = Integer.valueOf(headers.get("share_id"));
        ShareAuditDTO auditDTO = JSON.parseObject(headers.get("dto"), ShareAuditDTO.class);
        try {
            shareService.auditByIdInDBWithRocketMqLog(shareId, auditDTO, transactionId);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // Check-back: called by the broker when it has not received the result of the local transaction.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Map<String, String> headers = msg.getProperties();
        String transactionId = headers.get("TRANSACTION_ID");
        RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );
        // A log record for this transaction id means the local transaction succeeded.
        if (rocketmqTransactionLog != null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
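
To make the interplay of the two callbacks easier to follow, here is a minimal plain-Java sketch of the idea. It uses no RocketMQ classes: TxState is a hypothetical stand-in for LocalTransactionState, and an in-memory map is assumed in place of the transaction log that the listener queries through RocketmqTransactionLogMapper.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java simulation of the listener logic; all names are hypothetical.
final class TransactionFlowSketch {

    // Stand-in for RocketMQ's LocalTransactionState.
    enum TxState { COMMIT, ROLLBACK }

    // transactionId -> note about the completed local transaction.
    // Stands in for the transaction log table.
    private final Map<String, String> transactionLog = new ConcurrentHashMap<>();

    // Step 1: run the business change and record the transaction id.
    // The flag simulates whether the business operation succeeds.
    TxState executeLocalTransaction(String transactionId, boolean businessSucceeds) {
        if (!businessSucceeds) {
            return TxState.ROLLBACK; // nothing is logged on failure
        }
        transactionLog.put(transactionId, "audit done");
        return TxState.COMMIT;
    }

    // Step 2: check-back. A log entry is taken as proof that the local
    // transaction was committed; no entry means it was not.
    TxState checkLocalTransaction(String transactionId) {
        return transactionLog.containsKey(transactionId) ? TxState.COMMIT : TxState.ROLLBACK;
    }

    public static void main(String[] args) {
        TransactionFlowSketch listener = new TransactionFlowSketch();
        System.out.println(listener.executeLocalTransaction("tx-1", true));
        System.out.println(listener.checkLocalTransaction("tx-1"));
        System.out.println(listener.checkLocalTransaction("tx-2"));
    }
}
```

The check-back can only be trusted if the log entry is written together with the business change, which is presumably why the service method in this article takes the transaction id as a parameter.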

AddBounsStreamTransactionListener follows the same pattern as the non-stream approach, where the same two methods are implemented on RocketMQLocalTransactionListener.

The differences are:

To read the headers, the non-stream approach calls getHeaders(), while the stream approach calls getProperties().

The constant RocketMQHeaders.TRANSACTION_ID is no longer available in the stream approach. Use the string "TRANSACTION_ID" instead.
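
As a rough illustration of the first difference: message properties are a Map<String, String>, so every header value comes back as text, and numeric values such as share_id have to be parsed. The sketch below uses a plain map in place of msg.getProperties(). The HeaderParsing class and its methods are hypothetical helpers, not part of RocketMQ or Spring Cloud Stream.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone illustration: a plain Map<String, String> stands in for the
// value returned by Message#getProperties(). HeaderParsing is hypothetical.
final class HeaderParsing {

    // Replaces the RocketMQHeaders.TRANSACTION_ID constant mentioned above.
    static final String TRANSACTION_ID = "TRANSACTION_ID";

    private HeaderParsing() {
    }

    // Reads a required property and fails with a clear message if it is missing.
    static String required(Map<String, String> properties, String key) {
        String value = properties.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing message property: " + key);
        }
        return value;
    }

    // Numeric headers arrive as text, so they are parsed explicitly.
    static int requiredInt(Map<String, String> properties, String key) {
        return Integer.parseInt(required(properties, key));
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(TRANSACTION_ID, "tx-1");
        properties.put("share_id", "42");

        System.out.println(required(properties, TRANSACTION_ID));
        System.out.println(requiredInt(properties, "share_id"));
    }
}
```

Failing fast on a missing property gives a clearer error than a NullPointerException or NumberFormatException raised deep inside the listener.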

3. Writing consumers

1. Write configuration

application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        ## In the new version, binding names use the fixed format <channel name>-{out/in}-{index}
        addBounsChannel-in-0:
          destination: add-boons
          group: bouns-consumer-group

Note: A channel under bindings can have only one in and one out. Do not configure multiple ins or multiple outs for it, otherwise the configuration becomes ambiguous.

2. Write code

Consumer code

package com.itmuch.usercenter.rocketmq;

import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.service.user.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;

@Slf4j
@Configuration
public class AddBounsStreamConsumer {

    @Autowired
    private UserService userService;

    @Bean
    public Consumer<UserAddBonusMsgDTO> addBounsChannel() {
        return message -> {
            log.info("addBounsChannel received message: {}", message);
            userService.addBonus(message);
        };
    }
}

Note: The name of the @Bean method must match the first part of the binding name in yml, that is, the part before -in-0.
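
The naming rule in this note can be sketched as a tiny helper. BindingNames is a hypothetical class written only for illustration. It is not part of Spring Cloud Stream, which derives these names internally.

```java
// Hypothetical helper that mirrors the <name>-{in/out}-<index> convention
// used for the binding names in this article.
final class BindingNames {

    private BindingNames() {
    }

    // Binding name for the input side, e.g. a Consumer bean method name.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Binding name for the output side, e.g. the name passed to StreamBridge.send.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("addBounsChannel", 0));
        System.out.println(output("addBounsChannel", 0));
    }
}
```

With the bean method named addBounsChannel, the input binding resolves to addBounsChannel-in-0, which is the key used in the consumer's yml.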

The userService.addBonus method referenced above:

package com.itmuch.usercenter.service.user;

import com.itmuch.usercenter.dao.user.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.user.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class UserService {
    @Resource
    private UserMapper userMapper;

    @Resource
    private BonusEventLogMapper bonusEventLogMapper;


    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO message) {
        log.info("Consuming message: {}", message);

        // On receiving the message, run the business logic
        // 1. Add points to the user
        Integer userId = message.getUserId();
        User user = userMapper.selectByPrimaryKey(userId);
        Integer bonus = message.getBonus();
        user.setBouns(user.getBouns() + bonus);
        userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the event in the bonus event log table
        bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .userId(userId)
                        .value(bonus)
                        .event("CONTRIBUTE")
                        .createTime(new Date())
                        .description("Submit and get points..")
                        .build()
        );
        log.info("Points added..");
    }
}
