Protocol analysis of handwritten Redis (6) in go

RESP protocol analysis

For specific protocol descriptions, please refer to the protocol description in the first article. Next, we will start the analysis of the protocol directly.

Reply

Define a top-level reply entity interface in interface/resp/reply.go, and all subsequent protocol classes will implement this interface

// Reply is the interface implemented by every RESP reply type.
type Reply interface {
	// ToBytes returns the reply serialized as a byte slice.
	ToBytes() []byte
}

1. Generic response

Redis uses a handful of common reply kinds, such as errors, status replies, integers and arrays. We define these entities in advance so that the protocol parser can use them directly.

reply.go

Create the file resp/reply/reply.go. In this file we define the protocol-related reply structures, such as IntReply and StatusReply; each of them implements the ToBytes() method of the Reply interface.

var (
	// nullBulkReplyBytes is the null bulk string marker WITHOUT the trailing
	// line terminator; users append CRLF themselves.
	nullBulkReplyBytes = []byte("$-1")
	// CRLF is the line terminator that ends every RESP line.
	CRLF = "\r\n"
)

// BulkReply The reply body of a single string
type BulkReply struct {<!-- -->
//Response data, if you want to reply a "hello world" , $11\r\\
hello world\r\\

Arg []byte
}

// ToBytes custom entity
func (b *BulkReply) ToBytes() []byte {<!-- -->
if len(b.Arg) == 0 {<!-- -->
return nullBulkReplyBytes
}
return []byte(fmt.Sprintf("$%d%s%s%s", len(b.Arg), CRLF, string(b.Arg), CRLF))
}

// MakeBulkReply creates an entity
func MakeBulkReply(arg []byte) *BulkReply {<!-- -->
return &BulkReply{<!-- -->
Arg: arg,
}
}

// MultiBulkReply reply of multiple strings
type MultiBulkReply struct {<!-- -->
Args [][]byte
}

func (m *MultiBulkReply) ToBytes() []byte {<!-- -->
argLen := len(m. Args)
var buf bytes. Buffer
buf.WriteString(fmt.Sprintf("*%d%s", argLen, CRLF))
for _, arg := range m.Args {<!-- -->
if arg == nil {<!-- -->
buf.WriteString(string(nullBulkReplyBytes) + CRLF)
}
buf.WriteString(fmt.Sprintf("$%d%s%s%s", len(arg), CRLF, string(arg), CRLF))
}
return buf. Bytes()
}

// MakeMultiBulkReply creates an entity
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {<!-- -->
return &MultiBulkReply{<!-- -->
Args: args,
}
}

// StatusReply status reply
type StatusReply struct {<!-- -->
Status string
}

func (s *StatusReply) ToBytes() []byte {<!-- -->
return []byte(" + " + s.Status + CRLF)
}

func MakeStatusReply(status string) *StatusReply {<!-- -->
return &StatusReply{<!-- -->
Status: status,
}
}

// IntReply numeric type reply
type IntReply struct {<!-- -->
Code int64
}

func (i *IntReply) ToBytes() []byte {<!-- -->
return []byte(":" + strconv.FormatInt(i.Code, 10) + CRLF)
}

func MakeIntReply(code int64) *IntReply {<!-- -->
return &IntReply{<!-- -->Code: code}
}

// StandardErrReply standard error reply
type StandardErrReply struct {<!-- -->
Status string
}

func (s *StandardErrReply) ToBytes() []byte {<!-- -->
return []byte("-" + s.Status + CRLF)
}

func MakeStandardErrReply(status string) *StandardErrReply {<!-- -->
return &StandardErrReply{<!-- -->Status: status}
}

// IsErrReply judges whether the reply data is abnormal information
func IsErrReply(reply resp. Reply) bool {<!-- -->
return reply.ToBytes()[0] == '-'
}

// ErrorReply is an error reply: it can be used as a Go error (Error) and
// serialized as a reply (ToBytes).
type ErrorReply interface {
	// Error returns the error message.
	Error() string
	// ToBytes returns the reply serialized as a byte slice.
	ToBytes() []byte
}

error.go

Under the same directory, we create an interface body definition file for error reply, which is used to define the general error interface body

