Provide corresponding solutions for the shortcomings introduced by Rokcetmq

1. Introduce usability reduction problem for Rokcermq
Rocketmq implements high-availability mode. Rocketmq has three modes: stand-alone mode, master-slave mode, and fragmented cluster mode.
stand-alone mode
The stand-alone mode is at the Demo level. Generally, you start it locally for fun, and no one uses the stand-alone mode for production.
docker-compose.yml

version: '3'
services:
    namesrv:
      image: foxiswho/rocketmq:4.8.0
      container_name: rmqnamesrver
      ports:
        - "9876:9876"
      environment:
        JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
      command: sh mqnamesrv
    broker:
      image: foxiswho/rocketmq:4.8.0
      container_name: rmqbroker
      ports:
        - "10911:10911"
      environment:
        NAMESERV_ADDR: "127.0.0.1:9876"
        JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
      volumes:
        - ./logs:/home/rocketmq/logs
        - ./store:/home/rocketmq/store
        - ./conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf
      command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf
      depends_on:
        -namesrv
    mqconsole:
      image: styletang/rocketmq-console-ng
      container_name: rmqconsole
      ports:
        - "10100:8080"
      environment:
        JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false
      depends_on:
        -namesrv

Advantages: Local development and testing, simple configuration, and no message will be lost when synchronously swiping the disk
Disadvantages: Unreliable, if it goes down, it will cause the service to be unavailable
Master-slave mode: (dynamically increase master-slave nodes: such as multiple masters and multiple slaves)
docker-compose.yml file

version: '3'
services:
    namesrv:
      image: foxiswho/rocketmq:4.8.0
      container_name: rmqnamesrver
      ports:
        - "9876:9876"
      environment:
        JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
      command: sh mqnamesrv
    broker-a:
      image: foxiswho/rocketmq:4.8.0
      container_name: rmqbroker-a
      ports:
        - "10911:10911"
        - "10912:10912"
      environment:
        NAMESERV_ADDR: "127.0.0.1:9876"
        JAVA_OPT_EXT: "-Xms128M -Xmx128M -Xmn128m"
      volumes:
        - ./broker-a/logs:/home/rocketmq/logs
        - ./broker-a/store:/home/rocketmq/store
        - ./broker-a/conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf
      command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf
      depends_on:
        -namesrv
    broker-b:
      container_name: rmqbroker-b
      image: foxiswho/rocketmq:4.8.0
      ports:
        - '10919:10919'
        - '10921:10921'
      volumes:
        - ./broker-b/logs:/home/rocketmq/logs
        - ./broker-b/store:/home/rocketmq/store
        - ./broker-b/conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf
      environment:
        JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128M -Xmx128M -Xmn128m"
      command: [ "sh","mqbroker","-c","/home/rocketmq/rocketmq-4.8.0/conf/broker.conf","-n", "namesrv:9876","autoCreateTopicEnable=true" ]
      depends_on:
        -namesrv
    mqconsole:
      image: styletang/rocketmq-console-ng
      container_name: rmqconsole
      ports:
        - "10100:8080"
      environment:
        JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false
      depends_on:
        -namesrv

Related broker.conf

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


#The name of the group to which it belongs
brokerClusterName=DefaultCluster

#broker name, note that different configuration files are filled in differently here, if you use in broker-a.properties: broker-a,
#Use in broker-b.properties: broker-b
brokerName=broker-a

#0 means Master, >0 means Slave
brokerId=0

#nameServer address, separated by semicolon
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#Start IP, if docker reports com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# Solution 1 Add a sentence producer.setVipChannelEnabled(false);, Solution 2 brokerIP1 Set the host IP, do not use docker internal IP

# Set the ip address of the server where the broker node is located (** this is very important, in the master-slave mode, the slave node will synchronize data according to the brokerIP2 of the master node, if not configured, the master-slave cannot be synchronized, brokerIP1 is set to be accessible from the external network The ip must be configured in the case of dual network cards in the server, such as Alibaba Cloud, the master node needs to configure ip1 and ip2, and the slave node only needs to configure ip1)
brokerIP1=172.16.100.81
brokerIP2 = 172.16.100.81

#When sending a message, automatically create a topic that does not exist in the server, and the number of queues created by default
defaultTopicQueueNums=4

#Whether to allow Broker to automatically create Topic, it is recommended to enable it offline and disable it online! ! ! Look carefully here is false, false, false
#reason See you in the next blog~ Hahahaha
autoCreateTopicEnable=true

#Whether to allow Broker to automatically create subscription groups, it is recommended to open offline and close online
autoCreateSubscriptionGroup=true

#Broker listening port for external services
listenPort=10911

#Delete the file time point, the default is 4:00 am
deleteWhen=04

