Versions used in this article:
spring-cloud-stream 3.2.6
rocketmq-client 4.9.4
spring-cloud-starter-stream-rocketmq 2021.0.5.0
1. Dependency import
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
This version no longer requires the rocketmq-spring-boot-starter dependency.
This version of stream does not support the @EnableBinding annotation, and this version of rocketmq does not support the txProducerGroup parameter.
2. Writing producers
1. Write configuration
Add the following configuration to application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          addBounsChannel-out-0:
            producer:
              producerType: Trans
              transactionListener: addBounsStreamTransactionListener
      bindings:
        ## New version fixed format: channel name-{out/in}-{index}
        addBounsChannel-out-0:
          destination: add-boons
          group: bouns-producer-group
For the configuration of this transaction, please refer to the official documentation: https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc
Note: The channel under bindings can only have one in and one out. Multiple ins and multiple outs cannot be configured, otherwise it will cause configuration confusion.
Note: There are changes in transaction configuration between old and new versions.
The old version is
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
The new version is
spring.cloud.stream.rocketmq.bindings.output2.producer.producerType=Trans
If you are not sure which form your version uses, check the properties of the following class directly: com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties
2. Write code
Send code
// Field in the sending class
@Autowired
private StreamBridge streamBridge;

// Inside the business method that sends the message
String transactionId = UUID.randomUUID().toString();
streamBridge.send(
        "addBounsChannel-out-0",
        MessageBuilder.withPayload(
                        UserAddBonusMsgDTO.builder()
                                .userId(share.getUserId())
                                .bonus(50)
                                .build()
                )
                .setHeader("TRANSACTION_ID", transactionId)
                .setHeader("share_id", id)
                .setHeader("dto", JSON.toJSONString(auditDTO))
                .build()
);
The first argument to send must match the binding name configured in the yml.
Transaction code
package com.itmuch.contentcenter.rocketmq;

import com.alibaba.fastjson.JSON;
import com.itmuch.contentcenter.dao.content.RocketmqTransactionLogMapper;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.entity.content.RocketmqTransactionLog;
import com.itmuch.contentcenter.service.content.ShareService;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;

@Component
public class AddBounsStreamTransactionListener implements TransactionListener {

    @Autowired
    private ShareService shareService;

    @Resource
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    // Runs the local transaction after the half message has been sent
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // In stream mode the headers set by the sender arrive as message properties
        Map<String, String> headers = msg.getProperties();
        String transactionId = headers.get("TRANSACTION_ID");
        Integer shareId = Integer.valueOf(headers.get("share_id"));
        ShareAuditDTO auditDTO = JSON.parseObject(headers.get("dto"), ShareAuditDTO.class);
        try {
            shareService.auditByIdInDBWithRocketMqLog(shareId, auditDTO, transactionId);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // Called back by the broker when the transaction state is still unknown
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Map<String, String> headers = msg.getProperties();
        String transactionId = headers.get("TRANSACTION_ID");
        RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );
        // A log record exists only if the local transaction was committed
        if (rocketmqTransactionLog != null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
The transaction code here is similar to the non-stream approach, which implements the same two methods through RocketMQLocalTransactionListener.
The differences are:
To obtain the headers, the non-stream approach calls getHeaders(), whereas the stream approach calls getProperties().
The constant RocketMQHeaders.TRANSACTION_ID is no longer available in stream mode; use the string "TRANSACTION_ID" instead.
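The commit-or-check logic of the listener can be sketched without RocketMQ or Spring. This is only an illustration under stated assumptions: the class TransactionLogSketch, its State enum and its method names are hypothetical, an in-memory map stands in for the table queried through RocketmqTransactionLogMapper, and it assumes that auditByIdInDBWithRocketMqLog writes the log record in the same database transaction as the business update, as its name suggests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the listener logic; not RocketMQ or Spring API.
final class TransactionLogSketch {

    // Stand-in for LocalTransactionState.COMMIT_MESSAGE / ROLLBACK_MESSAGE
    enum State { COMMIT, ROLLBACK }

    // Stand-in for the transaction log table: transactionId -> shareId
    private final Map<String, Integer> transactionLog = new ConcurrentHashMap<>();

    // Mirrors executeLocalTransaction: run the business step, record the log, report the outcome
    State executeLocal(String transactionId, int shareId, boolean businessFails) {
        try {
            if (businessFails) {
                throw new IllegalStateException("local transaction failed");
            }
            // The log record is written only when the business step succeeds
            transactionLog.put(transactionId, shareId);
            return State.COMMIT;
        } catch (RuntimeException e) {
            return State.ROLLBACK;
        }
    }

    // Mirrors checkLocalTransaction: the log record is the only evidence of a commit
    State check(String transactionId) {
        return transactionLog.containsKey(transactionId) ? State.COMMIT : State.ROLLBACK;
    }
}
```

In this sketch a later check for a given transaction id returns COMMIT only if the earlier local step succeeded, which is why the real listener can answer the broker's check by looking up the log record alone.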
3. Writing consumers
1. Write configuration
application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        ## New version fixed format: channel name-{out/in}-{index}
        addBounsChannel-in-0:
          destination: add-boons
          group: bouns-consumer-group
Note: The channel under bindings can only have one in and one out. Multiple ins and multiple outs cannot be configured, otherwise it will cause configuration confusion.
2. Write code
Consumer code
package com.itmuch.usercenter.rocketmq;

import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.service.user.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;

@Slf4j
@Configuration
public class AddBounsStreamConsumer {

    @Autowired
    private UserService userService;

    @Bean
    public Consumer<UserAddBonusMsgDTO> addBounsChannel() {
        return message -> {
            log.info("addBounsChannel received message: {}", message);
            userService.addBonus(message);
        };
    }
}
Note: The name of the @Bean method must match the first part of the channel name in the yml (addBounsChannel in addBounsChannel-in-0).
The userService.addBonus method referenced above:
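To make the naming rule concrete, here is a minimal sketch, independent of Spring, of how a channel name in the format name-{in/out}-{index} splits into its parts. The class BindingName is hypothetical and exists only for illustration; it is not part of Spring Cloud Stream, which performs its own resolution internally.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Spring Cloud Stream.
final class BindingName {

    // <functionName>-<in|out>-<index>, for example addBounsChannel-in-0
    private static final Pattern FORMAT = Pattern.compile("^(.+)-(in|out)-(\\d+)$");

    final String functionName;
    final String direction;
    final int index;

    private BindingName(String functionName, String direction, int index) {
        this.functionName = functionName;
        this.direction = direction;
        this.index = index;
    }

    // Splits a channel name into function name, direction and index
    static BindingName parse(String bindingName) {
        Matcher m = FORMAT.matcher(bindingName);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Not in <name>-<in|out>-<index> format: " + bindingName);
        }
        return new BindingName(m.group(1), m.group(2), Integer.parseInt(m.group(3)));
    }

    public static void main(String[] args) {
        BindingName in = BindingName.parse("addBounsChannel-in-0");
        // The function name is what the @Bean method has to be called
        System.out.println(in.functionName + " " + in.direction + " " + in.index);
    }
}
```

For addBounsChannel-in-0 the function name part is addBounsChannel, which is exactly the name of the @Bean method in AddBounsStreamConsumer.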
package com.itmuch.usercenter.service.user;

import com.itmuch.usercenter.dao.user.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.user.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class UserService {

    @Resource
    private UserMapper userMapper;

    @Resource
    private BonusEventLogMapper bonusEventLogMapper;

    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO message) {
        log.info("Consuming message: {}", message);
        // Run the business logic when the message is received
        // 1. Add points to the user
        Integer userId = message.getUserId();
        User user = userMapper.selectByPrimaryKey(userId);
        Integer bonus = message.getBonus();
        user.setBouns(user.getBouns() + bonus);
        userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the log into the bounus_event_log table
        bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .userId(userId)
                        .value(bonus)
                        .event("CONTRIBUTE")
                        .createTime(new Date())
                        .description("Submit and get points..")
                        .build()
        );
        log.info("Points added..");
    }
}