/**
General exception information
1. UnknownErrReply: unknown error
2. ArgNumErrReply: wrong number of arguments
3. SyntaxErrReply: syntax error
4. WrongTypeErrReply: wrong type
5. ProtocolErrReply: protocol error, does not conform to the resp protocol
*/

// UnknownErrReply is the reply for an unknown error.
type UnknownErrReply struct{}

// unKnownErrReply is the fixed wire form of UnknownErrReply.
var unKnownErrReply = []byte("-Err unknown\r\n")

// Error returns the error message without the RESP framing.
func (u *UnknownErrReply) Error() string {
	return "Err unknown"
}

// ToBytes returns the fixed serialized form "-Err unknown\r\n".
func (u *UnknownErrReply) ToBytes() []byte {
	return unKnownErrReply
}

// MakeUnknownErrReply creates an UnknownErrReply.
func MakeUnknownErrReply() *UnknownErrReply {
	return &UnknownErrReply{}
}

// ArgNumErrReply is the reply for a command called with the wrong number of
// arguments.
type ArgNumErrReply struct {
	// Cmd is the command itself.
	Cmd string
}

// Error returns the error message without the RESP framing.
func (a *ArgNumErrReply) Error() string {
	return fmt.Sprintf("Err wrong number of arguments for : %s, command", a.Cmd)
}

// ToBytes serializes the reply as "-<Error()>\r\n". Building it from Error
// keeps the two texts from drifting apart.
func (a *ArgNumErrReply) ToBytes() []byte {
	return []byte("-" + a.Error() + "\r\n")
}

// MakeArgNumErrReply creates an ArgNumErrReply for cmd.
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
	return &ArgNumErrReply{Cmd: cmd}
}

// SyntaxErrReply is the reply for a syntax error.
type SyntaxErrReply struct{}

var (
	// syntaxErrBytes is the fixed wire form of SyntaxErrReply.
	syntaxErrBytes = []byte("-Err syntax error\r\n")
	// theSyntaxErrReply is the shared instance handed out by
	// MakeSyntaxErrReply; the type has no fields, so one instance suffices.
	theSyntaxErrReply = new(SyntaxErrReply)
)

// Error returns the error message without the RESP framing.
func (s *SyntaxErrReply) Error() string {
	return "Err syntax error"
}

// ToBytes returns the fixed serialized form "-Err syntax error\r\n".
func (s *SyntaxErrReply) ToBytes() []byte {
	return syntaxErrBytes
}

// MakeSyntaxErrReply returns the shared SyntaxErrReply instance.
func MakeSyntaxErrReply() *SyntaxErrReply {
	return theSyntaxErrReply
}

// WrongTypeErrReply is the reply for an operation against a key holding the
// wrong kind of value.
type WrongTypeErrReply struct{}

var (
	// wrongTypeErrBytes is the fixed wire form of WrongTypeErrReply.
	wrongTypeErrBytes = []byte("-wrong type operation against a key holding the wrong kind of value\r\n")
	// theWrongTypeErrReply is the shared instance handed out by
	// MakeWrongTypeErrReply; the type has no fields, so one instance suffices.
	theWrongTypeErrReply = new(WrongTypeErrReply)
)

// Error returns the error message. Like the other error replies in this
// file, it carries no leading '-': that prefix is wire framing and belongs
// to ToBytes only.
func (w *WrongTypeErrReply) Error() string {
	return "Err wrong type"
}

// ToBytes returns the fixed serialized form of the reply.
func (w *WrongTypeErrReply) ToBytes() []byte {
	return wrongTypeErrBytes
}

// MakeWrongTypeErrReply returns the shared WrongTypeErrReply instance.
func MakeWrongTypeErrReply() *WrongTypeErrReply {
	return theWrongTypeErrReply
}

// ProtocolErrReply is the reply for input that does not conform to the RESP
// protocol.
type ProtocolErrReply struct {
	Msg string
}

// Error returns the error message. It carries no trailing line terminator:
// CRLF is wire framing and is added by ToBytes only, as in the other error
// replies in this file.
func (p *ProtocolErrReply) Error() string {
	return fmt.Sprintf("Error protocol error: %s", p.Msg)
}

