RocketMQ: sending messages and message types

1. Message sending

1.1 Single producer and single consumer message sending (OneToOne)

1. Create a new Maven project named rocketmqtest

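2. Import the RocketMQ client dependency (Maven coordinates)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>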

3. Producer

package com.liming.base;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Producer.
 *
 * @author dawn
 * @version 1.0
 * @date 2023/5/21 9:08
 */
public class Producer {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
         1. Who will send it?
         2. To whom?
         3. How to send?
         4. Send what?
         5. What is the result?
         6. Clean up the field

        // 1. Create an object Producer that sends messages
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2. Set the sending name server address
        // 3.1 Start the sent service
        // 4. Create the message object to be sent
        Message message = new Message("topic1", "tag1","hello reccketmq".getBytes());
        // 3.2 Send message
        SendResult sendResult = producer. send(message);
        System.out.println("return result: " + sendResult);
        // 5. Close the connection
        producer. shutdown();

4. Consumers

package com.liming.base;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Consumer.
 *
 * @author dawn
 * @version 1.0
 * @date 2023/5/21 9:17
 */
public class Consumer {<!-- -->
    public static void main(String[] args) throws MQClientException {<!-- -->
        //1. Create an object Consumer that receives messages
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2. Set the name server address received
        //3. Set the topic corresponding to the received message, and the corresponding sub tag is arbitrary
        consumer. subscribe("topic1", "*");
        //3. Turn on monitoring to receive messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {<!-- -->
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {<!-- -->
                //Loop through messages
                for (MessageExt msg : list) {<!-- -->
                    System.out.println("Received message: " + msg);
                    byte[] body = msg. getBody();
                    System.out.println(new String(body));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        //4. Start the service for receiving messages
        System.out.println("Receiving message service has been opened!");
        //5 Don't close the consumer!

1.2 Single producer and multiple consumer message sending (OneToMany)


// Producer for the one-to-many example: sends ten messages to topic1.
// 1. Create the producer object that sends messages
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. Set the name server address
//    NOTE: localhost:9876 is an assumption for a local setup; change it to your name server
producer.setNamesrvAddr("localhost:9876");
// 3.1 Start the producer service; it must be started before sending
producer.start();
try {
    for (int i = 0; i < 10; i++) {
        // 4. Create the message object to be sent: specify the topic and the body
        Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes());
        // 3.2 Send the message
        SendResult result = producer.send(msg);
        System.out.println("Return result: " + result);
    }
} finally {
    // 5. Close the connection, even when sending fails
    producer.shutdown();
}

Consumer (load balancing mode: default mode)

 // Consumer in load-balancing (clustering) mode: the default consumption mode.
 // 1. Create the consumer object that receives messages
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 // 2. Set the name server address
 //    NOTE: localhost:9876 is an assumption for a local setup; change it to your name server
 consumer.setNamesrvAddr("localhost:9876");
 // 3. Set the topic to receive from; "*" accepts any tag
 consumer.subscribe("topic1", "*");
 // Set the consumption mode of the current consumer (default mode: load balancing)
 consumer.setMessageModel(MessageModel.CLUSTERING);
 // 4. Register the listener that is called with the received messages
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         // Loop through the received messages
         for (MessageExt msg : list) {
             System.out.println("Received message: " + msg);
             System.out.println("The message is: " + new String(msg.getBody(), StandardCharsets.UTF_8));
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });
 // 5. Start the service for receiving messages
 consumer.start();
 System.out.println("Receiving message service has been opened!");

 // 6. Don't close the consumer: it has to stay alive to keep receiving messages

Consumer (broadcast mode)

// Consumer in broadcast mode.
// 1. Create the consumer object that receives messages
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2. Set the name server address
//    NOTE: localhost:9876 is an assumption for a local setup; change it to your name server
consumer.setNamesrvAddr("localhost:9876");
// 3. Set the topic to receive from; "*" accepts any tag
consumer.subscribe("topic1", "*");
// The default consumption mode is load balancing (MessageModel.CLUSTERING);
// set the consumption mode of the current consumer to broadcast mode instead
consumer.setMessageModel(MessageModel.BROADCASTING);
// 4. Register the listener that is called with the received messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // Loop through the received messages
        for (MessageExt msg : list) {
            System.out.println("Received message: " + msg);
            System.out.println("The message is: " + new String(msg.getBody(), StandardCharsets.UTF_8));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 5. Start the service for receiving messages
consumer.start();
System.out.println("Receiving message service has been opened!");

// 6. Don't close the consumer: it has to stay alive to keep receiving messages

1.3 Multi-producer and multi-consumer message sending (ManyToMany)

Messages generated by multiple producers can be consumed by the same consumer or by multiple consumers

2. Message types

2.1 Synchronous messages

Features: highly time-sensitive, important messages that must have a receipt, e.g. SMS messages and notifications (such as a successful transfer).

Code implementation (in the producer):

// Synchronous send: the call hands back the SendResult (the receipt) as its return value
SendResult result = producer.send(msg);

2.2 Asynchronous messages

Features: less time-sensitive, but a receipt is still required, e.g. some of the information in an order.

Code implementation (in the producer):

// Asynchronous producer: sends ten messages and receives each result through a callback.
// 1. Create the producer object that sends messages
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. Set the name server address
//    NOTE: localhost:9876 is an assumption for a local setup; change it to your name server
producer.setNamesrvAddr("localhost:9876");
// 3.1 Start the producer service; it must be started before sending
producer.start();
try {
    for (int i = 0; i < 10; i++) {
        // 4. Create the message object to be sent: specify the topic and the body
        Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8"));
        // 3.2 The synchronous alternative would be: SendResult result = producer.send(msg);

        // Asynchronous message: the result is delivered to the callback instead of being returned
        producer.send(msg, new SendCallback() {
            // Called with the result when the message was sent successfully
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Return result: " + sendResult);
            }

            // Called when sending the message failed
            @Override
            public void onException(Throwable throwable) {
                System.out.println("Send failed: " + throwable);
            }
        });
        System.out.println("message" + i + "send finished, go to do business logic!");
    }
    // Sleep for 10 seconds before shutting down
    // (presumably so the asynchronous callbacks have time to run before the producer is closed)
    TimeUnit.SECONDS.sleep(10);
} finally {
    // 5. Close the connection, even when sending fails
    producer.shutdown();
}

2.3 One-way messages

Features: Messages that do not require a receipt, such as log messages

Code implementation (in the producer):

// One-way message: no send result is returned
producer.sendOneway(msg);

2.4 Delayed messages

A delayed message is not delivered to consumers as soon as it is sent; it only becomes available after the configured waiting time, which acts as a buffer that postpones its arrival.

// Delayed message example (i is the index of an enclosing send loop that is not shown here)
Message msg = new Message("topic3", ("Delay message: hello rocketmq " + i).getBytes("UTF-8"));
// Set delay level 3: this message will be delivered after 10s
// (only a few fixed delay times are supported, see the delay level list for details)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("Return result: " + result);

Currently supported delay times (delay levels):

// Supported delay times; a delay level appears to be the 1-based position in this list
// (level 3 = 10s, matching the delayed-message example) -- TODO confirm against the broker configuration
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

2.5 Bulk messages

Sending messages in batches can significantly improve the performance of delivering small messages.

Send bulk messages:

List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));


SendResult result = producer. send(msgList);

Note restrictions:

  • All messages in a batch must have the same topic

  • They must have the same waitStoreMsgOK setting

  • They cannot be delayed messages

  • The total size of the message contents must not exceed 4 MB

3. Message filtering

3.1 Category filtering

Filter messages by tag.


// Producer side: the second constructor argument ("tag2") is the tag the message is sent with
Message msg = new Message("topic6","tag2",("Message filtering according to tag: hello rocketmq 2").getBytes("UTF-8"));


// Consumer side: when receiving messages, a tag expression can be specified in addition to the topic.
// "*" represents any tag; "tag1 || tag2" accepts messages tagged with either tag1 or tag2.
consumer.subscribe("topic6","tag1 || tag2");

3.2 Syntax filtering (attribute filtering/syntax filtering/SQL filtering)

Basic syntax:

  • Numerical comparison, such as: >, >=, <, <=, BETWEEN, =;
  • Character comparison, such as: =, <>, IN;
  • Logical symbols AND, OR, NOT;

Constant support types are:

  • Value, such as: 123, 3.1415;
  • Characters, such as: ‘abc’, must be wrapped in single quotes;
  • NULL, the special constant
  • Boolean value, TRUE or FALSE


// Producer side: add attributes (user properties) to the message before sending it.
// The values below are illustrative; they are chosen so that the filters further down match.
msg.putUserProperty("age", "18");
msg.putUserProperty("name", "litiedan");


// Consumer side: use a message selector to filter on the corresponding attribute;
// the expression uses an SQL-like syntax
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

Note: SQL filtering depends on support from the server (broker). Enable it by adding the following entry to the broker.conf configuration file:

enablePropertyFilter=true

restart broker

start mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

Or enter directly in cmd (D:\software\rocketmq-all-4.8.0-bin-release\bin)

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

4. Transaction messages

1. Normal transaction process (gray lines in the flow diagram)
2. Transaction compensation process (blue lines in the flow diagram)

Transaction message status

  • Commit state: the message is allowed to enter the queue; from then on it is indistinguishable from a non-transactional message
  • Rollback state: the message is not allowed to enter the queue; it is as if it had never been sent
  • Intermediate state: the half message has been sent, but MQ has not yet received the second-phase confirmation (commit or rollback)

Note: Transaction messages are only related to producers, not consumers

Code Implementation:

Commit state

// Transaction message in the commit state.
// The producer used for transaction messages is TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
// Set the name server address
// NOTE: localhost:9876 is an assumption for a local setup; change it to your name server
producer.setNamesrvAddr("localhost:9876");
// Add the listener corresponding to the local transaction
producer.setTransactionListener(new TransactionListener() {
    // Normal transaction process: the local transaction is reported as committed
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    // Transaction compensation process
    // (presumably not invoked in this example because the state above is already final -- verify)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        return null;
    }
});
// Start the producer service; it must be started before sending
producer.start();
try {
    Message msg = new Message("topic8", ("Transaction message: hello rocketmq ").getBytes("UTF-8"));
    SendResult result = producer.sendMessageInTransaction(msg, null);
    System.out.println("return result: " + result);
} finally {
    // Close the connection, even when sending fails
    producer.shutdown();
}

rollback state

// Transaction message in the rollback state: only the listener differs from the commit example
producer.setTransactionListener(new TransactionListener() {
    // Normal transaction: the local transaction is reported as rolled back
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // Transaction compensation
    // (presumably not invoked in this example because the state above is already final -- verify)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return null;
    }
});

Intermediate state

public static void main(String[] args) throws Exception {<!-- -->
TransactionMQProducer producer=new TransactionMQProducer("group1");
producer.setTransactionListener(new TransactionListener() {<!-- -->
// normal transaction
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- -->
return LocalTransactionState.UNKNOW;
// Transaction compensation is triggered only when UNKNOW is executed normally
public LocalTransactionState checkLocalTransaction(MessageExt msg) {<!-- -->
System.out.println("Transaction Compensation");
return LocalTransactionState. COMMIT_MESSAGE;
Message msg = new Message("topic13", "hello rocketmq". getBytes("UTF-8"));
SendResult result = producer. sendMessageInTransaction(msg, null);
System.out.println("return result: " + result);
//The transaction compensation producer must always be started
//producer. shutdown();