Let’s analyze the following demo example to explore the principles of RocketMQ transaction messages.
    public static final String PRODUCER_GROUP = "tran-test";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "Test";

    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println(String.format("executeLocalTransaction: %s", msg.getTransactionId()));
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(String.format("checkLocalTransaction: tranId=%s, commitLogOffset=%s, queueOffset=%s, msgId=%s",
                    msg.getTransactionId(), msg.getCommitLogOffset(), msg.getQueueOffset(), msg.getMsgId()));
                return LocalTransactionState.UNKNOW;
            }
        };
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();

        Message msg = new Message(TOPIC, "test".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println(String.format("sendResult: tranId=%s, offsetMsgId=%s, queueOffset=%s msgId=%s",
            sendResult.getTransactionId(), sendResult.getOffsetMsgId(), sendResult.getQueueOffset(), sendResult.getMsgId()));

        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
    executeLocalTransaction: C0DE00428BEC18B4AAC27F377B6E0000
    sendResult: tranId=C0DE00428BEC18B4AAC27F377B6E0000, offsetMsgId=null, queueOffset=82 msgId=C0DE00428BEC18B4AAC27F377B6E0000
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315411, queueOffset=83, msgId=C0DE004200002A9F0000000000141253
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315805, queueOffset=84, msgId=C0DE004200002A9F00000000001413DD
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316199, queueOffset=85, msgId=C0DE004200002A9F0000000000141567
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316593, queueOffset=86, msgId=C0DE004200002A9F00000000001416F1
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316987, queueOffset=87, msgId=C0DE004200002A9F000000000014187B
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317381, queueOffset=88, msgId=C0DE004200002A9F0000000000141A05
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317775, queueOffset=89, msgId=C0DE004200002A9F0000000000141B8F
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318169, queueOffset=90, msgId=C0DE004200002A9F0000000000141D19
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318563, queueOffset=91, msgId=C0DE004200002A9F0000000000141EA3
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318957, queueOffset=92, msgId=C0DE004200002A9F000000000014202D
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319352, queueOffset=93, msgId=C0DE004200002A9F00000000001421B8
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319747, queueOffset=94, msgId=C0DE004200002A9F0000000000142343
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320142, queueOffset=95, msgId=C0DE004200002A9F00000000001424CE
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320537, queueOffset=96, msgId=C0DE004200002A9F0000000000142659
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320932, queueOffset=97, msgId=C0DE004200002A9F00000000001427E4
In the output above, commitLogOffset, queueOffset, and msgId change on every checkLocalTransaction call, while tranId stays the same. So what exactly happens in the broker?
Principle of transaction messages
When a client sends a transaction message, it sets the property MessageConst.PROPERTY_TRANSACTION_PREPARED to "true" to mark the message as a transaction message.
    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
On receiving the message, the broker reads the PROPERTY_TRANSACTION_PREPARED property into traFlag. If traFlag is true, the message is handed to TransactionalMessageService for processing.
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }

    long beginTimeMillis = this.brokerController.getMessageStore().now();

    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        CompletableFuture<PutMessageResult> asyncPutMessageFuture;
        if (sendTransactionPrepareMessage) {
            //Process transaction messages
            asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
When TransactionalMessageService saves the message, it will replace the original topic with RMQ_SYS_TRANS_HALF_TOPIC, and the original topic information is stored in properties. In this way, the message is saved first, rather than allowing the Consumer to receive it immediately.
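The topic swap described above can be sketched with plain Java maps. This is only an illustration, not the broker's code: the `HalfMessageSketch` class, its `Msg` type, and the property keys `realTopic` and `realQueueId` are hypothetical names chosen for the example, and storing the half message in queue 0 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Hypothetical property keys, used only in this sketch
    static final String KEY_REAL_TOPIC = "realTopic";
    static final String KEY_REAL_QUEUE_ID = "realQueueId";

    /** Minimal stand-in for a message: a topic, a queue id, and string properties. */
    static class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /**
     * Builds the message that is actually stored: it lives in the half topic,
     * and the original destination is remembered in its properties.
     */
    static Msg toHalf(Msg original) {
        Msg half = new Msg(HALF_TOPIC, 0); // queue 0 is an assumption of this sketch
        half.properties.putAll(original.properties);
        half.properties.put(KEY_REAL_TOPIC, original.topic);
        half.properties.put(KEY_REAL_QUEUE_ID, String.valueOf(original.queueId));
        return half;
    }

    public static void main(String[] args) {
        Msg half = toHalf(new Msg("Test", 3));
        System.out.println(half.topic + " " + half.properties);
    }
}
```

Because consumers subscribe to the original topic and not to the half topic, a message stored this way stays invisible to them until it is restored.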
When the broker receives COMMIT_MESSAGE from TransactionMQProducer, it reads the message back from RMQ_SYS_TRANS_HALF_TOPIC, restores the original topic, and stores it again so that consumers can receive it. At the same time, it writes a corresponding record to RMQ_SYS_TRANS_OP_HALF_TOPIC.
The broker decides whether a transaction message is finished by checking whether its half message in RMQ_SYS_TRANS_HALF_TOPIC has a matching record in RMQ_SYS_TRANS_OP_HALF_TOPIC.
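The relationship between the two topics can be illustrated as a set difference: a half message is still pending when no op record refers to it. The sketch below is a simplified in-memory model under that assumption; `HalfOpSketch` and its methods are hypothetical names, and the real broker works on queue offsets read from the store.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HalfOpSketch {
    // Queue offsets of half messages, in arrival order
    private final List<Long> halfOffsets = new ArrayList<>();
    // Offsets of half messages that already have an op record
    private final Set<Long> opRecords = new HashSet<>();
    private long nextOffset = 0;

    /** Stores a half message and returns its queue offset. */
    long prepare() {
        long offset = nextOffset++;
        halfOffsets.add(offset);
        return offset;
    }

    /** Records that the transaction behind this half message has ended. */
    void endTransaction(long halfOffset) {
        opRecords.add(halfOffset);
    }

    /** Half messages without an op record are the ones that still need a check. */
    List<Long> pending() {
        List<Long> result = new ArrayList<>();
        for (long offset : halfOffsets) {
            if (!opRecords.contains(offset)) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        HalfOpSketch broker = new HalfOpSketch();
        broker.prepare();
        long second = broker.prepare();
        broker.prepare();
        broker.endTransaction(second);
        System.out.println(broker.pending());
    }
}
```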
When the producer returns UNKNOW instead of COMMIT_MESSAGE, TransactionalMessageCheckService periodically calls back the producer's TransactionListener#checkLocalTransaction to query the local transaction status. By default, it checks at most 15 times, which matches the 15 checkLocalTransaction lines in the output above.
TransactionalMessageCheckService
TransactionalMessageCheckService is a thread running in the broker. By default it runs once per minute, looks for half messages whose transaction has timed out, and initiates a check for each of them.
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                //Each half queue has a corresponding op queue
                MessageQueue opQueue = getOpQueue(messageQueue);
                //Get the offset of the currently unfinished half queue
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                //Get the offset of the currently completed op queue
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                ...
                //single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                long nextOpOffset = pullResult.getNextBeginOffset();
                int putInQueueCount = 0;
                int escapeFailCnt = 0;

                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    if (removeMap.containsKey(i)) {
                        ...
                    } else {
                        //Get half message from RMQ_SYS_TRANS_HALF_TOPIC
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            ...
                        ...
                        //Whether the message needs to be discarded
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        ...
                        //Determine whether the last check timed out
                        boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                            || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                            || valueOfCurrentMinusBorn <= -1;

                        if (isNeedCheck) {
                            //time out
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            putInQueueCount++;
                            log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
                                msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
                                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                                msgExt.getQueueOffset(), msgExt.getCommitLogOffset());
                            //Re-initiate the check request to TransactionListener
                            listener.resolveHalfMsg(msgExt);
                            ...
                ...
                ...
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
Three details in the code above are important: needDiscard, putBackHalfMsgQueue, and listener.resolveHalfMsg.
needDiscard: After the message is taken out of the half queue, check whether its TRANSACTION_CHECK_TIMES property has reached the maximum number of checks (15 by default).
If the maximum has not been reached, the TRANSACTION_CHECK_TIMES value is incremented by 1.
If the maximum has been reached, the message is dropped from RMQ_SYS_TRANS_HALF_TOPIC and saved in TRANS_CHECK_MAX_TIME_TOPIC through listener.resolveDiscardMsg for manual processing.
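The counting rule can be sketched as a small function over the message properties. This is a simplified reconstruction rather than the broker's source: `CheckTimesSketch` is a hypothetical class, the properties are modeled as a plain map, and the sketch assumes the limit counts as reached once the stored count equals the maximum. Under that assumption a message that always answers UNKNOW is checked exactly 15 times, which agrees with the output at the top of this article.

```java
import java.util.HashMap;
import java.util.Map;

class CheckTimesSketch {
    static final String KEY_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";

    /**
     * Returns true when the message has used up its checks.
     * Otherwise bumps the counter stored in the properties and returns false.
     */
    static boolean needDiscard(Map<String, String> props, int checkMax) {
        String stored = props.get(KEY_CHECK_TIMES);
        int checkTimes = 1; // first check: no counter stored yet
        if (stored != null) {
            checkTimes = Integer.parseInt(stored);
            if (checkTimes >= checkMax) {
                return true; // assumption: reaching the maximum means discard
            }
            checkTimes++;
        }
        props.put(KEY_CHECK_TIMES, String.valueOf(checkTimes));
        return false;
    }

    /** Counts how many checks are sent before the message is discarded. */
    static int countChecks(int checkMax) {
        Map<String, String> props = new HashMap<>();
        int checks = 0;
        while (!needDiscard(props, checkMax)) {
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println(countChecks(15));
    }
}
```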
putBackHalfMsgQueue: Re-inserts a copy of the message into RMQ_SYS_TRANS_HALF_TOPIC. Because CommitLog is append-only, the original message cannot be modified in place, so the message has to be appended again, which is why queueOffset, commitLogOffset, and msgId change.
    private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
        PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
        if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            msgExt.setQueueOffset(
                putMessageResult.getAppendMessageResult().getLogicsOffset());
            msgExt.setCommitLogOffset(
                putMessageResult.getAppendMessageResult().getWroteOffset());
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            log.debug(
                "Send check message, the offset={} restored in queueOffset={} "
                    + "commitLogOffset={} "
                    + "newMsgId={} realMsgId={} topic={}",
                offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                msgExt.getTopic());
            return true;
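The msgId values printed by checkLocalTransaction make this visible. The sketch below decodes them, assuming the IPv4 layout of 4 bytes of broker address, 4 bytes of port, and 8 bytes of commit log offset, all hex encoded; that layout is an assumption of this example, although it is consistent with the numbers in the output above. `OffsetMsgIdSketch` is a hypothetical helper, not a RocketMQ class.

```java
class OffsetMsgIdSketch {
    /** First 8 hex characters: the broker's IPv4 address, one byte per octet. */
    static String ip(String msgId) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                sb.append('.');
            }
            sb.append(Integer.parseInt(msgId.substring(i * 2, i * 2 + 2), 16));
        }
        return sb.toString();
    }

    /** Next 8 hex characters: the broker's listen port. */
    static int port(String msgId) {
        return Integer.parseInt(msgId.substring(8, 16), 16);
    }

    /** Last 16 hex characters: the physical offset in the CommitLog. */
    static long commitLogOffset(String msgId) {
        return Long.parseLong(msgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String first = "C0DE004200002A9F0000000000141253";
        String second = "C0DE004200002A9F00000000001413DD";
        System.out.println(ip(first) + ":" + port(first));
        System.out.println(commitLogOffset(first));   // 1315411
        System.out.println(commitLogOffset(second));  // 1315805
    }
}
```

The decoded offsets equal the commitLogOffset values printed on the same lines, so a new msgId on each check simply reflects a new physical position of the re-appended half message.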
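listener.resolveHalfMsg: Sends the check request to TransactionMQProducer again. resolveHalfMsg submits the work to a thread pool, and sendCheckMessage builds the request and sends it over a channel of the producer group.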
    public void resolveHalfMsg(final MessageExt msgExt) {
        if (executorService != null) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendCheckMessage(msgExt);
                    } catch (Exception e) {
                        LOGGER.error("Send check message error!", e);
                    }
                }
            });
        } else {
            LOGGER.error("TransactionalMessageCheckListener not init");
        }
    }

    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            //Take out the netty channel connected to the broker and send the check message
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }
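The field mapping in sendCheckMessage explains the pattern in the demo output: the transaction ID comes from the client-generated unique key stored in the message properties, while the offset-based fields come from wherever the half message was last appended. The following sketch mirrors that mapping with plain classes. `StoredHalf` and `CheckHeader` are hypothetical stand-ins for MessageExt and CheckTransactionStateRequestHeader, and the sample values are taken from the first two check lines of the output.

```java
class CheckHeaderSketch {
    /** Simplified view of a half message as stored by the broker (hypothetical type). */
    static class StoredHalf {
        final String uniqKey;        // client-generated key kept in the properties
        final String msgId;          // offset-based id, changes on every re-append
        final long queueOffset;
        final long commitLogOffset;

        StoredHalf(String uniqKey, String msgId, long queueOffset, long commitLogOffset) {
            this.uniqKey = uniqKey;
            this.msgId = msgId;
            this.queueOffset = queueOffset;
            this.commitLogOffset = commitLogOffset;
        }
    }

    /** Mirrors the setters used in sendCheckMessage (hypothetical type). */
    static class CheckHeader {
        final String transactionId;
        final String msgId;
        final String offsetMsgId;
        final long tranStateTableOffset;
        final long commitLogOffset;

        CheckHeader(StoredHalf half) {
            this.msgId = half.uniqKey;
            this.transactionId = this.msgId;      // stable across checks
            this.offsetMsgId = half.msgId;        // follows the latest copy
            this.tranStateTableOffset = half.queueOffset;
            this.commitLogOffset = half.commitLogOffset;
        }
    }

    public static void main(String[] args) {
        String key = "C0DE00428BEC18B4AAC27F377B6E0000";
        CheckHeader first = new CheckHeader(
            new StoredHalf(key, "C0DE004200002A9F0000000000141253", 83, 1315411));
        CheckHeader second = new CheckHeader(
            new StoredHalf(key, "C0DE004200002A9F00000000001413DD", 84, 1315805));
        System.out.println(first.transactionId.equals(second.transactionId));
        System.out.println(first.offsetMsgId.equals(second.offsetMsgId));
    }
}
```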