// ToBytes serializes the reply as "-Error protocol error: <Msg>\r\n".
func (p *ProtocolErrReply) ToBytes() []byte {
	return []byte("-" + p.Error() + "\r\n")
}

consts.go

This file mainly defines some fixed reply formats

/**
Used to save some fixed reply message formats
1. PongReply: heartbeat reply
2. OkReply: ok reply
3. NullBulkReply: empty string reply
4. EmptyMultiBulkReply: empty array reply
5. NoReply: no data
*/

// PongReply is the heartbeat reply.
type PongReply struct{}

// pongBytes is the fixed wire form of PongReply: a status reply whose first
// byte is '+'.
var pongBytes = []byte("+PONG\r\n")

// thePongReply is a single shared instance, so no new object is created for
// every reply.
var thePongReply = new(PongReply)

// OkReply is the fixed "OK" status reply.
type OkReply struct{}

// okBytes is the fixed wire form of OkReply: a status reply whose first byte
// is '+'.
var okBytes = []byte("+OK\r\n")

// theOkReply is the single shared OkReply instance.
var theOkReply = new(OkReply)

// NullBulkReply is the null bulk string reply; a length of -1 means "null".
type NullBulkReply struct{}

// nullBulkBytes is the fixed wire form of NullBulkReply.
var nullBulkBytes = []byte("$-1\r\n")

// theNullBulkReply is the single shared NullBulkReply instance.
var theNullBulkReply = new(NullBulkReply)

// EmptyMultiBulkReply is the empty array reply.
type EmptyMultiBulkReply struct{}

// emptyMultiBulkBytes is the fixed wire form of EmptyMultiBulkReply.
var emptyMultiBulkBytes = []byte("*0\r\n")

// theEmptyMultiBulkReply is the single shared EmptyMultiBulkReply instance.
var theEmptyMultiBulkReply = new(EmptyMultiBulkReply)

// NoReply is a reply that serializes to no bytes at all.
type NoReply struct{}

// noReplyBytes is the (empty) wire form of NoReply.
var noReplyBytes = []byte("")

// theNoReply is the single shared NoReply instance.
var theNoReply = new(NoReply)

// ToBytes directly reply fixed structure data
func (p *PongReply) ToBytes() []byte {<!-- -->
return pongBytes
}

// MakePongReply creates a reply, generally exposing a make method
func MakePongReply() *PongReply {<!-- -->
return the PongReply
}

func (o *OkReply) ToBytes() []byte {<!-- -->
return okBytes
}

func MakeOkReply() *OkReply {<!-- -->
return the OkReply
}

func (n *NullBulkReply) ToBytes() []byte {<!-- -->
return nullBulkBytes
}

func MakeNullBulkReply() *NullBulkReply {<!-- -->
return theNullBulkReply
}

func (e *EmptyMultiBulkReply) ToBytes() []byte {<!-- -->
return emptyMultiBulkBytes
}

func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {<!-- -->
return theEmptyMultiBulkReply
}

func (n *NoReply) ToBytes() []byte {<!-- -->
return noReplyBytes
}

func MakeNoReply() *NoReply {<!-- -->
return theNoReply
}

2. Parser

2.1 Structure

Payload

The Payload structure holds the result of parsing one protocol message. It contains a resp.Reply (Data) and an error (Err); Data is the parsed reply entity, i.e. one of the structures defined above.

readState

The status of the parser contains the description of the current parsing protocol parameters

Field Description
  • readingMultiline: Whether the currently parsed data is multi-line data or single-line data
  • expectedArgsCount: Record the number of parameters of the parsing command. If it is an array, then it will parse multiple. If it is a single line parsing, it will be 1
  • msgType: The type of packet. For example: *, +, $ and other types of data packets, refer to the description of the resp protocol
  • args: the passed parameter itself, two-dimensional data for data storage, for example: set k v, there are three groups
  • bulkLen: Parse the byte length of the data that needs to be parsed later
