Processor
Once protocol parsing is complete, the resulting resp.Reply structures are handed to a handler for processing. The handler wraps each net.Conn in a Connection for later use, then calls the protocol parser to obtain a channel. It listens on that channel for parsed data, executes the corresponding operation, and returns the response to the client.
Connection
The Connection object wraps each client connection. When a client's connection reaches the handler, it is wrapped in a Connection object and stored. The interface is defined in interface/resp/conn.go:
```go
// Connection abstracts a client connection.
type Connection interface {
	// Write sends data to the client.
	Write([]byte) error
	// GetDBIndex returns the index of the currently selected database.
	GetDBIndex() int
	// SelectDB switches to the database with the given index.
	SelectDB(dbNum int) int
}
```
conn.go
Create a Connection struct that wraps the client connection.
Field Description
- conn: the client connection, a net.Conn from the net package
- waitingReply: tracks replies that are still being written to the client, so that Close can wait for them to finish before the connection is closed
- mu: a mutex held while writing to the client connection, to prevent concurrent writes
- selectedDB: Specifies the database that the current connection is operating on
Method Description
- NewConn: public constructor exposed to other packages
- RemoteAddr: Get the address of the remote client
- Close: Close the client connection
- Write: return data to the client
- GetDBIndex: Get the index of the current connection operation database
- SelectDB: switch database
```go
import (
	"net"
	"sync"
	"time"
	// plus the project's own wait package (its import path depends on the module name)
)

// Connection describes one client connection at the protocol layer.
type Connection struct {
	// conn is the underlying client connection.
	conn net.Conn
	// waitingReply tracks replies still being written, so Close can wait for them.
	waitingReply wait.Wait
	// mu serializes writes to the client.
	mu sync.Mutex
	// selectedDB is the index of the database this connection operates on.
	selectedDB int
}

// NewConn creates a new connection.
func NewConn(conn net.Conn) *Connection {
	return &Connection{
		// Only conn needs to be set; the other fields keep their zero values.
		conn: conn,
	}
}

// RemoteAddr returns the address of the remote client.
func (c *Connection) RemoteAddr() net.Addr {
	return c.conn.RemoteAddr()
}

// Close closes the connection.
func (c *Connection) Close() error {
	// Wait up to 10 seconds for pending replies to be written.
	c.waitingReply.WaitWithTimeout(10 * time.Second)
	_ = c.conn.Close()
	return nil
}

// Write sends data to the client.
func (c *Connection) Write(bytes []byte) error {
	// An empty slice returns immediately. This check must come before the
	// deferred Done/Unlock is registered; otherwise the defer would unlock
	// a mutex that was never locked.
	if len(bytes) == 0 {
		return nil
	}
	// Only one goroutine may send data to the client at a time.
	c.mu.Lock()
	// Record that a reply is being written.
	c.waitingReply.Add(1)
	defer func() {
		c.waitingReply.Done()
		c.mu.Unlock()
	}()
	_, err := c.conn.Write(bytes)
	return err
}

// GetDBIndex returns the index of the selected database.
func (c *Connection) GetDBIndex() int {
	return c.selectedDB
}

// SelectDB switches to the given database and returns its index.
func (c *Connection) SelectDB(dbNum int) int {
	c.selectedDB = dbNum
	return c.selectedDB
}
```
Handler
As mentioned earlier, a top-level Handler interface is defined in interface/tcp/handler.go. It declares two methods:
- Handler: processes a connection; ctx is the context object and conn is the connection (net.Conn) from the net package
- Close: shuts the handler down
```go
// Handler abstracts a TCP connection handler.
type Handler interface {
	// Handler processes a connection; ctx is the context, conn the TCP connection.
	Handler(ctx context.Context, conn net.Conn)
	// Close shuts the handler down.
	Close() error
}
```
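As an illustration of how a type can satisfy this interface, here is a minimal sketch. upperHandler and roundTrip are hypothetical names introduced only for this example; the handler is driven through an in-memory net.Pipe rather than a real TCP server.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"net"
	"strings"
)

// Handler mirrors the interface described in the text.
type Handler interface {
	Handler(ctx context.Context, conn net.Conn)
	Close() error
}

// upperHandler is a hypothetical implementation used only for this demo:
// it answers every line with the same line in upper case.
type upperHandler struct{}

func (upperHandler) Handler(ctx context.Context, conn net.Conn) {
	defer conn.Close()
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		fmt.Fprintf(conn, "%s\n", strings.ToUpper(scanner.Text()))
	}
}

func (upperHandler) Close() error { return nil }

// Compile-time check that upperHandler satisfies Handler.
var _ Handler = upperHandler{}

// roundTrip sends one line to the handler over an in-memory pipe
// and returns the handler's reply.
func roundTrip(h Handler, msg string) (string, error) {
	server, client := net.Pipe()
	defer client.Close()
	go h.Handler(context.Background(), server)

	if _, err := fmt.Fprintf(client, "%s\n", msg); err != nil {
		return "", err
	}
	reply, err := bufio.NewReader(client).ReadString('\n')
	return strings.TrimSpace(reply), err
}

func main() {
	reply, err := roundTrip(upperHandler{}, "ping")
	fmt.Println(reply, err)
}
```

Because the server only depends on the interface, the same accept loop can serve any protocol by swapping the implementation.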
RespHandler
The handler implementation for the RESP protocol is defined in resp/handler/handler.go
handler.go
Field Description
- activeConn: stores all client connections in a thread-safe sync.Map from the sync package
- db: the custom database layer, which will be implemented later
- closing: whether the handler is shutting down
Method Description
- MakeHandler: Provides a method for creating RespHandler
- closeClient: close the client
- Handler: the processing method. After the TCP server accepts a connection, it hands the connection to Handler, which calls parser.ParseStream() to run the protocol parsing described earlier, obtains the returned channel, and loops over the Payload values received from it
- Close: close the handler
```go
import (
	"context"
	"io"
	"net"
	"strings"
	"sync"
	// plus the project's own packages: atomic, connection, database,
	// databaseface, logger, parser and reply
)

// RespHandler is the handler for the RESP protocol.
type RespHandler struct {
	// activeConn stores all client connections (used as a set).
	activeConn sync.Map
	// db is the core database layer.
	db databaseface.Database
	// closing reports whether the handler is shutting down.
	closing atomic.Boolean
}

// MakeHandler creates a RespHandler.
func MakeHandler() *RespHandler {
	var db databaseface.Database
	db = database.NewDatabase()
	return &RespHandler{
		db: db,
	}
}

// closeClient closes a single client.
func (r *RespHandler) closeClient(client *connection.Connection) {
	// Close the client connection.
	_ = client.Close()
	// Let the database layer clean up after the client.
	r.db.AfterClientClose(client)
	// Remove the client from the set of active connections.
	r.activeConn.Delete(client)
}

// Handler processes one client connection.
func (r *RespHandler) Handler(ctx context.Context, conn net.Conn) {
	if r.closing.Get() {
		// The handler is shutting down: refuse the new connection.
		_ = conn.Close()
		return
	}
	client := connection.NewConn(conn)
	// The empty struct value takes no space, so the map behaves like a set.
	r.activeConn.Store(client, struct{}{})
	// Hand the connection to the parser for protocol parsing.
	ch := parser.ParseStream(conn)
	// Loop until the parser closes the channel.
	for payload := range ch {
		// Error handling.
		if err := payload.Err; err != nil {
			// The client closed the connection (TCP teardown), or a closed connection was used.
			if err == io.EOF || err == io.ErrUnexpectedEOF ||
				strings.Contains(err.Error(), "use of closed network connection") {
				r.closeClient(client)
				logger.Info("Connection closed:" + client.RemoteAddr().String())
				return
			}
			// Protocol error: write the error message back to the client.
			errReply := reply.MakeStandardErrReply(err.Error())
			// If writing back fails, close the client.
			if writeErr := client.Write(errReply.ToBytes()); writeErr != nil {
				r.closeClient(client)
				logger.Info("Connection closed:" + client.RemoteAddr().String())
				return
			}
			continue
		}
		// Normal request handling.
		data := payload.Data
		if data == nil {
			continue
		}
		multiBulkReply, ok := data.(*reply.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk reply.......")
			continue
		}
		// Execute the command through the database core.
		result := r.db.Exec(client, multiBulkReply.Args)
		if result != nil {
			_ = client.Write(result.ToBytes())
		} else {
			_ = client.Write(reply.MakeUnknownErrReply().ToBytes())
		}
	}
}

// Close shuts the handler down and closes all clients.
func (r *RespHandler) Close() error {
	logger.Info("handler shutting down...")
	r.closing.Set(true)
	r.activeConn.Range(func(key, value any) bool {
		// Close every client.
		client := key.(*connection.Connection)
		_ = client.Close()
		r.db.AfterClientClose(client)
		// Returning true continues the iteration.
		return true
	})
	r.db.Close()
	return nil
}
```