#File retention time, default 48 hours
fileReservedTime=12

#commitLog The default size of each file is 1G
mapedFileSizeCommitLog=1073741824

#ConsumeQueue stores 30W items by default for each file, adjust according to business conditions
mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#Detect physical file disk space
diskMaxUsedSpaceRatio=98
#Storage path
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog storage path
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#consumption queue storage
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#Message index storage path
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint file storage path
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort file storage path
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# limit message size
maxMessageSize=65536

#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

The role of #Broker
#- ASYNC_MASTER asynchronous replication Master
#- SYNC_MASTER Synchronous dual-write Master
#- SLAVE
brokerRole=ASYNC_MASTER

#Swipe mode
#- ASYNC_FLUSH asynchronous flushing
#- SYNC_FLUSH synchronous flushing
flushDiskType=ASYNC_FLUSH

#Number of message thread pools
#sendMessageThreadPoolNums=128
#Pull message thread pool number
#pullMessageThreadPoolNums=128
slaveReadEnable = true

2 To increase the complexity of the system
2.1 Ensure that messages are not consumed repeatedly
(1) Use redis to deduplicate
Producer codes send 100 codes

/**
 * send normal message
 */
public class SendMessage {<!-- -->
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {<!-- -->
        // Create a message producer, specify the group name to which the producer belongs
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // Specify Nameserver address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // start the producer
        producer.start();
        MessageExt msg = new MessageExt();
        msg.setBody(("RocketMQ Message").getBytes());
        msg.setTopic("topicB");
        msg.setTags("myTag");
        msg.setMsgId(UUID.randomUUID().toString());
        // Send a message
// SendResult send = producer.send(msg, 10000);//synchronous
        for (int i = 0; i <100 ; i ++ ) {<!-- -->
            producer.send(msg, new SendCallback() {<!-- -->//Asynchronous callback function
                @Override
                public void onSuccess(SendResult sendResult) {<!-- -->
                    System.out.println("Send successfully");
                }
                @Override
                public void onException(Throwable throwable) {<!-- -->
                    System.out.println("Failed to send");
                }
            }, 10000);
        }

    }

consumer code

@Slf4j
@Configuration
public class QCCustomer {<!-- -->
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Value("${rocketmq.producer.group:groupB}")
    private String resultsGroupName;
    @Value("${rocketmq.name-server:}")
    private String namesrvAddr;
    @Autowired
    private MQConsumeMsgListenerProcessor mqConsumeMsgListenerProcessor;