Method Description
  • finished: Whether the current parser has finished parsing, mainly to judge whether the length of args is the same as expectedArgsCount
  • result: Save the result according to the type of the command and return it, and the response is resp.Reply . Note that our analysis of the protocol is the resp.Reply type of operation
  • ParseStream: This is a public method for external calls to create a channel (chan) and return it to the outside. After the parsing is completed, it will be passed into the pipeline
  • parser0: parsing method, accepting two types of parameters io.Reader and *Payload, io.Reader was mentioned in the implementation of the previous tcp server , the client’s connection will also implement this interface
  • readLine: read one line at a time, ending with \n
  • adjustType: According to the type of protocol, the corresponding processing function will be returned
  • readBody: used to read the real data following $

The above is the general description. The actual analysis is performed normally according to the RESP protocol. You can analyze it by yourself. The following is the code

/**
protocol parser
*/

// Payload data entity
type Payload struct {<!-- -->
//The data that the client replies to the server, why use the Reply format, because for the server, it is also the data that the client replies to the server
Data resp. Reply
//Is there an error
Err error
}

// readState is the parser state for the message currently being read.
type readState struct {
	// readingMultiline tells whether a multi-line message is in progress
	// (true) or the next line starts a new message (false).
	readingMultiline bool
	// expectedArgsCount is the number of arguments the current message
	// carries: the array length for an array, 1 for a single bulk string.
	expectedArgsCount int
	// msgType is the type byte of the message, e.g. '*', '+' or '$'; see the
	// RESP protocol description.
	msgType byte
	// args holds the arguments read so far; "set k v" yields three entries.
	args [][]byte
	// bulkLen is the byte length of the data that has to be read next.
	bulkLen int64
}

// finished Whether the current parser has finished parsing
func (r *readState) finished() bool {<!-- -->
//The parsed number is the same as the number of parameters
return r.expectedArgsCount > 0 & amp; & amp; len(r.args) == r.expectedArgsCount
}

// result saves the result according to the type of the instruction and returns it
func (r *readState) result() resp.Reply {<!-- -->
switch r.msgType {<!-- -->
case common. ASTERISK:
return reply. MakeMultiBulkReply(r. args)
case common. DOLLAR:
return reply. MakeBulkReply(r. args[0])
default:
return nil
}
}

// parser0 parses the data and the data sent by tcp is io.Reader
func parser0(reader io. Reader, ch chan *Payload) {<!-- -->
//Even if there is an error, it cannot jump out of the infinite loop
defer func() {<!-- -->
if err := recover(); err != nil {<!-- -->
logger. Error(string(debug. Stack()))
}
}()
bufReader := bufio. NewReader(reader)
state := &readState{<!-- -->}
var err error
var msg []byte
//Infinite loop read data continuously
for true {<!-- -->
var ioErr bool
/**
msg: Returns a line of data that read \\
 is divided, ioErr: indicates whether an exception is read, err: the returned exception
For example: *3\r\\
$3\r\\
SET\r\\
$3\r\\
key\r\\
$5\r\\
value\r\\
 data, then the read is *3\r\\

*/
msg, ioErr, err = readLine(bufReader, state)
if err != nil {<!-- --> //Read a line of data first to determine whether it is an io error
if ioErr {<!-- -->
//Write error data to the pipeline
ch <- & amp;Payload{<!-- -->
Err: err,
}
close(ch)
return
}
// common error
ch <- & amp;Payload{<!-- -->
Err: err,
}
state = &readState{<!-- -->}
continue
}
/**
Judging whether it is multi-line parsing, the default is false for the first time it comes in, and its state will be changed after it is read
For example: *3\r\\
$3\r\\
SET\r\\
$3\r\\
key\r\\
$5\r\\
value\r\\

Parsing a line for the first time: *3\r\\
 Change the status readingMultiline to multi-line status, parsing 3 means the array has 3 parameters
The second cycle parses a line: $3\r\\
 means that there is a string behind it, and the state of readingMultiline is changed for the first time, so go to the else to read through readBody(), and modify state.bulkLen to explain the follow-up The parameter byte length
The third cycle parses a line: SET\r\\
 parses the parameters, parses through readBody(), and stores the data directly into the state.args[][] array
The fourth cycle parses a line: $3\r\\
 Continue to parse the length of subsequent parameters
The fifth cycle parses a line: KEY\r\\
 The parameters read are stored in the array
The sixth cycle parses a line: $5\r\\
 Continue to parse the length of subsequent parameters is 5
The seventh cycle parses a line: VALUE\r\\
 The read parameters are stored in the array
*/
if !state.readingMultiline {<!-- -->
//Judge what type the first symbol of the data is
first := msg[0]
//Use the adjustType() function to determine what type of data is, and then call the returned function
payload := adjustType(first)(msg, state, ch)
//Return the entity of the response to the pipeline
if payload != nil {<!-- -->
ch <- payload
}
} else {<!-- -->
// read data
err := readBody(msg, state)
if err != nil {<!-- -->
//If err is not empty, return a protocol error
ch <- & amp;Payload{<!-- -->
Err: errors. New("protocol error:" + string(msg)),
}
//reset state controller
state = &readState{<!-- -->}
continue
}
/ / Determine whether the read is complete
if state. finished() {<!-- -->
ch <- & amp;Payload{<!-- -->
//Wrap the Resp object
Data: state.result(),
}
state = &readState{<!-- -->}
}
}
}
}

