Reprint: https://mp.weixin.qq.com/s?__biz=Mzg3Mjk3OTA4OA== & amp;mid=2247484545 & amp;idx=1 & amp;sn=9fe01ef81d68f1f3b8912e44b0a588b8 & amp;chksm=cee64b7ef991c2689d 11a3f33ab07e4f50cd388ab487b05659995e2dcbcac4742292a79be6ee & amp ;cur_album_id=2973104649560178694 & amp;scene=189#wechat_redirect
/** * @author qingzhou * * Through the @LocalTCC annotation, RM will register a branch transaction with TC when it is initialized. */ @LocalTCC public interface OrderService {<!-- --> @TwoPhaseBusinessAction(name = "prepareSaveOrder", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true) Order prepareSaveOrder(OrderVo orderVo, @BusinessActionContextParameter(paramName = "orderId") Long orderId); boolean commit(BusinessActionContext actionContext); boolean rollback(BusinessActionContext actionContext); }
Warehouse address: https://github.com/seata/seata/blob/v1.5.1/script/server/db/mysql.sql
CREATE TABLE IF NOT EXISTS `distributed_lock` ( `lock_key` CHAR(20) NOT NULL, `lock_value` VARCHAR(20) NOT NULL, `expire` BIGINT, primary key (`lock_key`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); CREATE TABLE `tcc_fence_log` ( `xid` varchar(128) NOT NULL COMMENT 'global id', `branch_id` bigint(20) NOT NULL COMMENT 'branch id', `action_name` varchar(64) NOT NULL COMMENT 'action name', `status` tinyint(4) NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)', `gmt_create` datetime(3) NOT NULL COMMENT 'create time', `gmt_modified` datetime(3) NOT NULL COMMENT 'update time', PRIMARY KEY (`xid`,`branch_id`), KEY `idx_gmt_modified` (`gmt_modified`), KEY `idx_status` (`status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${<!-- -->user.home}/logs/seata extend: logstash-appender: destination: 127.0.0.1:4560 kafka-appender: bootstrap-servers: 127.0.0.1:9092 topic: logback_to_logstash console: user: username: seata password: seata seata: config: # support: nacos, consul, apollo, zk, etcd3 type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: 7e838c12-8554-4231-82d5-6d93573ddf32 group: SEATA_GROUP username: password: ##if use MSE Nacos with auth, mutex with username/password attribute #access-key: "" #secret-key: "" data-id: seataServer.properties registry: # support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos preferred-networks: 30.240.* nacos: application: seata-server server-addr: 127.0.0.1:8848 group: SEATA_GROUP namespace: cluster:default username: password: ##if use MSE Nacos with auth, mutex with username/password attribute #access-key: "" #secret-key: "" store: # support: file, db, redis mode: db # server: # service-port: 8091 #If not configured, the default is '${server.port} + 1000' security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*. png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
https://github.com/seata/seata/blob/v1.5.1/script/config-center/config.txt
Main modifications to the config.txt file:
1. Storage mode:
store.mode=db
2. Database information:
store.db.url=jdbc:mysql://127.0.0.1:3306/seata
store.db.user=root
store.db.password=root
3. The transaction group must correspond to the tx-service-group: default_tx_group of the yml file in your project:
service.vgroupMapping.default_tx_group=default
The content is as follows:
#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html #Transport configuration, for client and server transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableTmClientBatchSendRequest=false transport.enableRmClientBatchSendRequest=true transport.enableTcServerBatchSendResponse=false transport.rpcRmRequestTimeout=30000 transport.rpcTmRequestTimeout=30000 transport.rpcTcRequestTimeout=30000 transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 transport.serialization=seata transport.compressor=none #Transaction routing rules configuration, only for the client service.vgroupMapping.default_tx_group=default #If you use a registry, you can ignore it service.default.grouplist=127.0.0.1:8091 service.enableDegrade=false service.disableGlobalTransaction=false #Transaction rule configuration, only for the client client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=true client.rm.tableMetaCheckerInterval=60000 client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.rm.sagaJsonParser=fastjson client.rm.tccActionInterceptorOrder=-2147482648 client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.defaultGlobalTransactionTimeout=60000 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 client.tm.interceptorOrder=-2147482648 client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.undo.compress.enable=true client.undo.compress.type=zip client.undo.compress.threshold=64k #For TCC transaction mode tcc.fence.logTableName=tcc_fence_log tcc.fence.cleanPeriod=1h #Log rule configuration, for client and server log.exceptionRate=100 #Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional. store.mode=db store.lock.mode=file store.session.mode=file #Used for password encryption store.publicKey= #If `store.mode,store.lock.mode,store.session.mode` are not equal to `file`, you can remove the configuration block. store.file.dir=file_store/data store.file.maxBranchSessionSize=16384 store.file.maxGlobalSessionSize=512 store.file.fileWriteBufferCacheSize=16384 store.file.flushDiskMode=async store.file.sessionReloadReadSize=100 #These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block. store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true &rewriteBatchedStatements=true store.db.user=root store.db.password=root store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.distributedLockTable=distributed_lock store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 #These configurations are required if the `store mode` is `redis`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `redis`, you can remove the configuration block. store.redis.mode=single store.redis.single.host=127.0.0.1 store.redis.single.port=6379 store.redis.sentinel.masterName= store.redis.sentinel.sentinelHosts= store.redis.maxConn=10 store.redis.minConn=1 store.redis.maxTotal=100 store.redis.database=0 store.redis.password= store.redis.queryLimit=100 #Transaction rule configuration, only for the server server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false server.distributedLockExpireTime=10000 server.xaerNotaRetryTimeout=60000 server.session.branchAsyncQueueSize=5000 server.session.enableBranchAsyncRemove=false #Metrics configuration, only for the server metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
server: port: 8028 spring: application: name: tcc-order-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 datasource: type: com.alibaba.druid.pool.DruidDataSource druid: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/db_tcc_order?useUnicode=true &characterEncoding=UTF-8 &serverTimezone=Asia/Shanghai username: root password: root initial-size: 10 max-active: 100 min-idle: 10 max-wait: 60000 pool-prepared-statements: true max-pool-prepared-statement-per-connection-size: 20 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 test-while-idle: true test-on-borrow: false test-on-return: false stat-view-servlet: enabled: true url-pattern: /druid/* filter: stat: log-slow-sql: true slow-sql-millis: 1000 merge-sql: false wall: config: multi-statement-allow: true logging: level: com.tuling:debug seata: application-id: ${spring.application.name} # seata service group, should correspond to the suffix of service.vgroup_mapping configured on the server tx-service-group: default_tx_group registry: #Specify nacos as the registration center type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 namespace: group: SEATA_GROUP config: #Specify nacos as the configuration center type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: 7e838c12-8554-4231-82d5-6d93573ddf32 group: SEATA_GROUP data-id: seataServer.properties #Expose actuator endpoint management: endpoints: web: exposure: include: '*'
package com.qingzhou.tccorderservice.service.impl; import com.qingzhou.datasource.entity.Order; import com.qingzhou.tccorderservice.feign.AccountFeignService; import com.qingzhou.tccorderservice.feign.StorageFeignService; import com.qingzhou.tccorderservice.service.BussinessService; import com.qingzhou.tccorderservice.service.OrderService; import com.qingzhou.tccorderservice.util.UUIDGenerator; import com.qingzhou.tccorderservice.vo.OrderVo; import io.seata.core.context.RootContext; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author qingzhou */ @Service @Slf4j public class BusinessServiceImpl implements BussinessService {<!-- --> @Autowired private AccountFeignService accountFeignService; @Autowired private StorageFeignService storageFeignService; @Autowired private OrderService orderService; @Override @GlobalTransactional(name="createOrder",rollbackFor=Exception.class) public Order saveOrder(OrderVo orderVo) {<!-- --> log.info("==============User placed order================="); log.info("Current XID: {}", RootContext.getXID()); //Get the globally unique order number Long orderId = UUIDGenerator.generateUUID(); //Phase 1: Create order Order order = orderService.prepareSaveOrder(orderVo,orderId); //deduct inventory storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount()); //Deduct the balance accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney()); return order; } }
package com.qingzhou.tccorderservice.service; import com.qingzhou.datasource.entity.Order; import com.qingzhou.tccorderservice.vo.OrderVo; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author qingzhou * * Through the @LocalTCC annotation, RM will register a branch transaction with TC when it is initialized. */ @LocalTCC public interface OrderService {<!-- --> /** * TCC try method: save order information, status is paying * * Define two-phase submission. In the try phase, the resourceId, commit and cancel methods of the branch transaction are defined through the @TwoPhaseBusinessAction annotation. * name = bean name of the tcc, globally unique * commitMethod = commit is a two-stage confirmation method * rollbackMethod = rollback is a two-stage cancellation method *BusinessActionContextParameter annotation passes parameters to the second stage * The new feature of useTCCFence seata1.5.1 is used to solve the problems of TCC idempotence, suspension and empty rollback. The log table tcc_fence_log needs to be added. */ @TwoPhaseBusinessAction(name = "prepareSaveOrder", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true) Order prepareSaveOrder(OrderVo orderVo, @BusinessActionContextParameter(paramName = "orderId") Long orderId); /** * *TCC's confirm method: the order status is changed to payment successful * * The two-stage confirmation method can be named differently, but it must be consistent with commitMethod. * context can pass parameters of try method * * @param actionContext * @return */ boolean commit(BusinessActionContext actionContext); /** * TCC's cancel method: the order status is changed to payment failed * The two-stage cancellation method can be named differently, but it must be consistent with rollbackMethod. * * @param actionContext * @return */ boolean rollback(BusinessActionContext actionContext); }
package com.qingzhou.tccorderservice.service.impl; import com.qingzhou.datasource.entity.Order; import com.qingzhou.datasource.entity.OrderStatus; import com.qingzhou.datasource.mapper.OrderMapper; import com.qingzhou.tccorderservice.feign.AccountFeignService; import com.qingzhou.tccorderservice.feign.StorageFeignService; import com.qingzhou.tccorderservice.service.OrderService; import com.qingzhou.tccorderservice.vo.OrderVo; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author qingzhou */ @Service @Slf4j public class OrderServiceImpl implements OrderService {<!-- --> @Autowired private OrderMapper orderMapper; @Autowired private AccountFeignService accountFeignService; @Autowired private StorageFeignService storageFeignService; @Autowired private OrderService orderService; @Override @Transactional(rollbackFor=Exception.class) public Order prepareSaveOrder(OrderVo orderVo, @BusinessActionContextParameter(paramName = "orderId") Long orderId) {<!-- --> //Save order Order order = new Order(); order.setId(orderId); order.setUserId(orderVo.getUserId()); order.setCommodityCode(orderVo.getCommodityCode()); order.setCount(orderVo.getCount()); order.setMoney(orderVo.getMoney()); order.setStatus(OrderStatus.INIT.getValue()); Integer saveOrderRecord = orderMapper.insert(order); log.info("Save Order{}", saveOrderRecord > 0 ? "Success" : "Failure"); return order; } @Override public boolean commit(BusinessActionContext actionContext) {<!-- --> // Get order id long orderId = Long.parseLong(actionContext.getActionContext("orderId").toString()); //Update order status to payment successful Integer updateOrderRecord = orderMapper.updateOrderStatus(orderId,OrderStatus.SUCCESS.getValue()); log.info("Update order id: {} {}", orderId, updateOrderRecord > 0 ? "Success" : "Failure"); return true; } @Override public boolean rollback(BusinessActionContext actionContext) {<!-- --> //Get order id long orderId = Long.parseLong(actionContext.getActionContext("orderId").toString()); //Update order status to payment failed Integer updateOrderRecord = orderMapper.updateOrderStatus(orderId,OrderStatus.FAIL.getValue()); log.info("Update order id: {} {}", orderId, updateOrderRecord > 0 ? "Success" : "Failure"); return true; } }
package com.qingzhou.tccstorageservice.service; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author qingzhou * * Through the @LocalTCC annotation, RM will register a branch transaction with TC when it is initialized. */ @LocalTCC public interface StorageService {<!-- --> /** * Try: Inventory-deduction quantity, frozen inventory + deduction quantity * * Define two-phase submission. In the try phase, the resourceId, commit and cancel methods of the branch transaction are defined through the @TwoPhaseBusinessAction annotation. * name = bean name of the tcc, globally unique * commitMethod = commit is a two-stage confirmation method * rollbackMethod = rollback is a two-stage cancellation method *BusinessActionContextParameter annotation passes parameters to the second stage * * @param commodityCode commodity number * @param count deduction quantity * @return */ @TwoPhaseBusinessAction(name = "deduct", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true) boolean deduct(@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "count") int count); /** * * Confirm: frozen inventory-deduction quantity * The two-stage confirmation method can be named differently, but it must be consistent with commitMethod. * context can pass parameters of try method * * @param actionContext * @return */ boolean commit(BusinessActionContext actionContext); /** * Cancel: inventory + deduction quantity, frozen inventory - deduction quantity * The two-stage cancellation method can be named differently, but it must be consistent with rollbackMethod. * * @param actionContext * @return */ boolean rollback(BusinessActionContext actionContext); }
package com.qingzhou.tccstorageservice.service.impl; import com.qingzhou.datasource.entity.Storage; import com.qingzhou.datasource.mapper.StorageMapper; import com.qingzhou.tccstorageservice.service.StorageService; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author qingzhou */ @Service @Slf4j public class StorageServiceImpl implements StorageService {<!-- --> @Autowired private StorageMapper storageMapper; @Transactional @Override public boolean deduct(String commodityCode, int count){<!-- --> log.info("==============Frozen Inventory================="); log.info("Current XID: {}", RootContext.getXID()); // Check inventory checkStock(commodityCode,count); log.info("Start freezing {} inventory", commodityCode); //freeze inventory Integer record = storageMapper.freezeStorage(commodityCode,count); log.info("Freeze {} inventory result: {}", commodityCode, record > 0 ? "Operation successful" : "Failed to deduct inventory"); return true; } @Override public boolean commit(BusinessActionContext actionContext) {<!-- --> log.info("==============Deduction of frozen inventory================="); String commodityCode = actionContext.getActionContext("commodityCode").toString(); int count = (int) actionContext.getActionContext("count"); //Deduct frozen inventory storageMapper.reduceFreezeStorage(commodityCode,count); return true; } @Override public boolean rollback(BusinessActionContext actionContext) {<!-- --> log.info("=============Unfreeze inventory================="); String commodityCode = actionContext.getActionContext("commodityCode").toString(); int count = (int) actionContext.getActionContext("count"); //Deduct frozen inventory storageMapper.unfreezeStorage(commodityCode,count); return true; } private void checkStock(String commodityCode, int count){<!-- --> log.info("Check {} inventory", commodityCode); Storage storage = storageMapper.findByCommodityCode(commodityCode); if (storage.getCount() < count) {<!-- --> log.warn("{} Insufficient inventory, current inventory: {}", commodityCode, count); throw new RuntimeException("Insufficient inventory"); } } }
package com.qingzhou.tccaccountservice.service; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author qingzhou * * Through the @LocalTCC annotation, RM will register a branch transaction with TC when it is initialized. */ @LocalTCC public interface AccountService {<!-- --> /** * Deduction from user account * * Define two-phase submission. In the try phase, the resourceId, commit and cancel methods of the branch transaction are defined through the @TwoPhaseBusinessAction annotation. * name = bean name of the tcc, globally unique * commitMethod = commit is a two-stage confirmation method * rollbackMethod = rollback is a two-stage cancellation method * * @param userId * @param money The amount deducted from the user's account * @return */ @TwoPhaseBusinessAction(name = "debit", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true) boolean debit(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "money") int money); /** * To commit a transaction, the two-stage confirmation method can be named differently, but it must be consistent with commitMethod. * context can pass parameters of try method * * @param actionContext * @return */ boolean commit(BusinessActionContext actionContext); /** * To roll back a transaction, the two-stage cancellation method can be named differently, but it must be consistent with rollbackMethod. * * @param actionContext * @return */ boolean rollback(BusinessActionContext actionContext); }
package com.qingzhou.tccaccountservice.service.impl; import com.qingzhou.datasource.entity.Account; import com.qingzhou.datasource.mapper.AccountMapper; import com.qingzhou.tccaccountservice.service.AccountService; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author qingzhou */ @Service @Slf4j public class AccountServiceImpl implements AccountService {<!-- --> @Autowired private AccountMapper accountMapper; /** * Deduct user amount * @param userId * @param money */ @Transactional @Override public boolean debit(String userId, int money){<!-- --> log.info("==============Freeze user account balance================="); log.info("Current XID: {}", RootContext.getXID()); checkBalance(userId, money); log.info("Start freezing user {} balance", userId); //Freeze amount Integer record = accountMapper.freezeBalance(userId,money); log.info("Freeze user {} balance result: {}", userId, record > 0 ? "Operation successful" : "Failed to deduct balance"); return true; } @Override public boolean commit(BusinessActionContext actionContext) {<!-- --> log.info("==============Deduction of frozen amount================="); String userId = actionContext.getActionContext("userId").toString(); int money = (int) actionContext.getActionContext("money"); //Deduct the frozen amount accountMapper.reduceFreezeBalance(userId,money); return true; } @Override public boolean rollback(BusinessActionContext actionContext) {<!-- --> log.info("==============Unfreeze amount================="); String userId = actionContext.getActionContext("userId").toString(); int money = (int) actionContext.getActionContext("money"); //Unfreeze amount accountMapper.unfreezeBalance(userId,money); return true; } private void checkBalance(String userId, int money){<!-- --> log.info("Check user {} balance", userId); Account account = accountMapper.selectByUserId(userId); if (account.getMoney() < money) {<!-- --> log.warn("User {} has insufficient balance, current balance: {}", userId, account.getMoney()); throw new RuntimeException("Insufficient balance"); } } }
The above is the main code logic, the complete code link is available at:
https://pan.baidu.com/s/1csd9l_g-huiVnf_JUXWAtA?pwd=qmo3
T code acquisition: qmo3