1. Message sending
1.1 Single producer and single consumer message sending (OneToOne)
1. Create a new Maven project named rocketmqtest
2. Import RocketMQ client coordinates
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
3. Producer
package com.liming.base;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Minimal synchronous producer: sends a single message to {@code topic1}
 * and prints the send result.
 *
 * @author dawn
 * @version 1.0
 * @date 2023/5/21 9:08
 */
public class Producer {

    /**
     * Starts a producer, sends one message synchronously and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot be started or the send fails
     */
    public static void main(String[] args) throws Exception {
        /*
         * 1. Who will send it?
         * 2. To whom?
         * 3. How to send?
         * 4. Send what?
         * 5. What is the result?
         * 6. Clean up
         */

        // 1. Create the producer that sends messages
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2. Set the name server address
        producer.setNamesrvAddr("localhost:9876");
        // 3.1 Start the producer
        producer.start();
        try {
            // 4. Create the message to send; encode the body explicitly as UTF-8
            //    so the bytes do not depend on the platform default charset
            Message message =
                    new Message("topic1", "tag1", "hello reccketmq".getBytes(StandardCharsets.UTF_8));
            // 3.2 Send the message synchronously
            SendResult sendResult = producer.send(message);
            System.out.println("return result: " + sendResult);
        } finally {
            // 5. Close the connection even when the send fails
            producer.shutdown();
        }
    }
}
4. Consumers
package com.liming.base;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Minimal push consumer: subscribes to every tag of {@code topic1} and prints
 * each received message together with its body.
 *
 * @author dawn
 * @version 1.0
 * @date 2023/5/21 9:17
 */
public class Consumer {

