Integrating RocketMQ with Golang

Summary of RocketMQ concepts

What is RocketMQ

RocketMQ is an open-source message queue (MQ) originally developed at Alibaba and now maintained as an Apache project. It is used across many business scenarios and has a mature ecosystem. It supports transactional messages, ordered messages, batch messages, scheduled messages, message traceback (re-consuming from an earlier point), and other features.

RocketMQ core concepts
  • Name Service (NameServer): It can be understood as a registry. It mainly stores topic routing information and manages Brokers. In a NameServer cluster, the NameServers do not communicate with each other.

  • Broker server (BrokerServer): The message relay, responsible for storing and forwarding messages. The Broker receives messages sent by producers, stores them, and serves pull requests from consumers. It also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue information.

  • Producer: Responsible for producing messages, usually as part of a business system. A producer sends the messages generated by the business application to the Broker server. RocketMQ provides multiple sending methods: synchronous, asynchronous, ordered, and one-way. Synchronous and asynchronous sending both require the Broker to return an acknowledgement; one-way sending does not.

  • Producer Group: A collection of Producers of the same type, which send the same type of messages with consistent sending logic. If a transactional message is sent and the original producer crashes afterwards, the Broker contacts another producer instance in the same producer group to commit or roll back the transaction.

  • Consumer: Responsible for consuming messages, usually in a background system that processes them asynchronously. A consumer pulls messages from the Broker server and hands them to the application. From the application's perspective, two forms of consumption are provided: pull consumption and push consumption.

  • Consumer Group: A collection of Consumers of the same type, which usually consume the same type of messages with consistent consumption logic. Consumer groups make load balancing and fault tolerance easy to achieve on the consuming side. Note that all consumer instances in a consumer group must subscribe to exactly the same Topic. RocketMQ supports two consumption modes: Clustering and Broadcasting.

  • Topic: Represents a collection of messages. Each topic contains several messages. Each message can only belong to one topic. It is the basic unit of RocketMQ for message subscription.

  • Tag: A flag set on a message, used to distinguish different types of messages within the same topic. Messages from the same business unit can carry different tags under one topic depending on their business purpose. Tags help keep code clear and coherent and make messages easier to filter and query. Consumers can apply different consumption logic to different tags, which improves extensibility.
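
To make the role of Tags more concrete, here is a minimal sketch of how a subscription expression such as "TagA || TagB" could be matched against a message tag. The function name matchesTagExpr is hypothetical, and the code only illustrates the idea with the standard library; it is not the filtering code used by the Broker or the Go client. It assumes the usual convention that "*" (or an empty expression) subscribes to every tag.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTagExpr reports whether tag is selected by a subscription
// expression of the form "TagA || TagB". An empty expression or "*"
// is treated as "all tags" (an assumption of this sketch).
func matchesTagExpr(expr, tag string) bool {
	expr = strings.TrimSpace(expr)
	if expr == "" || expr == "*" {
		return true
	}
	for _, part := range strings.Split(expr, "||") {
		if strings.TrimSpace(part) == tag {
			return true
		}
	}
	return false
}

func main() {
	expr := "TagA || TagB"
	for _, tag := range []string{"TagA", "TagB", "TagC"} {
		fmt.Printf("expression %q selects %s: %v\n", expr, tag, matchesTagExpr(expr, tag))
	}
}
```

A consumer that only cares about one kind of message on a shared topic would subscribe with the matching expression and never see the other tags.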

Extended concepts

  • Message Model: RocketMQ consists of three main parts: Producer, Broker, and Consumer. The Producer produces messages, the Consumer consumes them, and the Broker stores them. In an actual deployment, each Broker corresponds to one server. Each Broker can store messages from multiple Topics, and the messages of a single Topic can be sharded across different Brokers. A MessageQueue stores the physical addresses of messages, and the message addresses of each Topic are spread over multiple MessageQueues. A ConsumerGroup consists of multiple Consumer instances.

  • Message: The physical carrier of the information transmitted by the messaging system and the smallest unit of data that is produced and consumed. Each message must belong to a topic. Each message in RocketMQ has a unique Message ID and can carry a Key that identifies it at the business level. The system supports querying messages by Message ID and by Key.

  • Pull Consumer: One type of Consumer. The application actively calls the Consumer's pull method to fetch messages from the Broker server, so the application controls when messages are fetched. Once a batch of messages has been obtained, the application starts processing it.

  • Push Consumer: One type of Consumer. In this mode, from the application's point of view, the Broker pushes messages to the consumer as soon as it receives them (the client implements this with long-polling pulls under the hood). This mode generally delivers messages with low latency.

  • Clustering: In cluster consumption mode, the messages are shared roughly evenly among the Consumer instances of the same Consumer Group, and each message is consumed by only one of them.

  • Broadcasting: In broadcast consumption mode, every Consumer instance of the same Consumer Group receives all messages.

  • Normal Ordered Message: In normal ordered consumption mode, the messages a consumer receives from the same message queue (a Topic partition, called a Message Queue) are in order, while messages received from different message queues may be out of order relative to each other.

  • Strictly Ordered Message: In the strictly ordered message mode, all messages received by consumers are in order.
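
Because ordering is only guaranteed within a single message queue, related messages (for example, all events of one order) have to land in the same queue. The following minimal sketch shows one common way to do that: hash a business key and take it modulo the number of queues. The name selectQueue and the choice of the FNV hash are assumptions made for illustration; this is not the selector shipped with the RocketMQ client.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// selectQueue maps a business key (hypothetical, e.g. an order ID) to a
// queue index in [0, queueCount). The same key always yields the same
// index, so messages sharing a key stay in one queue and keep their order.
// It returns -1 when queueCount is not positive.
func selectQueue(shardingKey string, queueCount int) int {
	if queueCount <= 0 {
		return -1
	}
	h := fnv.New32a()
	h.Write([]byte(shardingKey)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(queueCount))
}

func main() {
	const queues = 4
	events := []struct{ orderID, step string }{
		{"order-1001", "created"},
		{"order-1002", "created"},
		{"order-1001", "paid"},
		{"order-1001", "shipped"},
	}
	for _, e := range events {
		fmt.Printf("%s %-8s -> queue %d\n", e.orderID, e.step, selectQueue(e.orderID, queues))
	}
}
```

All events of order-1001 map to the same queue index, so a consumer reading that queue sees them in the order they were sent, while events of different orders may interleave.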

RocketMQ setup

Now we need to set up a local RocketMQ development environment. We build it with docker-compose. The content of the docker-compose.yaml file is as follows:

version: '3'
services:
  rmqnamesrv:
    image: rocketmqinc/rocketmq
    container_name: rmqnamesrv
    command: sh mqnamesrv
    ports:
      - "9876:9876"
    volumes:
      - ./namesrv/logs:/root/logs

  rmqbroker:
    image: rocketmqinc/rocketmq
    container_name: rmqbroker
    command: sh mqbroker -c /opt/rocketmq-4.9.1/conf/broker.conf
    depends_on:
      - rmqnamesrv
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    ports:
      - "10909:10909"
      - "10911:10911"
    volumes:
      - ./broker/conf:/opt/rocketmq-4.9.1/conf
      - ./broker/logs:/opt/rocketmq-4.9.1/logs

After creating the file above, you also need to create the RocketMQ configuration file broker.conf in the mapped directory ./broker/conf. In this file, the only value you need to change is brokerIP1, which should be set to an address of your host that clients can reach. The content of the file is as follows:

brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.18.135
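
If you are unsure which address to put into brokerIP1, the following minimal sketch lists the host's interface addresses and prints the first IPv4 address that is neither loopback nor link-local. The helper name pickBrokerIP is hypothetical, and the selection rule is only an assumption: on a machine with several interfaces the first match may be a Docker bridge or VPN address, so treat the output as a candidate and verify it.

```go
package main

import (
	"fmt"
	"net"
)

// pickBrokerIP returns the first candidate that is an IPv4 address and is
// neither loopback nor link-local. Candidates may be CIDR strings such as
// "192.168.18.135/24" or plain IPs. It returns "" if nothing qualifies.
func pickBrokerIP(candidates []string) string {
	for _, c := range candidates {
		ip, _, err := net.ParseCIDR(c)
		if err != nil {
			ip = net.ParseIP(c) // fall back to a plain IP without a mask
		}
		if ip == nil || ip.To4() == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
			continue
		}
		return ip.String()
	}
	return ""
}

func main() {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		fmt.Println("cannot list interface addresses:", err)
		return
	}
	candidates := make([]string, 0, len(addrs))
	for _, a := range addrs {
		candidates = append(candidates, a.String())
	}
	fmt.Printf("brokerIP1 = %s\n", pickBrokerIP(candidates))
}
```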

Once both files are in place, run the docker-compose up -d command to start the services.
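
Before running the client code, it can help to confirm that the ports published in docker-compose.yaml accept connections. The sketch below is one way to check this with the standard library only. The helper names probe and unreachable are hypothetical, and the addresses assume that the check runs on the Docker host itself, so 127.0.0.1 is used with the ports 9876, 10909, and 10911 from the compose file.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// probe reports whether a TCP connection to addr succeeds within timeout.
func probe(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

// unreachable returns the addresses for which check reports false.
func unreachable(addrs []string, check func(string) bool) []string {
	var failed []string
	for _, a := range addrs {
		if !check(a) {
			failed = append(failed, a)
		}
	}
	return failed
}

func main() {
	// NameServer port and the two Broker ports published by the compose file.
	addrs := []string{"127.0.0.1:9876", "127.0.0.1:10909", "127.0.0.1:10911"}
	failed := unreachable(addrs, func(a string) bool {
		return probe(a, 2*time.Second)
	})
	if len(failed) == 0 {
		fmt.Println("all RocketMQ ports are reachable")
		return
	}
	fmt.Println("not reachable yet:", failed)
}
```

A successful TCP connection only shows that the port is open; it does not prove that the Broker has registered with the NameServer.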

Producer and consumer examples
  • producer
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), // NameServer address
		producer.WithRetry(2),          // number of retries
		producer.WithGroupName("test"), // producer group name
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}
	if err := p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err.Error())
		os.Exit(1)
	}

	// Send three messages to topic "test", one for each tag.
	tags := []string{"TagA", "TagB", "TagC"}
	for i := 0; i < 3; i++ {
		tag := tags[i%3]
		msg := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!"))
		msg.WithTag(tag)

		// SendSync waits for the Broker's confirmation before returning.
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}

	if err := p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err.Error())
	}
}
  • consumer
package main

import (
	"context"
	"fmt"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("test"),                        // consumer group name
		consumer.WithNameServer([]string{"127.0.0.1:9876"}), // NameServer address
	)
	if err != nil {
		panic(err)
	}

	// Subscribe to topic "test". The empty selector applies no tag filter.
	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			fmt.Printf("subscribe callback: %+v\n", msg)
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		panic(err)
	}

	err = c.Start()
	if err != nil {
		panic(err)
	}
	defer func() {
		err = c.Shutdown()
		if err != nil {
			fmt.Printf("shutdown consumer error: %s\n", err.Error())
		}
	}()

	// Receiving from a nil channel blocks forever, which keeps the consumer
	// running. The deferred Shutdown only runs if main returns.
	<-(chan interface{})(nil)
}
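
The consumer above blocks forever, so its deferred Shutdown call never runs when the process is stopped with Ctrl+C. One possible alternative is to wait for an interrupt signal and then shut down explicitly. The sketch below shows that pattern with the standard library only. The shutdowner interface, the waitAndShutdown helper, and fakeConsumer are hypothetical names introduced for illustration; fakeConsumer stands in for the RocketMQ push consumer, whose Shutdown method has the same shape. The two-second timeout exists only so that the demo ends on its own.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// shutdowner is a hypothetical interface covering anything that exposes
// Shutdown() error, such as the push consumer in the example above.
type shutdowner interface {
	Shutdown() error
}

// waitAndShutdown blocks until done is closed and then shuts s down.
func waitAndShutdown(done <-chan struct{}, s shutdowner) error {
	<-done
	return s.Shutdown()
}

// fakeConsumer is a stand-in used only to keep this sketch self-contained.
type fakeConsumer struct{ closed bool }

func (f *fakeConsumer) Shutdown() error {
	f.closed = true
	return nil
}

func main() {
	// ctx is cancelled when the process receives Ctrl+C or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	// Demo only: also stop after two seconds so the sketch terminates.
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	c := &fakeConsumer{}
	fmt.Println("consuming; press Ctrl+C to stop")
	if err := waitAndShutdown(ctx.Done(), c); err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err)
	}
	fmt.Println("consumer closed:", c.closed)
}
```

In a real program the fake would be replaced by the started consumer and the demo timeout removed, so that the process runs until it is signalled.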