//adjustType determines the type and returns the corresponding type processing function
func adjustType(first byte) func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- -->
//Return a default processing function, print the error message
resultFunc := func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- -->
logger.Error("The current type: %s packet processing is not supported", first)
return nil
}
//First judge the * number
if first == common. ASTERISK {<!-- -->
//protocol parsing error
resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- -->
// * number parses multiple rows of data
err := parseMultiBulkHeader(msg, state)
// Handle read errors
if err != nil {<!-- -->
state = &readState{<!-- -->}
return & amp;Payload{<!-- -->
Err: errors. New("protocol error:" + string(msg)),
}
}
//If the parsed array length is empty
if state.expectedArgsCount <= 0 {<!-- -->
state = &readState{<!-- -->}
return & amp;Payload{<!-- -->
//Respond to the core of redis with an empty array instead of returning to the client
Data: reply. MakeEmptyMultiBulkReply(),
}
}
return nil
}
} else if first == common.DOLLAR {<!-- --> // parse $
resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- -->
err := parseBulkHeader(msg, state)
// Handle read errors
if err != nil {<!-- -->
state = &readState{<!-- -->}
return & amp;Payload{<!-- -->
Err: errors. New("protocol error:" + string(msg)),
}
}
//Response of empty string -1 means empty
if state.bulkLen == -1 {<!-- -->
state = &readState{<!-- -->}
return & amp;Payload{<!-- -->
//Respond to the core of redis with an empty array instead of returning to the client
Data: reply. MakeNullBulkReply(),
}
}
return nil
}
} else {<!-- --> //Analysis + or - sign
resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- -->
//Parse a single line
result, err := parseSingleLine(msg)
//Clear the state register
state = &readState{<!-- -->}
return & amp;Payload{<!-- -->
Data: result,
Err: err,
}
}
}
return resultFunc
}

// ParseStream will return a pipeline when parsing the byte stream, and send data to the pipeline after the parsing is complete
func ParseStream(reader io. Reader) chan *Payload {<!-- -->
//create a pipeline
ch := make(chan *Payload)
go parser0(reader, ch)
//Respond to a ch pipeline for the redis core, the core layer only needs to monitor the pipeline data
return ch
}