    @Bean
    public DefaultMQPushConsumer defaultConsumer() {<!-- -->
        log.info("ruleEngineConsumer is creating ------------------------------------------" );
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(2);
        consumer.setConsumeMessageBatchMaxSize(100);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // set monitor
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {<!-- -->
            try {<!-- -->
                MessageExt messageExt = list. get(0);
                String msgId = messageExt. getMsgId();
                Long increment = redisTemplate.opsForValue().increment(msgId);
                redisTemplate.expire(msgId,2,TimeUnit.MINUTES);//
                if(increment==1) {<!-- -->
                    Thread. sleep(100);
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    String format = sdf. format(new Date());
                    log.info("Time {}, the message received by MQ is: {}", format, new String(messageExt.getBody(), "utf-8"));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {<!-- -->
                e.printStackTrace();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setInstanceName(System.currentTimeMillis() + "dstMQMsgConsumer");
        consumer.setConsumerGroup(resultsGroupName);
        try {<!-- -->
            consumer. subscribe("topicB", "*");
            consumer.start();
            log.info("ruleEngineConsumer successfully created groupName={}", resultsGroupName);

        } catch (MQClientException e) {<!-- -->
            log.error("ruleEngineConsumer failed to create!");
        }
        return consumer;
    }

}

The result is as follows:

(2) Establish a unique index in the database to ensure that data will not be duplicated.
2 How to ensure the reliable transmission of messages
RocketMQ message loss may occur in the following three stages:
1. When the producer sends a message to the Broker;
2. Broker internally stores messages to disk and master-slave replication synchronization;
3. When the broker pushes the message to the consumer or the consumer actively pulls the message;
solution strategy
Message sender: through different retry strategies to ensure the reliable sending of messages;
(1), synchronous sending
Synchronous sending means that when sending a message, the sender blocks the thread and waits until the server returns the sent result. If the sending end needs to ensure the reliability of the message and prevent the failure of message sending, it can use synchronous blocking sending, and then synchronously check the status returned by Brocker to determine whether the message is persisted successfully. If the sending times out or fails, it will retry 2 times by default. RocketMQ chooses a message model that transmits successfully at least once, but repeated delivery may occur, so the consumer needs to be idempotent.

(2), asynchronous sending
Asynchronous sending means that when the sender sends a message, it passes in the implementation class of the callback interface. After calling the sending interface, it will not block, the sending method will return immediately, the callback task will be executed in another thread, and the message sending result will be sent back to the corresponding callback function. The specific business implementation can judge whether retry is needed to ensure the reliability of the message according to the sent result information.

(3), one-way sending
One-way sending means that after the sender completes sending, it returns immediately after calling the sending interface, and does not return the sending result. The business party cannot judge whether the message is sent successfully according to the sending status. One-way sending is relatively faster than the previous two sending methods. It is an unreliable way to send messages, so to ensure the reliability of message sending, this method is not recommended for sending messages.

(4), send retry strategy
After the producer fails to send a message, it will retry according to the corresponding strategy. Producer’s send method itself supports internal retry, and the retry logic is as follows:

1. Retry at most 2 times.

2. If the synchronous mode fails to send, it will turn to the next Broker. If the asynchronous mode fails to send, it will only retry on the current Broker. The total time-consuming time of this method does not exceed the value set by sendMsgTimeout, which is 10s by default.

3. If a timeout exception occurs when sending a message to the broker, it will not retry.

The above strategy also guarantees that the message can be sent successfully to a certain extent. If the business has high requirements on message reliability, it is recommended that the application add corresponding retry logic: for example, when calling the send synchronization method to fail to send, try to store the message in db, and then retry periodically by the background thread to ensure that the message must reach the Broker.
Broker server: ensure reliable storage of messages through different flashing mechanisms and master-slave replication;
(1) Disk brushing mechanism
Synchronous flushing: After the message is written to the PageCache of the memory, the thread for flushing the disk is immediately notified to flush the disk, and then waits for the completion of the disk flushing. After the execution of the flushing thread is completed, the waiting thread is awakened, and the status of writing the message is returned successfully. This method can ensure absolute data security, but the throughput is not large.
Asynchronous disk flushing (default): When the message is written to the PageCache in the memory, it will immediately return to the client that the write operation is successful. The message is written to disk. This method has high throughput and high performance, but the data in PageCache may be lost, and absolute data security cannot be guaranteed.
Use synchronous brushing to ensure absolute data security
Message consumer: ensure reliable consumption of messages through at least one successful consumption and consumption retry mechanism
(2) Replication synchronization
Synchronous replication: The synchronous replication method is to wait for the Master and Slave to write successfully before feeding back the successful writing status to the client. In the synchronous replication mode, if the Master fails, all the backup data on the Slave is easy to restore, but synchronous replication will increase the data writing delay and reduce the system throughput.
Asynchronous replication: The asynchronous replication method is that as long as the master writes successfully, it can feedback the write success status to the client. In the asynchronous replication mode, the system has lower latency and higher throughput, but if the Master fails, some data may be lost because it has not been written to the Slave;
2 How to send sequential messages
Rockermq only supports local order. To achieve global order, it is necessary to ensure that the producer and topic queue consumer are in a one-to-one relationship.
producer

public class SortProducter {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[]{<!-- -->"TagA", "TagC", "TagD"};
        // Order List
        List<OrderStep> orderList = new SortProducter().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf. format(date);
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            // add a time prefix
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags. length], "KEY" + i, body. getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {<!-- -->
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {<!-- -->
                    Long id = (Long) arg; //Choose the queue to send according to the order id
                    long index = id % mqs. size();
                    return mqs. get((int) index);
                }
            }, orderList.get(i).getOrderId());//order id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult. getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer. shutdown();
    }

    /**
     * Order steps
     */
    private static class OrderStep {<!-- -->
        private long orderId;
        private String desc;

        public long getOrderId() {<!-- -->
            return orderId;
        }

        public void setOrderId(long orderId) {<!-- -->
            this. orderId = orderId;
        }

        public String getDesc() {<!-- -->
            return desc;
        }

        public void setDesc(String desc) {<!-- -->
            this.desc = desc;
        }

        @Override
        public String toString() {<!-- -->
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * Generate simulated order data
     */
    private List<OrderStep> buildOrders() {<!-- -->
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Push");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        return orderList;
    }
}

consumer fulfillment

public class SortCustomer {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * Set whether the Consumer starts to consume from the head of the queue or the tail of the queue when it starts for the first time<br>
         * If it is not the first time to start, then continue to consume according to the position of the last consumption
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {<!-- -->
            Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {<!-- -->
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {<!-- -->
                    // You can see that each queue has a unique consume thread to consume, and the order is ordered for each queue (partition)
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody ()));
                }
                try {<!-- -->
                    //Simulating business logic processing...
                    TimeUnit. SECONDS. sleep(random. nextInt(10));
                } catch (Exception e) {<!-- -->
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus. SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

3 Consistency issues
For consistency problems, transaction messages can be used to solve