Hand-writing Redis in Go (7): Handler implementation

Processor

With protocol parsing finished, the resp.Reply values it produces need to be handed to a processor (the handler). The handler wraps each net.Conn in a Connection object for later use, calls the protocol parser to obtain the channel it returns, and then listens on that channel, executing each command it receives and returning the corresponding response to the client.

Connection

A Connection object wraps a single client. When a client's connection reaches the handler, it is wrapped in a Connection object and stored. The interface is defined in interface/resp/conn.go:

// Connection abstracts a client connection
type Connection interface {
	// Write sends data to the client
	Write([]byte) error

	// GetDBIndex returns the index of the currently selected DB
	GetDBIndex() int

	// SelectDB switches the selected DB
	SelectDB(dbNum int) int
}
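
Because the rest of the server only depends on this interface, a test double is easy to write. The following is a minimal sketch, not part of the original project: `fakeConn` and its `Written` helper are hypothetical names, and the type simply records what is written instead of sending it over a socket.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Connection mirrors the interface from interface/resp/conn.go.
type Connection interface {
	Write([]byte) error
	GetDBIndex() int
	SelectDB(dbNum int) int
}

// fakeConn is a hypothetical in-memory implementation for tests.
// It buffers everything written instead of using a real socket.
type fakeConn struct {
	mu  sync.Mutex
	buf bytes.Buffer
	db  int
}

// Write appends the bytes to the in-memory buffer.
func (f *fakeConn) Write(b []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	_, err := f.buf.Write(b)
	return err
}

// GetDBIndex returns the currently selected DB index.
func (f *fakeConn) GetDBIndex() int { return f.db }

// SelectDB switches the selected DB and returns the new index.
func (f *fakeConn) SelectDB(dbNum int) int {
	f.db = dbNum
	return f.db
}

// Written returns everything written so far.
func (f *fakeConn) Written() string {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.buf.String()
}

// Compile-time check that fakeConn satisfies Connection.
var _ Connection = (*fakeConn)(nil)

func main() {
	c := &fakeConn{}
	c.SelectDB(3)
	_ = c.Write([]byte("+OK\r\n"))
	fmt.Printf("db=%d written=%q\n", c.GetDBIndex(), c.Written())
}
```

Code that executes commands can then be exercised against `fakeConn` without opening a network connection.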

conn.go

Create a Connection structure to wrap the client connection object

Field Description

  • conn: the underlying connection from the net package
  • waitingReply: counts replies that are still being sent, so that closing the connection can wait for them to finish
  • mu: a lock held while writing to the client connection, to prevent concurrent writes
  • selectedDB: the database the current connection is operating on

Method Description

  • NewConn: public constructor exposed to other packages
  • RemoteAddr: Get the ip address of the remote client
  • Close: Close the client connection
  • Write: return data to the client
  • GetDBIndex: Get the index of the current connection operation database
  • SelectDB: switch database
// Connection describes a single client connection at the protocol layer
type Connection struct {
	// Underlying client connection
	conn net.Conn
	// Counts replies still being sent, so Close can wait for them to finish
	waitingReply wait.Wait
	// Held while writing to the client, to prevent concurrent writes
	mu sync.Mutex
	// The database the current connection is operating on
	selectedDB int
}

// NewConn creates a new connection
func NewConn(conn net.Conn) *Connection {
	return &Connection{
		// Only conn needs to be set; the other fields keep their zero values
		conn: conn,
	}
}

// RemoteAddr returns the address of the remote client
func (c *Connection) RemoteAddr() net.Addr {
	return c.conn.RemoteAddr()
}

// Close closes the connection
func (c *Connection) Close() error {
	// Wait up to 10 seconds for in-flight replies to finish
	c.waitingReply.WaitWithTimeout(10 * time.Second)
	_ = c.conn.Close()
	return nil
}

// Write sends data to the client
func (c *Connection) Write(bytes []byte) error {
	// An empty payload returns immediately, before any lock is taken
	if len(bytes) == 0 {
		return nil
	}
	// Only one goroutine may write to the client at a time
	c.mu.Lock()
	// Record that a reply is being sent
	c.waitingReply.Add(1)
	// Release only what was acquired above; deferring this before the
	// empty check would unlock an unlocked mutex and panic
	defer func() {
		c.waitingReply.Done()
		c.mu.Unlock()
	}()
	_, err := c.conn.Write(bytes)
	return err
}

// GetDBIndex returns the index of the selected db
func (c *Connection) GetDBIndex() int {
	return c.selectedDB
}

// SelectDB selects a db
func (c *Connection) SelectDB(dbNum int) int {
	c.selectedDB = dbNum
	return c.selectedDB
}
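
The listing relies on a `wait.Wait` type whose source is not shown in this section. As a rough sketch of how such a helper could work, assuming it wraps `sync.WaitGroup` and adds a deadline, the `Wait` type below is a hypothetical stand-in and not necessarily the project's implementation:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Wait is a hypothetical stand-in for the project's wait.Wait:
// a sync.WaitGroup that can also wait with a deadline.
type Wait struct {
	wg sync.WaitGroup
}

// Add adjusts the counter of pending operations.
func (w *Wait) Add(delta int) { w.wg.Add(delta) }

// Done marks one pending operation as finished.
func (w *Wait) Done() { w.wg.Done() }

// WaitWithTimeout blocks until the counter reaches zero or the timeout
// elapses. It reports true when it gave up because of the timeout.
func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		w.wg.Wait()
	}()
	select {
	case <-done:
		return false
	case <-time.After(timeout):
		return true
	}
}

func main() {
	var w Wait
	w.Add(1)
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate a reply being written
		w.Done()
	}()
	fmt.Println("timed out:", w.WaitWithTimeout(time.Second)) // prints "timed out: false"
}
```

One trade-off of this approach is that the helper goroutine stays blocked after a timeout until the counter eventually reaches zero, which is acceptable for a connection that is about to be closed anyway.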

Handler

As mentioned earlier, the top-level handler interface is defined in interface/tcp/handler.go. It declares two methods:

  • Handler: handles one connection; ctx is the context object and conn is the connection object from the net package
  • Close: shuts the handler down
// Handler abstracts the processing of TCP connections
type Handler interface {
	// Handler processes one connection: ctx is the context, conn the TCP connection
	Handler(ctx context.Context, conn net.Conn)

	// Close shuts the handler down
	Close() error
}
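
To see how little the interface asks of an implementation, here is a minimal sketch of a handler that echoes each line back to the client. `EchoHandler` and `roundTrip` are hypothetical names used only for illustration, and `net.Pipe` stands in for a real TCP connection so the example runs without a listening socket.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"net"
)

// Handler mirrors the interface from interface/tcp/handler.go.
type Handler interface {
	Handler(ctx context.Context, conn net.Conn)
	Close() error
}

// EchoHandler is a hypothetical implementation that echoes each line back.
type EchoHandler struct{}

// Handler reads lines until the peer disconnects or ctx is cancelled.
func (EchoHandler) Handler(ctx context.Context, conn net.Conn) {
	defer conn.Close()
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		if ctx.Err() != nil {
			return
		}
		// Copy the line before appending so the scanner's buffer is untouched.
		out := append([]byte(scanner.Text()), '\n')
		if _, err := conn.Write(out); err != nil {
			return
		}
	}
}

// Close has nothing to release in this sketch.
func (EchoHandler) Close() error { return nil }

// roundTrip sends one line through h over an in-memory pipe and
// returns the reply line.
func roundTrip(h Handler, line string) (string, error) {
	server, client := net.Pipe()
	defer client.Close()
	go h.Handler(context.Background(), server)
	if _, err := client.Write([]byte(line + "\n")); err != nil {
		return "", err
	}
	return bufio.NewReader(client).ReadString('\n')
}

func main() {
	reply, err := roundTrip(EchoHandler{}, "PING")
	fmt.Printf("%q %v\n", reply, err) // prints "PING\n" <nil>
}
```

The RESP handler described next has the same shape: it receives a connection, processes requests in a loop, and returns when the client goes away.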

RespHandler

The concrete handler for the RESP protocol is defined in resp/handler/handler.go

handler.go

Field Description

  • activeConn: stores all client connections in a sync.Map, the thread-safe map from the sync package
  • db: the custom database layer, which will be implemented later
  • closing: whether the handler is shutting down

Method Description

  • MakeHandler: creates a RespHandler
  • closeClient: closes a single client
  • Handler: the processing method. After the TCP server accepts a connection it hands the connection to Handler, which calls parser.ParseStream() to run the protocol parsing implemented earlier, receives the returned channel, and loops over the Payload values arriving on it
  • Close: shuts the handler down
// RespHandler is the handler for the RESP protocol
type RespHandler struct {
	// Stores all active connections
	activeConn sync.Map
	// Core database layer
	db databaseface.Database
	// Whether the handler is shutting down
	closing atomic.Boolean
}

// MakeHandler creates a handler
func MakeHandler() *RespHandler {
	var db databaseface.Database
	db = database.NewDatabase()
	return &RespHandler{
		db: db,
	}
}

// closeClient closes a single client
func (r *RespHandler) closeClient(client *connection.Connection) {
	// Close the client connection
	_ = client.Close()
	// Let the database layer clean up after the closed client
	r.db.AfterClientClose(client)
	// Remove the client from the set of active connections
	r.activeConn.Delete(client)
}

// Handler serves a single client connection
func (r *RespHandler) Handler(ctx context.Context, conn net.Conn) {
	// Refuse new connections once the handler is shutting down
	if r.closing.Get() {
		_ = conn.Close()
		return
	}
	client := connection.NewConn(conn)
	// An empty struct value takes no space, so the map acts as a set
	r.activeConn.Store(client, struct{}{})
	// Hand the connection to the parser, which decodes the TCP stream
	ch := parser.ParseStream(conn)
	// Loop over parsed payloads until the parser closes the channel
	for payload := range ch {
		// Error handling
		if payload.Err != nil {
			// The client disconnected (TCP four-way teardown) or the connection is already closed
			if payload.Err == io.EOF ||
				payload.Err == io.ErrUnexpectedEOF ||
				strings.Contains(payload.Err.Error(), "use of closed network connection") {
				r.closeClient(client)
				logger.Info("Connection closed:" + client.RemoteAddr().String())
				// Nothing more can be read or written, so stop serving
				return
			}
			// Protocol error: report it back to the client
			errReply := reply.MakeStandardErrReply(payload.Err.Error())
			err := client.Write(errReply.ToBytes())
			// If the write-back fails, close the client and stop serving
			if err != nil {
				r.closeClient(client)
				logger.Info("Connection closed:" + client.RemoteAddr().String())
				return
			}
			// Otherwise keep processing the next payload
			continue
		}
		// Normal request handling
		data := payload.Data
		if data == nil {
			continue
		}
		multiBulkReply, ok := data.(*reply.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk reply.......")
			continue
		}
		// Execute the command through the database core
		result := r.db.Exec(client, multiBulkReply.Args)
		if result != nil {
			_ = client.Write(result.ToBytes())
		} else {
			_ = client.Write(reply.MakeUnknownErrReply().ToBytes())
		}
	}
}
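
The loop in Handler follows a common producer/consumer pattern: a parser goroutine pushes payloads into a channel, and the handler ranges over that channel until the stream ends. The sketch below isolates that pattern under simplifying assumptions. `Payload`, `parseLines`, `consume`, and `runScript` are hypothetical stand-ins (the real parser decodes RESP, not plain lines), so treat it as an illustration rather than the project's parser.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// Payload is a simplified stand-in for the parser's payload type.
type Payload struct {
	Data string
	Err  error
}

// parseLines is a hypothetical stand-in for parser.ParseStream: it emits
// one payload per line, then an error payload, and finally closes the channel.
func parseLines(r io.Reader) <-chan *Payload {
	ch := make(chan *Payload)
	go func() {
		defer close(ch)
		br := bufio.NewReader(r)
		for {
			line, err := br.ReadString('\n')
			if err != nil {
				ch <- &Payload{Err: err}
				return
			}
			ch <- &Payload{Data: strings.TrimRight(line, "\r\n")}
		}
	}()
	return ch
}

// consume drains the channel the way the handler loop does: it stops on
// EOF, skips other errors and empty payloads, and collects the commands.
// closed reports whether the peer disconnected.
func consume(ch <-chan *Payload) (cmds []string, closed bool) {
	for p := range ch {
		if p.Err != nil {
			if errors.Is(p.Err, io.EOF) || errors.Is(p.Err, io.ErrUnexpectedEOF) {
				return cmds, true
			}
			continue
		}
		if p.Data == "" {
			continue
		}
		cmds = append(cmds, p.Data)
	}
	return cmds, false
}

// runScript feeds a fixed input through the parser and the consumer.
func runScript(input string) ([]string, bool) {
	return consume(parseLines(strings.NewReader(input)))
}

func main() {
	cmds, closed := runScript("PING\r\nGET k\r\n")
	fmt.Println(cmds, closed) // prints [PING GET k] true
}
```

Returning as soon as the disconnect is seen matters: once the peer is gone there is nothing left to reply to, so continuing the loop would only produce failed writes.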

// Close shuts the handler down and closes every client
func (r *RespHandler) Close() error {
	logger.Info("handler shutting down...")
	// Mark the handler as closing so new connections are refused
	r.closing.Set(true)
	r.activeConn.Range(func(key, value any) bool {
		// Close each client in turn
		client := key.(*connection.Connection)
		_ = client.Close()
		r.db.AfterClientClose(client)
		// Returning true continues the iteration with the next entry
		return true
	})
	r.db.Close()
	return nil
}
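
The shutdown logic combines two small techniques: a sync.Map used as a set of live clients, and an atomic flag that stops new clients from being registered. The following sketch shows them in isolation. `registry` and `client` are hypothetical types, and the standard library's `atomic.Bool` (Go 1.19+) is used here in place of the project's own `atomic.Boolean`, which is assumed to behave similarly.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// client is a hypothetical placeholder for a wrapped connection.
type client struct {
	name   string
	closed bool
}

// registry is a stripped-down version of the handler's bookkeeping.
type registry struct {
	active  sync.Map    // used as a set: key = *client, value = struct{}{}
	closing atomic.Bool // stands in for the project's atomic.Boolean
}

// add registers a client unless shutdown has already begun.
func (r *registry) add(c *client) bool {
	if r.closing.Load() {
		return false
	}
	r.active.Store(c, struct{}{})
	return true
}

// remove drops a single client from the set.
func (r *registry) remove(c *client) { r.active.Delete(c) }

// closeAll flips the closing flag, closes every registered client, and
// returns how many clients were closed.
func (r *registry) closeAll() int {
	r.closing.Store(true)
	n := 0
	r.active.Range(func(key, _ any) bool {
		c := key.(*client)
		c.closed = true
		r.active.Delete(key) // deleting during Range is permitted by sync.Map
		n++
		return true // keep iterating
	})
	return n
}

func main() {
	r := &registry{}
	a, b := &client{name: "a"}, &client{name: "b"}
	r.add(a)
	r.add(b)
	fmt.Println(r.closeAll(), r.add(&client{name: "c"})) // prints 2 false
}
```

Setting the flag before ranging over the map means a connection that arrives during shutdown is refused rather than registered and then forgotten.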