RESP protocol analysis
For the details of the protocol itself, see the protocol description in the first article of this series. Here we go straight to parsing it.
Reply
Define a top-level reply interface in interface/resp/reply.go. Every protocol reply type defined below implements this interface.
// Reply is the interface implemented by every RESP reply type.
type Reply interface {
	// ToBytes serializes the reply into its RESP wire format.
	ToBytes() []byte
}
1. Generic replies
Redis uses a handful of common reply types: errors, status replies, integers, bulk strings, arrays, and so on. We define them in advance so that the parser can use them directly when decoding the protocol.
reply.go
Create resp/reply/reply.go. This file defines the protocol reply structures, such as IntReply and StatusReply, all of which implement the ToBytes() method of the Reply interface.
var ( nullBulkReplyBytes = []byte("$-1") CRLF = "\r\\ " //end symbol ) // BulkReply The reply body of a single string type BulkReply struct {<!-- --> //Response data, if you want to reply a "hello world" , $11\r\\ hello world\r\\ Arg []byte } // ToBytes custom entity func (b *BulkReply) ToBytes() []byte {<!-- --> if len(b.Arg) == 0 {<!-- --> return nullBulkReplyBytes } return []byte(fmt.Sprintf("$%d%s%s%s", len(b.Arg), CRLF, string(b.Arg), CRLF)) } // MakeBulkReply creates an entity func MakeBulkReply(arg []byte) *BulkReply {<!-- --> return &BulkReply{<!-- --> Arg: arg, } } // MultiBulkReply reply of multiple strings type MultiBulkReply struct {<!-- --> Args [][]byte } func (m *MultiBulkReply) ToBytes() []byte {<!-- --> argLen := len(m. Args) var buf bytes. Buffer buf.WriteString(fmt.Sprintf("*%d%s", argLen, CRLF)) for _, arg := range m.Args {<!-- --> if arg == nil {<!-- --> buf.WriteString(string(nullBulkReplyBytes) + CRLF) } buf.WriteString(fmt.Sprintf("$%d%s%s%s", len(arg), CRLF, string(arg), CRLF)) } return buf. 
Bytes() } // MakeMultiBulkReply creates an entity func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {<!-- --> return &MultiBulkReply{<!-- --> Args: args, } } // StatusReply status reply type StatusReply struct {<!-- --> Status string } func (s *StatusReply) ToBytes() []byte {<!-- --> return []byte(" + " + s.Status + CRLF) } func MakeStatusReply(status string) *StatusReply {<!-- --> return &StatusReply{<!-- --> Status: status, } } // IntReply numeric type reply type IntReply struct {<!-- --> Code int64 } func (i *IntReply) ToBytes() []byte {<!-- --> return []byte(":" + strconv.FormatInt(i.Code, 10) + CRLF) } func MakeIntReply(code int64) *IntReply {<!-- --> return &IntReply{<!-- -->Code: code} } // StandardErrReply standard error reply type StandardErrReply struct {<!-- --> Status string } func (s *StandardErrReply) ToBytes() []byte {<!-- --> return []byte("-" + s.Status + CRLF) } func MakeStandardErrReply(status string) *StandardErrReply {<!-- --> return &StandardErrReply{<!-- -->Status: status} } // IsErrReply judges whether the reply data is abnormal information func IsErrReply(reply resp. Reply) bool {<!-- --> return reply.ToBytes()[0] == '-' } // ErrorReply error reply, implements two interfaces type ErrorReply interface {<!-- --> //Error error message Error() string //ToBytes convert byte array ToBytes() []byte }
error.go
In the same directory, create error.go, which defines the common error replies. Each of them implements both the error interface (Error()) and the Reply interface (ToBytes()).
// General error replies:
//  1. UnknownErrReply: unknown error
//  2. ArgNumErrReply: wrong number of arguments
//  3. SyntaxErrReply: syntax error
//  4. WrongTypeErrReply: operation against a key holding the wrong kind of value
//  5. ProtocolErrReply: the input does not conform to the RESP protocol
//
// Each type implements both the error interface (Error) and the Reply
// interface (ToBytes). Error returns the bare message; ToBytes adds the RESP
// framing ("-" prefix and CRLF suffix).

// Compile-time checks that every error reply implements the error interface.
var (
	_ error = (*UnknownErrReply)(nil)
	_ error = (*ArgNumErrReply)(nil)
	_ error = (*SyntaxErrReply)(nil)
	_ error = (*WrongTypeErrReply)(nil)
	_ error = (*ProtocolErrReply)(nil)
)

// UnknownErrReply is an unspecified error.
type UnknownErrReply struct{}

var unKnownErrReply = []byte("-Err unknown\r\n")

// Error implements the error interface.
func (u *UnknownErrReply) Error() string {
	return "Err unknown"
}

// ToBytes returns the RESP encoding of the error.
func (u *UnknownErrReply) ToBytes() []byte {
	return unKnownErrReply
}

// MakeUnknownErrReply creates an UnknownErrReply.
func MakeUnknownErrReply() *UnknownErrReply {
	return &UnknownErrReply{}
}

// ArgNumErrReply reports that a command received the wrong number of arguments.
type ArgNumErrReply struct {
	// Cmd is the name of the offending command.
	Cmd string
}

// Error implements the error interface.
func (a *ArgNumErrReply) Error() string {
	return fmt.Sprintf("Err wrong number of arguments for : %s, command", a.Cmd)
}

// ToBytes returns the RESP encoding of the error.
func (a *ArgNumErrReply) ToBytes() []byte {
	return []byte("-" + a.Error() + "\r\n")
}

// MakeArgNumErrReply creates an ArgNumErrReply for the command cmd.
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
	return &ArgNumErrReply{Cmd: cmd}
}

// SyntaxErrReply reports a syntax error.
type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")

// theSyntaxErrReply is the shared instance; the type carries no state.
var theSyntaxErrReply = new(SyntaxErrReply)

// Error implements the error interface.
func (s *SyntaxErrReply) Error() string {
	return "Err syntax error"
}

// ToBytes returns the RESP encoding of the error.
func (s *SyntaxErrReply) ToBytes() []byte {
	return syntaxErrBytes
}

// MakeSyntaxErrReply returns the shared SyntaxErrReply.
func MakeSyntaxErrReply() *SyntaxErrReply {
	return theSyntaxErrReply
}

// WrongTypeErrReply reports an operation against a key holding the wrong kind
// of value.
type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-wrong type operation against a key holding the wrong kind of value\r\n")

// theWrongTypeErrReply is the shared instance; the type carries no state.
var theWrongTypeErrReply = new(WrongTypeErrReply)

// Error implements the error interface. Like the other Error methods it
// returns the bare message; the "-" prefix belongs to the wire format only.
func (w *WrongTypeErrReply) Error() string {
	return "Err wrong type"
}

// ToBytes returns the RESP encoding of the error.
func (w *WrongTypeErrReply) ToBytes() []byte {
	return wrongTypeErrBytes
}

// MakeWrongTypeErrReply returns the shared WrongTypeErrReply.
func MakeWrongTypeErrReply() *WrongTypeErrReply {
	return theWrongTypeErrReply
}

// ProtocolErrReply reports input that does not conform to the RESP protocol.
type ProtocolErrReply struct {
	// Msg describes what was wrong with the input.
	Msg string
}

// Error implements the error interface. The message carries no CRLF; that is
// part of the wire format added by ToBytes.
func (p *ProtocolErrReply) Error() string {
	return "Error protocol error: " + p.Msg
}

// ToBytes returns the RESP encoding of the error.
func (p *ProtocolErrReply) ToBytes() []byte {
	return []byte("-" + p.Error() + "\r\n")
}
consts.go
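This file defines replies whose content never changes, such as PONG and OK. Each one is created once and shared, rather than allocated for every reply.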
// Fixed replies whose encoding never changes. Each type has a single shared
// instance, returned by its Make function, and ToBytes returns a pre-built
// slice, so replying does not allocate. The returned slices are shared and
// must not be modified.
//  1. PongReply: heartbeat reply
//  2. OkReply: OK reply
//  3. NullBulkReply: null bulk string reply ($-1)
//  4. EmptyMultiBulkReply: empty array reply
//  5. NoReply: writes nothing

// PongReply is the heartbeat reply.
type PongReply struct{}

var (
	// pongBytes is the pre-encoded reply, built once.
	pongBytes = []byte("+PONG\r\n")
	// thePongReply is shared instead of creating a new object on every call.
	thePongReply = new(PongReply)
)

// OkReply is the "+OK" status reply.
type OkReply struct{}

var (
	okBytes    = []byte("+OK\r\n")
	theOkReply = new(OkReply)
)

// NullBulkReply is a null bulk string; the length -1 means "no value".
type NullBulkReply struct{}

var (
	nullBulkBytes    = []byte("$-1\r\n")
	theNullBulkReply = new(NullBulkReply)
)

// EmptyMultiBulkReply is an empty array reply.
type EmptyMultiBulkReply struct{}

var (
	emptyMultiBulkBytes    = []byte("*0\r\n")
	theEmptyMultiBulkReply = new(EmptyMultiBulkReply)
)

// NoReply is a reply that writes nothing.
type NoReply struct{}

var (
	noReplyBytes = []byte("")
	theNoReply   = new(NoReply)
)

// ToBytes returns the fixed encoding of the reply.
func (p *PongReply) ToBytes() []byte {
	return pongBytes
}

// MakePongReply returns the shared PongReply.
func MakePongReply() *PongReply {
	return thePongReply
}

// ToBytes returns the fixed encoding of the reply.
func (o *OkReply) ToBytes() []byte {
	return okBytes
}

// MakeOkReply returns the shared OkReply.
func MakeOkReply() *OkReply {
	return theOkReply
}

// ToBytes returns the fixed encoding of the reply.
func (n *NullBulkReply) ToBytes() []byte {
	return nullBulkBytes
}

// MakeNullBulkReply returns the shared NullBulkReply.
func MakeNullBulkReply() *NullBulkReply {
	return theNullBulkReply
}

// ToBytes returns the fixed encoding of the reply.
func (e *EmptyMultiBulkReply) ToBytes() []byte {
	return emptyMultiBulkBytes
}

// MakeEmptyMultiBulkReply returns the shared EmptyMultiBulkReply.
func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {
	return theEmptyMultiBulkReply
}

// ToBytes returns an empty slice: nothing is written.
func (n *NoReply) ToBytes() []byte {
	return noReplyBytes
}

// MakeNoReply returns the shared NoReply.
func MakeNoReply() *NoReply {
	return theNoReply
}
2. Parser
2.1 Structure
Payload
The Payload structure holds the result of parsing one protocol message. Data is the parsed resp.Reply (one of the reply types defined above), and Err is the error encountered while parsing, if any.
readState
readState holds the parser's state for the message currently being parsed.
Field Description
- readingMultiline: whether the parser is in the middle of a multi-line message (such as an array) or expects the start of a new message
- expectedArgsCount: the number of arguments expected for the message: the array length for an array, or 1 for a single bulk string
- msgType: the message type, i.e. its first byte, such as *, + or $ (see the description of the RESP protocol)
- args: the arguments parsed so far, stored as a slice of byte slices; for example, set k v yields three entries
- bulkLen: the byte length of the bulk string data that must be read next
Method Description
- finished: reports whether the current message is complete, i.e. whether len(args) equals expectedArgsCount
- result: builds the resp.Reply for the completed message according to its type. Note that everything the parser produces is a resp.Reply
- ParseStream: the public entry point. It creates a channel, starts parsing in a separate goroutine, and returns the channel to the caller; each parsed message is sent into that channel
- parser0: the parsing loop. It takes an io.Reader and a chan *Payload. As mentioned in the TCP server implementation, a client connection implements io.Reader
- readLine: reads one line at a time, terminated by \r\n; after a bulk length has been read, it instead reads exactly that many bytes plus the trailing \r\n
- adjustType: returns the handler function for the message type given by the message's first byte
- readBody: processes the subsequent lines of a multi-line message: the $ length headers and the bulk data that follows them
That is the overall design. The parsing itself simply follows the RESP specification, so you can trace through it on your own. The code is below.
/** protocol parser */ // Payload data entity type Payload struct {<!-- --> //The data that the client replies to the server, why use the Reply format, because for the server, it is also the data that the client replies to the server Data resp. Reply //Is there an error Err error } // readState parser state type readState struct {<!-- --> //Analyze single-line or multi-line data readingMultiline bool //Record the number of parameters of the parsing command, if it is an array, then it will be multiple, if it is a single line, it will be 1 expectedArgsCount int //Type of packet. For example: *, +, $ and other types of data packets, refer to the description of the resp protocol msgType byte //The passed parameter itself, two-dimensional data for data storage, for example: set k v, there are three groups args[][]byte //Analysis of the byte length of the subsequent data that needs to be parsed bulkLen int64 } // finished Whether the current parser has finished parsing func (r *readState) finished() bool {<!-- --> //The parsed number is the same as the number of parameters return r.expectedArgsCount > 0 & amp; & amp; len(r.args) == r.expectedArgsCount } // result saves the result according to the type of the instruction and returns it func (r *readState) result() resp.Reply {<!-- --> switch r.msgType {<!-- --> case common. ASTERISK: return reply. MakeMultiBulkReply(r. args) case common. DOLLAR: return reply. MakeBulkReply(r. args[0]) default: return nil } } // parser0 parses the data and the data sent by tcp is io.Reader func parser0(reader io. Reader, ch chan *Payload) {<!-- --> //Even if there is an error, it cannot jump out of the infinite loop defer func() {<!-- --> if err := recover(); err != nil {<!-- --> logger. Error(string(debug. Stack())) } }() bufReader := bufio. 
NewReader(reader) state := &readState{<!-- -->} var err error var msg []byte //Infinite loop read data continuously for true {<!-- --> var ioErr bool /** msg: Returns a line of data that read \\ is divided, ioErr: indicates whether an exception is read, err: the returned exception For example: *3\r\\ $3\r\\ SET\r\\ $3\r\\ key\r\\ $5\r\\ value\r\\ data, then the read is *3\r\\ */ msg, ioErr, err = readLine(bufReader, state) if err != nil {<!-- --> //Read a line of data first to determine whether it is an io error if ioErr {<!-- --> //Write error data to the pipeline ch <- & amp;Payload{<!-- --> Err: err, } close(ch) return } // common error ch <- & amp;Payload{<!-- --> Err: err, } state = &readState{<!-- -->} continue } /** Judging whether it is multi-line parsing, the default is false for the first time it comes in, and its state will be changed after it is read For example: *3\r\\ $3\r\\ SET\r\\ $3\r\\ key\r\\ $5\r\\ value\r\\ Parsing a line for the first time: *3\r\\ Change the status readingMultiline to multi-line status, parsing 3 means the array has 3 parameters The second cycle parses a line: $3\r\\ means that there is a string behind it, and the state of readingMultiline is changed for the first time, so go to the else to read through readBody(), and modify state.bulkLen to explain the follow-up The parameter byte length The third cycle parses a line: SET\r\\ parses the parameters, parses through readBody(), and stores the data directly into the state.args[][] array The fourth cycle parses a line: $3\r\\ Continue to parse the length of subsequent parameters The fifth cycle parses a line: KEY\r\\ The parameters read are stored in the array The sixth cycle parses a line: $5\r\\ Continue to parse the length of subsequent parameters is 5 The seventh cycle parses a line: VALUE\r\\ The read parameters are stored in the array */ if !state.readingMultiline {<!-- --> //Judge what type the first symbol of the data is first := msg[0] //Use the adjustType() function to 
determine what type of data is, and then call the returned function payload := adjustType(first)(msg, state, ch) //Return the entity of the response to the pipeline if payload != nil {<!-- --> ch <- payload } } else {<!-- --> // read data err := readBody(msg, state) if err != nil {<!-- --> //If err is not empty, return a protocol error ch <- & amp;Payload{<!-- --> Err: errors. New("protocol error:" + string(msg)), } //reset state controller state = &readState{<!-- -->} continue } / / Determine whether the read is complete if state. finished() {<!-- --> ch <- & amp;Payload{<!-- --> //Wrap the Resp object Data: state.result(), } state = &readState{<!-- -->} } } } } //adjustType determines the type and returns the corresponding type processing function func adjustType(first byte) func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- --> //Return a default processing function, print the error message resultFunc := func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- --> logger.Error("The current type: %s packet processing is not supported", first) return nil } //First judge the * number if first == common. ASTERISK {<!-- --> //protocol parsing error resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- --> // * number parses multiple rows of data err := parseMultiBulkHeader(msg, state) // Handle read errors if err != nil {<!-- --> state = &readState{<!-- -->} return & amp;Payload{<!-- --> Err: errors. New("protocol error:" + string(msg)), } } //If the parsed array length is empty if state.expectedArgsCount <= 0 {<!-- --> state = &readState{<!-- -->} return & amp;Payload{<!-- --> //Respond to the core of redis with an empty array instead of returning to the client Data: reply. 
MakeEmptyMultiBulkReply(), } } return nil } } else if first == common.DOLLAR {<!-- --> // parse $ resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- --> err := parseBulkHeader(msg, state) // Handle read errors if err != nil {<!-- --> state = &readState{<!-- -->} return & amp;Payload{<!-- --> Err: errors. New("protocol error:" + string(msg)), } } //Response of empty string -1 means empty if state.bulkLen == -1 {<!-- --> state = &readState{<!-- -->} return & amp;Payload{<!-- --> //Respond to the core of redis with an empty array instead of returning to the client Data: reply. MakeNullBulkReply(), } } return nil } } else {<!-- --> //Analysis + or - sign resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {<!-- --> //Parse a single line result, err := parseSingleLine(msg) //Clear the state register state = &readState{<!-- -->} return & amp;Payload{<!-- --> Data: result, Err: err, } } } return resultFunc } // ParseStream will return a pipeline when parsing the byte stream, and send data to the pipeline after the parsing is complete func ParseStream(reader io. Reader) chan *Payload {<!-- --> //create a pipeline ch := make(chan *Payload) go parser0(reader, ch) //Respond to a ch pipeline for the redis core, the core layer only needs to monitor the pipeline data return ch } // readLine reads a line of data according to the instructions. For example: *3\r\\ $3\r\\ SET\r\\ $3\r\\ key\r\\ $5\r\\ value\r\\ func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {<!-- --> /** 1. Segment according to the normal situation of \r\\ (the $ character is not read), because $ is a preset command, indicating how many characters need to be read later 2. If the $ character is read before, it will be read strictly according to the number of characters following the $, and no branching can be performed */ var msg []byte var err error if state.bulkLen == 0 {<!-- --> // 1. 
Indicates that no pre-set instructions such as $ were read before, segmented according to \r\\ msg, err = bufReader. ReadBytes('\\ ') if err != nil {<!-- --> return nil, true, err } // After judging whether \\ is cut, whether the penultimate is \r to distinguish, the protocol format is wrong if len(msg) == 0 || msg[len(msg)-2] != '\r' {<!-- --> return nil, false, errors. New("protocol error: " + string(msg)) } } else {<!-- --> //2. Preset characters such as $ are read in front, state.bulkLen + 2 needs to read the following \r\\ msg = make([]byte, state. bulkLen + 2) //Read all the lengths of the array in bufReader _, err := io. ReadFull(bufReader, msg) if err != nil {<!-- --> return nil, true, err } //Judge whether it is \r\\ to distinguish, that is, the protocol format is wrong if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\\ ' {< !-- --> return nil, false, errors. New("protocol error: " + string(msg)) } //Set the preset length to 0 state. bulkLen = 0 } return msg, false, nil } // parseMultiBulkHeader parses multiple lines, for example: *3\r\\ $3\r\\ SET\r\\ $3\r\\ key\r\\ $5\r\\ value \r\\ func parseMultiBulkHeader(msg []byte, state *readState) error {<!-- --> var err error var expectedLine uint64 //Cut out the 3 in *3\r\\ to get the length of the current packet expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil {<!-- --> return errors. New("protocol error: " + string(msg)) } if expectedLine == 0 {<!-- --> // read empty data state.expectedArgsCount = 0 return nil } else if expectedLine > 0 {<!-- --> //Indicates the length of subsequent data //Analyzed flag bit, type of message state.msgType = msg[0] // now reading multiline status state.readingMultiline = true //Set the number of subsequent parameters state. expectedArgsCount = int(expectedLine) / / Initialize the length of the array, two-dimensional array for storage state.args = make([][]byte, 0, expectedLine) return nil } else {<!-- --> return errors. 
New("protocol error: " + string(msg)) } } // parseBulkHeader parses a single line, for example: $4\r\\ func parseBulkHeader(msg []byte, state *readState) error {<!-- --> var err error //Analyze the data length of a single line, for example: $4\r\\ , here it is 4 state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 32) if err != nil {<!-- --> return errors. New("protocol error: " + string(msg)) } if state.bulkLen == -1 {<!-- --> return nil } else if state.bulkLen > 0 {<!-- --> //Analyzed flag bit, type of message state.msgType = msg[0] // now reading multiline status state.readingMultiline = false //The number of lines to read parameters is 1 line state.expectedArgsCount = 1 //Initialize the length of the array, if there is a single line, then the subsequent parameter is 1 state.args = make([][]byte, 0, 1) return nil } else {<!-- --> return errors. New("protocol error: " + string(msg)) } } // Parse the + OK, -err, :5\r\\ format of the client response, which can be parsed directly func parseSingleLine(msg []byte) (resp.Reply, error) {<!-- --> // parse the text str := strings.TrimSuffix(string(msg), "\r\\ ") var result resp.Reply switch msg[0] {<!-- --> case common.PLUS: // parse the correct response package result = reply. MakeStatusReply(str[1:]) case common.DASH: //parse error response packet result = reply. MakeStandardErrReply(str[1:]) case common.COLON: //parsing integer packets val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil {<!-- --> return nil, errors. New("protocol error: " + string(msg)) } result = reply. MakeIntReply(val) } return result, nil } /** There are two possible situations: 1. $3\r\\ 2. SET\r\\ */ func readBody(msg []byte, state *readState) error {<!-- --> //This is to intercept the subsequent \r\\ delimiter line := msg[0 : len(msg)-2] var err error // $3 parses the 3 behind $ and comes out if line[0] == common. DOLLAR {<!-- --> //Save the byte length that needs to be parsed later into the state device state. 
bulkLen, err = strconv. ParseInt(string(line[1:]), 10, 64) if err != nil {<!-- --> return errors. New("protocol error: " + string(msg)) } //if case type is $0\r\\ if state.bulkLen <= 0 {<!-- --> //empty length state.args = append(state.args, []byte{<!-- -->}) state. bulkLen = 0 } } else {<!-- --> //Go here and the data is SET state.args = append(state.args, line) } return nil }