RocketMQ: sending messages, message types, message filtering, and transaction messages

1. Message sending

1.1 Single producer and single consumer message sending (OneToOne)

1. Create a new Maven project named rocketmqtest

2. Add the RocketMQ client dependency (Maven coordinates) to pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

3. Producer

package com.liming.base;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @author dawn
 * @version 1.0
 * producer
 * @date 2023/5/21 9:08
 */
public class Producer {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        /**
         1. Who will send it?
         2. To whom?
         3. How to send?
         4. Send what?
         5. What is the result?
         6. Clean up the field
         **/

        // 1. Create an object Producer that sends messages
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2. Set the sending name server address
        producer.setNamesrvAddr("localhost:9876");
        // 3.1 Start the sent service
        producer.start();
        // 4. Create the message object to be sent
        Message message = new Message("topic1", "tag1","hello reccketmq".getBytes());
        // 3.2 Send message
        SendResult sendResult = producer. send(message);
        System.out.println("return result: " + sendResult);
        // 5. Close the connection
        producer. shutdown();
    }
}

4. Consumer

package com.liming.base;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author dawn
 * @version 1.0
 * consumer
 * @date 2023/5/21 9:17
 */
public class Consumer {<!-- -->
    public static void main(String[] args) throws MQClientException {<!-- -->
        //1. Create an object Consumer that receives messages
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2. Set the name server address received
        consumer.setNamesrvAddr("localhost:9876");
        //3. Set the topic corresponding to the received message, and the corresponding sub tag is arbitrary
        consumer. subscribe("topic1", "*");
        //3. Turn on monitoring to receive messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {<!-- -->
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {<!-- -->
                //Loop through messages
                for (MessageExt msg : list) {<!-- -->
                    System.out.println("Received message: " + msg);
                    byte[] body = msg. getBody();
                    System.out.println(new String(body));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4. Start the service for receiving messages
        consumer.start();
        System.out.println("Receiving message service has been opened!");
        //5 Don't close the consumer!
    }
}

1.2 Single producer and multiple consumer message sending (OneToMany)

producer

//1. Create the Producer object that sends messages
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2. Set the name server address to send through
producer.setNamesrvAddr("localhost:9876");
//3.1 Start the sending service
producer.start();
try {
    for (int i = 0; i < 10; i++) {
        //4. Create the message to send: specify the topic and the content body
        //   (explicit charset so the bytes do not depend on the platform default)
        Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8"));
        //3.2 Send the message and wait for the result
        SendResult result = producer.send(msg);
        System.out.println("Return result: " + result);
    }
} finally {
    //5. Always close the connection, even when a send fails
    producer.shutdown();
}

Consumer (load balancing mode: default mode)

 //1. Create the Consumer object that receives messages
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 //2. Set the name server address to receive from
 consumer.setNamesrvAddr("localhost:9876");
 //3. Set the topic to receive from; "*" accepts any tag
 consumer.subscribe("topic1", "*");
 //Set the consumption mode of this consumer (CLUSTERING = load balancing, the default mode)
 consumer.setMessageModel(MessageModel.CLUSTERING);
 //4. Register the listener that handles received messages
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         //Loop through the delivered messages
         for (MessageExt msg : list) {
             System.out.println("Received message: " + msg);
             //Decode with an explicit charset instead of the platform default
             System.out.println("The message is: " + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });
 //5. Start the service for receiving messages
 consumer.start();
 System.out.println("Receiving message service has been opened!");

 //6. Do not shut the consumer down: it must stay running to keep receiving messages

Consumer (broadcast mode)

//1. Create the Consumer object that receives messages
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2. Set the name server address to receive from
consumer.setNamesrvAddr("localhost:9876");
//3. Set the topic to receive from; "*" accepts any tag
consumer.subscribe("topic1", "*");
//Set the consumption mode of this consumer to broadcast mode
//(the default would be MessageModel.CLUSTERING, i.e. load balancing)
consumer.setMessageModel(MessageModel.BROADCASTING);
//4. Register the listener that handles received messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //Loop through the delivered messages
        for (MessageExt msg : list) {
            System.out.println("Received message: " + msg);
            //Decode with an explicit charset instead of the platform default
            System.out.println("The message is: " + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//5. Start the service for receiving messages
consumer.start();
System.out.println("Receiving message service has been opened!");

//6. Do not shut the consumer down: it must stay running to keep receiving messages

1.3 Multi-producer and multi-consumer message sending (ManyToMany)

Messages produced by multiple producers can be consumed by a single consumer or by multiple consumers.

2. Message types

2.1 Synchronous messages

Features: for important, time-sensitive messages that must return a receipt (send result) to the sender, such as SMS or notifications (e.g. a successful transfer).

Code implementation (in the producer):

// Synchronous send: the SendResult (the receipt) is returned directly by the call.
SendResult result = producer.send(msg);

2.2 Asynchronous messages

Features: for messages that are less time-sensitive but still need a receipt, which is delivered later through a callback; for example, some of the information in an order.

Code implementation (in the producer):

//1. Create the Producer object that sends messages
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2. Set the name server address to send through
producer.setNamesrvAddr("localhost:9876");
//3.1 Start the sending service
producer.start();

final int messageCount = 10;
//Counts outstanding callbacks so the producer is not shut down while sends are still in flight
final java.util.concurrent.CountDownLatch pendingCallbacks =
        new java.util.concurrent.CountDownLatch(messageCount);
try {
    for (int i = 0; i < messageCount; i++) {
        //4. Create the message to send: specify the topic and the content body
        Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8"));

        //3.2 Asynchronous send: the result arrives through the callback instead of
        //    being returned by the call (unlike the synchronous producer.send(msg))
        producer.send(msg, new SendCallback() {
            //Called with the result when the send succeeds
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
                pendingCallbacks.countDown();
            }

            //Called when sending the message failed
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
                pendingCallbacks.countDown();
            }
        });

        System.out.println("message" + i + "send finished, go to do business logic!");
    }
    //Wait until every callback has fired, but never longer than 10 seconds
    pendingCallbacks.await(10, TimeUnit.SECONDS);
} finally {
    //5. Always close the connection, even when a send fails
    producer.shutdown();
}

2.3 One-way messages

Features: Messages that do not require a receipt, such as log messages

Code implementation (in the producer):

// One-way send: no result is captured, matching messages that need no receipt (e.g. logs).
producer.sendOneway(msg);

2.4 Delayed messages

A delayed message is not made available to consumers as soon as it is sent; it is delivered only after the configured waiting time has elapsed, so the delay acts as a buffer before the message arrives.

// Build the payload first; `i` is not defined in this snippet — presumably the index of an enclosing send loop
byte[] delayedBody = ("Delay message: hello rocketmq " + i).getBytes("UTF-8");
Message delayedMsg = new Message("topic3", delayedBody);
// Delay level 3 corresponds to 10s (only a fixed set of delays is supported, see messageDelayLevel)
delayedMsg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(delayedMsg);
System.out.println("Return result: " + sendResult);

Currently supported delay levels (level 1 is the first entry, so level 3 = 10s):

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

2.5 Batch messages

Sending messages in batches can significantly improve the performance of delivering small messages.

Send batch messages:

// Collect the messages of the batch; all of them target the same topic
List<Message> batch = new ArrayList<>();
batch.add(new Message("topic1", "hello rocketmq1".getBytes("UTF-8")));
batch.add(new Message("topic1", "hello rocketmq2".getBytes("UTF-8")));
batch.add(new Message("topic1", "hello rocketmq3".getBytes("UTF-8")));

// Send the whole list in a single call
SendResult result = producer.send(batch);

Restrictions to note:

  • All messages in a batch must have the same topic

  • They must have the same waitStoreMsgOK setting

  • They cannot be delayed messages

  • The total size of the message content must not exceed 4 MB

3. Message filtering

3.1 Category filtering

Filter messages by tag.

Producer:

// Tag the message with "tag2" so that tag-based subscriptions can select it.
Message msg = new Message("topic6","tag2",("Message filtering according to tag: hello rocketmq 2").getBytes("UTF-8"));

consumer:

//When receiving a message, in addition to the topic you can specify which tags to accept:
//"tag1 || tag2" lists the accepted tags, while "*" represents any tag
consumer.subscribe("topic6","tag1 || tag2");

3.2 SQL filtering (also called attribute filtering or syntax filtering)

Basic syntax:

  • Numerical comparison, such as: >, >=, <, <=, BETWEEN, =;
  • Character comparison, such as: =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logical symbols AND, OR, NOT;

Supported constant types:

  • Numeric values, such as: 123, 3.1415;
  • Strings, such as: 'abc' (must be wrapped in single quotes);
  • NULL, the special constant;
  • Boolean values: TRUE or FALSE.

Producer:

//Add user-defined attributes (string key/value pairs) to the message so consumers can filter on them
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

consumer:

//Use a message selector to filter on message attributes; the expression uses SQL-like syntax
//Accept only messages on topic7 whose "age" attribute is at least 18
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
//Accept only messages on topic6 whose "name" attribute equals 'litiedan'
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

Note: SQL filtering depends on broker-side support. Enable it by adding the following setting to the broker.conf configuration file:

enablePropertyFilter=true

Then restart the broker:

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

Alternatively, update the running broker's configuration directly from cmd (run in D:\software\rocketmq-all-4.8.0-bin-release\bin):

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

4. Transaction messages

1. Normal transaction flow (gray lines in the diagram)
2. Transaction compensation (check-back) flow (blue lines in the diagram)

Transaction message states

  • Commit state: the message is allowed to enter the queue; from then on it is indistinguishable from a non-transactional message
  • Rollback state: the message is not allowed to enter the queue; it is as if it had never been sent
  • Intermediate (unknown) state: the half message has been sent, but the second-phase state confirmation to MQ has not yet been performed

Note: Transaction messages are only related to producers, not consumers

Code Implementation:

Commit state

//The producer used for transaction messages is TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
//Add the listener that handles the local transaction
producer.setTransactionListener(new TransactionListener() {
    //Normal transaction process: report the local transaction as committed
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    //Transaction compensation (check-back) process: must answer with a real state, never null,
    //so it reports the same outcome as executeLocalTransaction
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.start();
try {
    Message msg = new Message("topic8", ("Transaction message: hello rocketmq ").getBytes("UTF-8"));
    SendResult result = producer.sendMessageInTransaction(msg, null);
    System.out.println("return result: " + result);
} finally {
    //Always close the connection, even when sending fails
    producer.shutdown();
}

rollback state

producer.setTransactionListener(new TransactionListener() {
    //Normal transaction process: report the local transaction as rolled back
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    //Transaction compensation (check-back) process: must answer with a real state, never null,
    //so it reports the same outcome as executeLocalTransaction
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
});

Intermediate state

public static void main(String[] args) throws Exception {<!-- -->
TransactionMQProducer producer=new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {<!-- -->
// normal transaction
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- -->
return LocalTransactionState.UNKNOW;
}
// Transaction compensation is triggered only when UNKNOW is executed normally
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {<!-- -->
System.out.println("Transaction Compensation");
return LocalTransactionState. COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic13", "hello rocketmq". getBytes("UTF-8"));
SendResult result = producer. sendMessageInTransaction(msg, null);
System.out.println("return result: " + result);
\t
//The transaction compensation producer must always be started
//producer. shutdown();
}