// readLine reads a line of data according to the instructions. For example: *3\r\\
$3\r\\
SET\r\\
$3\r\\
key\r\\
$5\r\\
value\r\\

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {<!-- -->
/**
1. Segment according to the normal situation of \r\\
 (the $ character is not read), because $ is a preset command, indicating how many characters need to be read later
2. If the $ character is read before, it will be read strictly according to the number of characters following the $, and no branching can be performed
*/
var msg []byte
var err error
if state.bulkLen == 0 {<!-- --> // 1. Indicates that no pre-set instructions such as $ were read before, segmented according to \r\\

msg, err = bufReader. ReadBytes('\\
')
if err != nil {<!-- -->
return nil, true, err
}
// After judging whether \\
 is cut, whether the penultimate is \r to distinguish, the protocol format is wrong
if len(msg) == 0 || msg[len(msg)-2] != '\r' {<!-- -->
return nil, false, errors. New("protocol error: " + string(msg))
}
} else {<!-- -->
//2. Preset characters such as $ are read in front, state.bulkLen + 2 needs to read the following \r\\

msg = make([]byte, state. bulkLen + 2)
//Read all the lengths of the array in bufReader
_, err := io. ReadFull(bufReader, msg)
if err != nil {<!-- -->
return nil, true, err
}
//Judge whether it is \r\\
 to distinguish, that is, the protocol format is wrong
if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\\
' {< !-- -->
return nil, false, errors. New("protocol error: " + string(msg))
}
//Set the preset length to 0
state. bulkLen = 0
}
return msg, false, nil
}

// parseMultiBulkHeader parses multiple lines, for example: *3\r\\
$3\r\\
SET\r\\
$3\r\\
key\r\\
$5\r\\
value \r\\

func parseMultiBulkHeader(msg []byte, state *readState) error {<!-- -->
var err error
var expectedLine uint64
//Cut out the 3 in *3\r\\
 to get the length of the current packet
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {<!-- -->
return errors. New("protocol error: " + string(msg))
}
if expectedLine == 0 {<!-- --> // read empty data
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {<!-- --> //Indicates the length of subsequent data
//Analyzed flag bit, type of message
state.msgType = msg[0]
// now reading multiline status
state.readingMultiline = true
//Set the number of subsequent parameters
state. expectedArgsCount = int(expectedLine)
/ / Initialize the length of the array, two-dimensional array for storage
state.args = make([][]byte, 0, expectedLine)
return nil
} else {<!-- -->
return errors. New("protocol error: " + string(msg))
}
}

// parseBulkHeader parses a single line, for example: $4\r\\

func parseBulkHeader(msg []byte, state *readState) error {<!-- -->
var err error
//Analyze the data length of a single line, for example: $4\r\\
, here it is 4
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {<!-- -->
return errors. New("protocol error: " + string(msg))
}
if state.bulkLen == -1 {<!-- -->
return nil
} else if state.bulkLen > 0 {<!-- -->
//Analyzed flag bit, type of message
state.msgType = msg[0]
// now reading multiline status
state.readingMultiline = false
//The number of lines to read parameters is 1 line
state.expectedArgsCount = 1
//Initialize the length of the array, if there is a single line, then the subsequent parameter is 1
state.args = make([][]byte, 0, 1)
return nil
} else {<!-- -->
return errors. New("protocol error: " + string(msg))
}
}

// Parse the + OK, -err, :5\r\\
 format of the client response, which can be parsed directly
func parseSingleLine(msg []byte) (resp.Reply, error) {<!-- -->
// parse the text
str := strings.TrimSuffix(string(msg), "\r\\
")
var result resp.Reply
switch msg[0] {<!-- -->
case common.PLUS: // parse the correct response package
result = reply. MakeStatusReply(str[1:])
case common.DASH: //parse error response packet
result = reply. MakeStandardErrReply(str[1:])
case common.COLON: //parsing integer packets
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {<!-- -->
return nil, errors. New("protocol error: " + string(msg))
}
result = reply. MakeIntReply(val)
}
return result, nil
}

/**
There are two possible situations:
1. $3\r\\

2. SET\r\\

*/
func readBody(msg []byte, state *readState) error {<!-- -->
//This is to intercept the subsequent \r\\
 delimiter
line := msg[0 : len(msg)-2]
var err error
// $3 parses the 3 behind $ and comes out
if line[0] == common. DOLLAR {<!-- -->
//Save the byte length that needs to be parsed later into the state device
state. bulkLen, err = strconv. ParseInt(string(line[1:]), 10, 64)
if err != nil {<!-- -->
return errors. New("protocol error: " + string(msg))
}
//if case type is $0\r\\

if state.bulkLen <= 0 {<!-- -->
//empty length
state.args = append(state.args, []byte{<!-- -->})
state. bulkLen = 0
}
} else {<!-- -->
//Go here and the data is SET
state.args = append(state.args, line)
}
return nil
}