Is a RocketMQ transaction message still the original message after a check timeout?

Let’s analyze the following demo example to explore the principles of RocketMQ transaction messages.

import java.util.concurrent.CountDownLatch;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

// The class name is illustrative; the original snippet showed only the members.
public class TransactionProducerDemo {
    public static final String PRODUCER_GROUP = "tran-test";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "Test";

    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println(String.format("executeLocalTransaction: %s", msg.getTransactionId()));
                // Return UNKNOW so that the broker has to check back later.
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(String.format("checkLocalTransaction: tranId=%s, commitLogOffset=%s, queueOffset=%s, msgId=%s",
                        msg.getTransactionId(), msg.getCommitLogOffset(),
                        msg.getQueueOffset(), msg.getMsgId()));
                // Keep answering UNKNOW so that the broker repeats the check.
                return LocalTransactionState.UNKNOW;
            }
        };
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        Message msg = new Message(TOPIC, "test".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println(String.format("sendResult: tranId=%s, offsetMsgId=%s, queueOffset=%s msgId=%s",
                sendResult.getTransactionId(), sendResult.getOffsetMsgId(),
                sendResult.getQueueOffset(), sendResult.getMsgId()));
        // Block forever so the producer stays alive to receive check callbacks.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
}

Output:
executeLocalTransaction: C0DE00428BEC18B4AAC27F377B6E0000
sendResult: tranId=C0DE00428BEC18B4AAC27F377B6E0000, offsetMsgId=null, queueOffset=82 msgId=C0DE00428BEC18B4AAC27F377B6E0000
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315411, queueOffset=83, msgId=C0DE004200002A9F0000000000141253
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315805, queueOffset=84, msgId=C0DE004200002A9F00000000001413DD
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316199, queueOffset=85, msgId=C0DE004200002A9F0000000000141567
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316593, queueOffset=86, msgId=C0DE004200002A9F00000000001416F1
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316987, queueOffset=87, msgId=C0DE004200002A9F000000000014187B
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317381, queueOffset=88, msgId=C0DE004200002A9F0000000000141A05
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317775, queueOffset=89, msgId=C0DE004200002A9F0000000000141B8F
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318169, queueOffset=90, msgId=C0DE004200002A9F0000000000141D19
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318563, queueOffset=91, msgId=C0DE004200002A9F0000000000141EA3
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318957, queueOffset=92, msgId=C0DE004200002A9F000000000014202D
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319352, queueOffset=93, msgId=C0DE004200002A9F00000000001421B8
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319747, queueOffset=94, msgId=C0DE004200002A9F0000000000142343
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320142, queueOffset=95, msgId=C0DE004200002A9F00000000001424CE
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320537, queueOffset=96, msgId=C0DE004200002A9F0000000000142659
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320932, queueOffset=97, msgId=C0DE004200002A9F00000000001427E4

The output shows that queueOffset, commitLogOffset, and msgId change on every checkLocalTransaction call, while the transaction ID stays the same. So what exactly happens in the broker?
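
To see why msgId moves together with commitLogOffset, it helps to decode one of the IDs from the output. The sketch below assumes the IPv4 layout of a broker-side offset message ID: 4 bytes of broker IP, 4 bytes of port, and 8 bytes of commit log offset, written as 32 hex characters. The class and method names are made up for illustration and are not part of the RocketMQ API.

```java
final class OffsetMsgIdDecoder {
    /** Holder for the three decoded parts. */
    static final class Decoded {
        final String ip;
        final int port;
        final long commitLogOffset;

        Decoded(String ip, int port, long commitLogOffset) {
            this.ip = ip;
            this.port = port;
            this.commitLogOffset = commitLogOffset;
        }
    }

    /** Decodes a 32-character hex ID: 8 chars IP, 8 chars port, 16 chars offset. */
    static Decoded decode(String offsetMsgId) {
        if (offsetMsgId == null || offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters (IPv4 layout)");
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            // Each pair of hex characters is one octet of the broker address.
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        long offset = Long.parseLong(offsetMsgId.substring(16, 32), 16);
        return new Decoded(ip.toString(), port, offset);
    }

    public static void main(String[] args) {
        // msgId taken from the first checkLocalTransaction line of the output.
        Decoded d = decode("C0DE004200002A9F0000000000141253");
        System.out.println(d.ip + ":" + d.port + " commitLogOffset=" + d.commitLogOffset);
    }
}
```

For the first check line, the last 16 hex characters (0x141253) decode to 1315411, the commitLogOffset printed on the same line. In other words, a new msgId means the message was physically written to the CommitLog again.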

Principle of transaction messages

When a client sends a transaction message, it sets the property MessageConst.PROPERTY_TRANSACTION_PREPARED to "true", which marks the message as a transaction message.

        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

When the broker receives the message, it reads this property into traFlag. If traFlag is "true", the message is handed to TransactionalMessageService for processing.

        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        boolean sendTransactionPrepareMessage = false;
        if (Boolean.parseBoolean(traFlag)
            && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                         + "] sending transaction message is forbidden");
                return response;
            }
            sendTransactionPrepareMessage = true;
        }

        long beginTimeMillis = this.brokerController.getMessageStore().now();

        if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
            CompletableFuture<PutMessageResult> asyncPutMessageFuture;
            if (sendTransactionPrepareMessage) {
                //Process transaction messages
                asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
            } else {
                asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            }

When TransactionalMessageService saves the message, it replaces the original topic with RMQ_SYS_TRANS_HALF_TOPIC and stores the original topic information in the message properties. The message is therefore persisted first, but consumers of the original topic cannot see it yet.
When the broker receives COMMIT_MESSAGE from TransactionMQProducer, it reads the message from RMQ_SYS_TRANS_HALF_TOPIC, restores the original topic, and writes the message there so consumers can receive it. At the same time, it writes a record to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the half message as processed.
The broker decides whether a transaction message is finished by checking whether a half message in RMQ_SYS_TRANS_HALF_TOPIC has a matching record in RMQ_SYS_TRANS_OP_HALF_TOPIC.
When the producer returns UNKNOW instead of COMMIT_MESSAGE, TransactionalMessageCheckService periodically calls back TransactionListener#checkLocalTransaction on the producer to query the local transaction status. By default, a message is checked at most 15 times.
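
The topic swap described above can be sketched with a tiny in-memory model. This is not broker code: the Msg class and both methods are hypothetical, and the property keys "REAL_TOPIC" and "REAL_QID" as well as the fixed half queue ID 0 are assumptions about how the broker stores the original destination.

```java
import java.util.HashMap;
import java.util.Map;

final class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Assumed property keys for the original destination.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    /** Minimal stand-in for a stored message. */
    static final class Msg {
        final String topic;
        final int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** Prepare phase: hide the message in the half topic and remember where it belongs. */
    static Msg toHalf(Msg original) {
        Msg half = new Msg(HALF_TOPIC, 0); // queue 0 is an assumption
        half.properties.putAll(original.properties);
        half.properties.put(REAL_TOPIC, original.topic);
        half.properties.put(REAL_QID, String.valueOf(original.queueId));
        return half;
    }

    /** Commit phase: rebuild a message addressed to the original topic and queue. */
    static Msg restore(Msg half) {
        Msg real = new Msg(half.properties.get(REAL_TOPIC),
                Integer.parseInt(half.properties.get(REAL_QID)));
        real.properties.putAll(half.properties);
        return real;
    }

    public static void main(String[] args) {
        Msg half = toHalf(new Msg("Test", 3));
        Msg real = restore(half);
        System.out.println(half.topic + " -> " + real.topic + "/" + real.queueId);
    }
}
```

Because consumers subscribe to the original topic only, a message parked in the half topic stays invisible to them until the commit path writes the restored copy.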

TransactionalMessageCheckService

TransactionalMessageCheckService is a thread that runs in the broker. By default it runs once every minute, finds half messages whose transaction has timed out, and issues the check again.

    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                //Each half queue has a corresponding op queue
                MessageQueue opQueue = getOpQueue(messageQueue);
                //Get the offset of the currently unfinished half queue
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                //Get the offset of the currently completed op queue
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                ...
                //single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                long nextOpOffset = pullResult.getNextBeginOffset();
                int putInQueueCount = 0;
                int escapeFailCnt = 0;

                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    if (removeMap.containsKey(i)) {
                        ...
                    } else {
                        //Get half message from RMQ_SYS_TRANS_HALF_TOPIC
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                    ...
                        ...
                        //Whether the message needs to be discarded
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        ...
                        //Determine whether the last check timed out
                        boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                            || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                            || valueOfCurrentMinusBorn <= -1;

                        if (isNeedCheck) {
                            //Timed out: re-append the half message before checking again
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            putInQueueCount++;
                            log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
                                    msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
                                    msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                                    msgExt.getQueueOffset(), msgExt.getCommitLogOffset());
                            //Re-initiate the check request to TransactionListener
                            listener.resolveHalfMsg(msgExt);
                ...
                ...
                ...
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }

Three details in the code above matter: needDiscard, putBackHalfMsgQueue, and listener.resolveHalfMsg.
needDiscard: After a message is read from the half queue, check whether its TRANSACTION_CHECK_TIMES property has reached the maximum number of checks (15 by default).
If it has not, TRANSACTION_CHECK_TIMES is incremented by 1.
If it has, the message is skipped in RMQ_SYS_TRANS_HALF_TOPIC and saved to TRANS_CHECK_MAX_TIME_TOPIC through listener.resolveDiscardMsg for manual handling.
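
The counting rule can be sketched as follows. This is a simplified model, not the broker source: the message properties are a plain Map, and the exact comparison (discard once the recorded count has reached the maximum) is an assumption chosen to match the 15 check lines in the demo output.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckTimesSketch {
    static final String CHECK_TIMES = "TRANSACTION_CHECK_TIMES";

    /** Returns true when the message has used up its checks; otherwise bumps the counter. */
    static boolean needDiscard(Map<String, String> props, int checkMax) {
        int checkTime = 1; // the first check has no recorded count yet
        String recorded = props.get(CHECK_TIMES);
        if (recorded != null) {
            checkTime = Integer.parseInt(recorded);
            if (checkTime >= checkMax) {
                return true;
            }
            checkTime++;
        }
        props.put(CHECK_TIMES, String.valueOf(checkTime));
        return false;
    }

    /** Counts how many checks one message gets before it is discarded. */
    static int countChecks(int checkMax) {
        Map<String, String> props = new HashMap<>();
        int checks = 0;
        while (!needDiscard(props, checkMax)) {
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println("checks before discard: " + countChecks(15));
    }
}
```

With a maximum of 15, the model allows 15 checks and discards on the 16th pass, which lines up with the 15 checkLocalTransaction lines printed by the demo.
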
putBackHalfMsgQueue: Appends a new copy of the message to RMQ_SYS_TRANS_HALF_TOPIC. Because the CommitLog is append-only, the original message cannot be modified in place (for example, to update TRANSACTION_CHECK_TIMES). The message therefore has to be appended again, which is why queueOffset, commitLogOffset, and msgId change on every check.

    private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
        PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
        if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            msgExt.setQueueOffset(
                putMessageResult.getAppendMessageResult().getLogicsOffset());
            msgExt.setCommitLogOffset(
                putMessageResult.getAppendMessageResult().getWroteOffset());
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            log.debug(
                "Send check message, the offset={} restored in queueOffset={} "
                     + "commitLogOffset={} "
                     + "newMsgId={} realMsgId={} topic={}",
                offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                msgExt.getTopic());
            return true;
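
A toy append-only log makes the effect of the put-back easier to see. Everything here is hypothetical (class, fields, and method names); it only models the idea that a stored entry is never edited, so an updated copy has to be appended and receives a new offset while keeping the same unique key.

```java
import java.util.ArrayList;
import java.util.List;

final class AppendOnlyLogSketch {
    /** Immutable entry: once written, it is never changed. */
    static final class Entry {
        final long queueOffset;
        final String uniqKey;
        final int checkTimes;

        Entry(long queueOffset, String uniqKey, int checkTimes) {
            this.queueOffset = queueOffset;
            this.uniqKey = uniqKey;
            this.checkTimes = checkTimes;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Appends a new entry at the tail and returns its offset. */
    long append(String uniqKey, int checkTimes) {
        long offset = entries.size();
        entries.add(new Entry(offset, uniqKey, checkTimes));
        return offset;
    }

    Entry get(long offset) {
        return entries.get((int) offset);
    }

    /** Models the put-back: copy the old entry with an updated counter to the tail. */
    long putBack(long offset) {
        Entry old = get(offset);
        return append(old.uniqKey, old.checkTimes + 1);
    }

    public static void main(String[] args) {
        AppendOnlyLogSketch log = new AppendOnlyLogSketch();
        long first = log.append("TX-1", 0);
        long second = log.putBack(first);
        System.out.println("old offset=" + first + ", new offset=" + second
                + ", same key=" + log.get(first).uniqKey.equals(log.get(second).uniqKey));
    }
}
```

The old entry stays in the log untouched; only the consume offset of the half queue moves past it, which is what the updateConsumeOffset call at the end of check does.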

listener.resolveHalfMsg: Sends the check request to the producer again through the resolveHalfMsg callback.

    public void resolveHalfMsg(final MessageExt msgExt) {
        if (executorService != null) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendCheckMessage(msgExt);
                    } catch (Exception e) {
                        LOGGER.error("Send check message error!", e);
                    }
                }
            });
        } else {
            LOGGER.error("TransactionalMessageCheckListener not init");
        }
    }

    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            //Take out the netty channel connected to the broker and send the check message
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }
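
Since sendCheckMessage sets the transaction ID from the client-side unique key, the transaction ID is the one value that stays stable across checks, as the demo output also shows. A producer that records its local transaction result would therefore plausibly key it by transaction ID rather than by msgId. The sketch below uses a hypothetical in-memory store and its own State enum instead of the RocketMQ classes, so it runs without the client library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LocalTransactionStore {
    /** Stand-in for the commit, rollback, and unknown states of a local transaction. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /** Called when the local transaction finishes. */
    void record(String transactionId, State state) {
        states.put(transactionId, state);
    }

    /** Called from a check callback; unknown IDs stay UNKNOWN so the broker asks again. */
    State check(String transactionId) {
        return states.getOrDefault(transactionId, State.UNKNOWN);
    }

    public static void main(String[] args) {
        LocalTransactionStore store = new LocalTransactionStore();
        String tranId = "C0DE00428BEC18B4AAC27F377B6E0000";
        System.out.println(store.check(tranId));
        store.record(tranId, State.COMMIT);
        System.out.println(store.check(tranId));
    }
}
```

In a real listener, checkLocalTransaction would call something like store.check(msg.getTransactionId()) and map the result to the corresponding LocalTransactionState value; a production version would also need durable storage instead of a map.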

Figure: half message diagram