    /**
     * Starts the consumer and leaves it running so that it keeps receiving messages.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        // 1. Create the consumer that receives messages
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2. Set the name server address
        consumer.setNamesrvAddr("localhost:9876");
        // 3. Subscribe to the topic; "*" accepts any tag
        consumer.subscribe("topic1", "*");
        // 4. Register the listener that handles received messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Loop through the delivered messages
                for (MessageExt msg : list) {
                    System.out.println("Received message: " + msg);
                    byte[] body = msg.getBody();
                    // Decode explicitly as UTF-8 rather than with the platform default charset
                    System.out.println(new String(body, StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5. Start the consumer
        consumer.start();
        System.out.println("Receiving message service has been opened!");
        // 6. Do not shut the consumer down: it must stay alive to keep receiving
    }
}
1.2 Single producer and multiple consumer message sending (OneToMany)
producer
//1. Create an object Producer for sending messages DefaultMQProducer producer = new DefaultMQProducer("group1"); //2. Set the sending name server address producer.setNamesrvAddr("localhost:9876"); //3.1 Start the sent service producer.start(); for (int i = 0; i < 10; i ++ ) {<!-- --> //4. Create the message object to be sent, specify the topic, and specify the content body Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes(); //3.2 send message SendResult result = producer. send(msg); System.out.println("Return result: " + result); } //5. Close the connection producer. shutdown();
Consumer (load balancing mode: default mode)
//1. Create an object Consumer that receives messages DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2. Set the name server address received consumer.setNamesrvAddr("localhost:9876"); //3. Set the topic corresponding to the received message, and the corresponding sub tag is arbitrary consumer. subscribe("topic1","*"); //Set the consumption mode of the current consumer (default mode: load balancing) consumer.setMessageModel(MessageModel.CLUSTERING); //3. Turn on monitoring to receive messages consumer.registerMessageListener(new MessageListenerConcurrently() {<!-- --> @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {<!-- --> //Loop through messages for (MessageExt msg : list) {<!-- --> System.out.println("Received message: " + msg); System.out.println("The message is: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4. Start the service for receiving messages consumer.start(); System.out.println("Receiving message service has been opened!"); //5 Don't close the consumer!
Consumer (broadcast mode)
//1. Create an object Consumer that receives messages DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2. Set the name server address received consumer.setNamesrvAddr("localhost:9876"); //3. Set the topic corresponding to the received message, and the corresponding sub tag is arbitrary consumer. subscribe("topic1","*"); //Set the consumption mode of the current consumer (default mode: load balancing) //consumer.setMessageModel(MessageModel.CLUSTERING); //Set the consumption mode of the current consumer (broadcast mode) consumer.setMessageModel(MessageModel.BROADCASTING); //3. Turn on monitoring to receive messages consumer.registerMessageListener(new MessageListenerConcurrently() {<!-- --> @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {<!-- --> //Loop through messages for (MessageExt msg : list) {<!-- --> System.out.println("Received message: " + msg); System.out.println("The message is: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4. Start the service for receiving messages consumer.start(); System.out.println("Receiving message service has been opened!"); //5 Don't close the consumer!
1.3 Multi-producer and multi-consumer message sending (ManyToMany)
Messages generated by multiple producers can be consumed by the same consumer or by multiple consumers
2. Message category
2.1 Synchronous messages
Features: High immediacy; suited to important messages that must return a receipt (the send result), such as SMS messages or transfer-success notifications
Code implementation (in the producer):
// Synchronous send: the call returns the SendResult (the receipt) for this message.
SendResult result = producer.send(msg);
2.2 Asynchronous messages
Features: Lower immediacy requirements, but a receipt is still required; the receipt is delivered later through a callback. Example: some of the information in an order
Code implementation (in the producer):
//1. Create an object Producer for sending messages DefaultMQProducer producer = new DefaultMQProducer("group1"); //2. Set the sending name server address producer.setNamesrvAddr("localhost:9876"); //3.1 Start the sent service producer.start(); for (int i = 0; i < 10; i ++ ) {<!-- --> //4. Create the message object to be sent, specify the topic, and specify the content body Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8")); //3.2 Synchronization message //SendResult result = producer. send(msg); //System.out.println("Return result: " + result); // asynchronous message producer.send(msg, new SendCallback() {<!-- --> //Successful return result @Override public void onSuccess(SendResult sendResult) {<!-- --> System.out.println(sendResult); } // Indicates that sending the message failed @Override public void onException(Throwable throwable) {<!-- --> System.out.println(throwable); } }); System.out.println("message" + i + "send finished, go to do business logic!"); } //sleep for 10 seconds TimeUnit. SECONDS. sleep(10); //5. Close the connection producer. shutdown();
2.3 One-way messages
Features: Messages that do not require a receipt, such as log messages
Code implementation (in the producer):
// One-way send: no SendResult is captured; this mode is for messages that need no receipt (e.g. logs).
producer.sendOneway(msg);
2.4 Delayed messages
A delayed message is not made available to consumers as soon as it is sent; it is delivered only after the configured waiting time has elapsed, which acts as a buffer that postpones its arrival
Message msg = new Message("topic3",("Delay message: hello rocketmq " + i).getBytes("UTF-8")); //Set delay level 3, this message will be sent after 10s (now only supports a few fixed times, see delayTimeLevel for details) msg.setDelayTimeLevel(3); SendResult result = producer. send(msg); System.out.println("Return result: " + result);
Currently supported delay levels (level 1 is the first entry):
// Delay levels are 1-based positions in this list: setDelayTimeLevel(3) selects the third entry, 10s.
// NOTE(review): presumably quoted from the broker's store configuration class — confirm.
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
2.5 Bulk messages
Sending messages in batches can significantly improve the performance of delivering small messages.
Send bulk messages:
// Collect the messages of the batch; all of them target the same topic.
List<Message> batch = new ArrayList<>();
batch.add(new Message("topic1", "hello rocketmq1".getBytes("UTF-8")));
batch.add(new Message("topic1", "hello rocketmq2".getBytes("UTF-8")));
batch.add(new Message("topic1", "hello rocketmq3".getBytes("UTF-8")));

// Send the whole list with a single call.
SendResult result = producer.send(batch);
Note restrictions:
- All messages in a batch must have the same topic
- All messages in a batch must have the same waitStoreMsgOK setting
- A batch cannot contain delayed messages
- The total size of the message contents in a batch must not exceed 4 MB
3. Message filtering
3.1 Category filtering
filter information by tag
Producer:
// Build the body first, then create the message for topic6 labelled with tag2.
byte[] body = "Message filtering according to tag: hello rocketmq 2".getBytes("UTF-8");
Message msg = new Message("topic6", "tag2", body);
consumer:
//When receiving a message, in addition to specifying the topic, you can also specify the received tag, * represents any tag consumer.subscribe("topic6","tag1 || tag2");
3.2 Syntax filtering (attribute filtering/syntax filtering/SQL filtering)
Basic syntax:
- Numerical comparison, such as: >, >=, <, <=, BETWEEN, =;
- Character comparison, such as: =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical symbols AND, OR, NOT;
Constant support types are:
- Value, such as: 123, 3.1415;
- Characters, such as: ‘abc’, must be wrapped in single quotes;
- NULL, the special constant
- Boolean value, TRUE or FALSE
Producer:
//Add attributes to the message msg.putUserProperty("vip","1"); msg.putUserProperty("age","20");
consumer:
//Use the message selector to filter the corresponding attribute, the syntax format is SQL-like syntax consumer.subscribe("topic7", MessageSelector.bySql("age >= 18")); consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
Note: SQL filtering needs to rely on the functional support of the server, add the corresponding function item in the broker.conf configuration file, and enable the corresponding function
enablePropertyFilter=true
restart broker
REM Run from the RocketMQ bin directory; -c loads the edited broker.conf so enablePropertyFilter takes effect.
start mqbroker.cmd -n 127.0.0.1:9876 -c ..\conf\broker.conf autoCreateTopicEnable=true
Or enter directly in cmd (D:\software\rocketmq-all-4.8.0-bin-release\bin)
mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
4. Transaction messages
1. Normal transaction process (gray lines)
2. Transaction compensation process (blue line)
Transaction message status
- Commit state: the message is allowed to enter the queue; from then on it is indistinguishable from a non-transactional message
- Rollback state: the message is not allowed to enter the queue; this is equivalent to the message never having been sent
- Intermediate state: the half message has been sent, but MQ has not yet received the second-phase confirmation (commit or rollback)
Note: Transaction messages are only related to producers, not consumers
Code Implementation:
Commit state
//The producer used by the transaction message is TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); //Add the listener corresponding to the local transaction producer.setTransactionListener(new TransactionListener() {<!-- --> //normal transaction process public LocalTransactionState executeLocalTransaction(Message message, Object o) {<!-- --> return LocalTransactionState. COMMIT_MESSAGE; } // transaction compensation process public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {<!-- --> return null; } }); producer.start(); Message msg = new Message("topic8",("Transaction message: hello rocketmq ").getBytes("UTF-8")); SendResult result = producer. sendMessageInTransaction(msg,null); System.out.println("return result: " + result); producer. shutdown();
rollback state
producer.setTransactionListener(new TransactionListener() {<!-- --> // normal transaction @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- --> return LocalTransactionState. ROLLBACK_MESSAGE; } // transaction compensation @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) {<!-- --> return null; } });
Intermediate state
public static void main(String[] args) throws Exception {<!-- --> TransactionMQProducer producer=new TransactionMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.setTransactionListener(new TransactionListener() {<!-- --> // normal transaction @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- --> return LocalTransactionState.UNKNOW; } // Transaction compensation is triggered only when UNKNOW is executed normally @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) {<!-- --> System.out.println("Transaction Compensation"); return LocalTransactionState. COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic13", "hello rocketmq". getBytes("UTF-8")); SendResult result = producer. sendMessageInTransaction(msg, null); System.out.println("return result: " + result); \t //The transaction compensation producer must always be started //producer. shutdown(); }