GO implements Redis: GO implements memory database (3)

  • Realize the database layer of Redis (core layer: process commands and return)
  • GitHub – csgopher/go-redis: learning Godis notes
  • This article refers to the following documents:
    dict: some methods to define dictionaries
    sync_dict: implement dict
    db: sub-database
    command: define command
    ping, keys, string: the specific processing logic of the command
    database: stand-alone database

datastruct/dict/dict.go

type Consumer func(key string, val interface{}) bool

type Dict interface {
   Get(key string) (val interface{}, exists bool)
   Len() int
   Put(key string, val interface{}) (result int)
   PutIfAbsent(key string, val interface{}) (result int)
   PutIfExists(key string, val interface{}) (result int)
   Remove(key string) (result int)
   ForEach(consumer Consumer)
   Keys() []string
   RandomKeys(limit int) []string
   RandomDistinctKeys(limit int) []string
   clear()
}

Dict interface: The interface of the Redis data structure. Here we use sync.Map as the implementation of the dictionary. If you want to use other data structures, just change the implementation
Consumer: traverse all the key-value pairs of the dictionary, the return value is Boolean, true to continue traversal, false to stop traversal

datastruct/dict/sync_dict.go

type SyncDict struct {
   m sync.Map
}

func MakeSyncDict() *SyncDict {
   return & SyncDict{}
}

func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {
   val, ok := dict.m.Load(key)
   return val, ok
}

func (dict *SyncDict) Len() int {
   length := 0
   dict.m.Range(func(k, v interface{}) bool {
      length + +
      return true
   })
   return length
}

func (dict *SyncDict) Put(key string, val interface{}) (result int) {
   _, existed := dict.m.Load(key)
   dict.m.Store(key, val)
   if exists {
      return 0
   }
   return 1
}

func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
   _, existed := dict.m.Load(key)
   if exists {
      return 0
   }
   dict.m.Store(key, val)
   return 1
}

func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
   _, existed := dict.m.Load(key)
   if exists {
      dict.m.Store(key, val)
      return 1
   }
   return 0
}

func (dict *SyncDict) Remove(key string) (result int) {
   _, existed := dict.m.Load(key)
   dict.m.Delete(key)
   if exists {
      return 1
   }
   return 0
}

func (dict *SyncDict) ForEach(consumer Consumer) {
   dict.m.Range(func(key, value interface{}) bool {
      consumer(key.(string), value)
      return true
   })
}

func (dict *SyncDict) Keys() []string {
   result := make([]string, dict. Len())
   i := 0
   dict.m.Range(func(key, value interface{}) bool {
      result[i] = key.(string)
      i + +
      return true
   })
   return result
}

func (dict *SyncDict) RandomKeys(limit int) []string {
   result := make([]string, limit)
   for i := 0; i < limit; i ++ {
      dict.m.Range(func(key, value interface{}) bool {
         result[i] = key.(string)
         return false
      })
   }
   return result
}

func (dict *SyncDict) RandomDistinctKeys(limit int) []string {
   result := make([]string, limit)
   i := 0
   dict.m.Range(func(key, value interface{}) bool {
      result[i] = key.(string)
      i + +
      if i == limit {
         return false
      }
      return true
   })
   return result
}

func (dict *SyncDict) Clear() {
   *dict = *MakeSyncDict()
}

Use sync.Map to implement the Dict interface

database/db.go

type DB struct {
index int
data dict. Dict
}

type ExecFunc func(db *DB, args [][]byte) resp.Reply

type CmdLine = [][]byte

func makeDB() *DB {
db := &DB{
data: dict. MakeSyncDict(),
}
return db
}

func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {
cmdName := strings.ToLower(string(cmdLine[0]))
cmd, ok := cmdTable[cmdName]
if !ok {
return reply. MakeErrReply("ERR unknown command '" + cmdName + "'")
}
if !validateArity(cmd.arity, cmdLine) {
return reply. MakeArgNumErrReply(cmdName)
}
fun := cmd.executor
return fun(db, cmdLine[1:]) // cut off the set in set k v
}

func validateArity(arity int, cmdArgs [][]byte) bool {
argNum := len(cmdArgs)
if arity >= 0 {
return argNum == arity
}
return argNum >= -arity
}

func (db *DB) GetEntity(key string) (*database. DataEntity, bool) {
raw, ok := db. data. Get(key)
if !ok {
return nil, false
}
entity, _ := raw.(*database.DataEntity)
return entity, true
}

func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
return db.data.Put(key, entity)
}

func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
return db.data.PutIfExists(key, entity)
}

func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
return db.data.PutIfAbsent(key, entity)
}

func (db *DB) Remove(key string) {
db.data.Remove(key)
}

func (db *DB) Removes(keys ... string) (deleted int) {
deleted = 0
for _, key := range keys {
_, exists := db.data.Get(key)
if exists {
db. Remove(key)
deleted + +
}
}
return deleted
}

func (db *DB) Flush() {
db.data.Clear()
}

Realize sub-database in Redis
ExecFunc: All Redis instructions are written in this type
validateArity method:

  • Fixed length: set k v => arity=3;
  • Variable length: exists k1 k2 k3 … => arity=-2, means >=2 parameters

database/command.go

var cmdTable = make(map[string]*command)

type command struct {
   executor ExecFunc
   arity int
}

func RegisterCommand(name string, executor ExecFunc, arity int) {
   name = strings. ToLower(name)
   cmdTable[name] = &command{
      executor: executor,
      arity: arity,
   }
}

command: Each command structure is a command, such as ping, keys, etc.
arity: the number of parameters
cmdTable: Record the relationship between all instructions and the command structure
RegisterCommand: the implementation of the registration command, in the program

database/ping.go

func Ping(db *DB, args [][]byte) resp.Reply {
    if len(args) == 0 {
        return & reply.PongReply{}
    } else if len(args) == 1 {
        return reply. MakeStatusReply(string(args[0]))
    } else {
        return reply. MakeErrReply("ERR wrong number of arguments for 'ping' command")
    }
}

func init() {
    RegisterCommand("ping", Ping, 1)
}

init method: This method is called when the program is started for initialization

database/keys.go

func execDel(db *DB, args [][]byte) resp.Reply {
   keys := make([]string, len(args))
   for i, v := range args {
      keys[i] = string(v)
   }

   deleted := db. Removes(keys...)
   return reply. MakeIntReply(int64(deleted))
}

func execExists(db *DB, args [][]byte) resp.Reply {
   result := int64(0)
   for _, arg := range args {
      key := string(arg)
      _, exists := db. GetEntity(key)
      if exists {
         result ++
      }
   }
   return reply. MakeIntReply(result)
}

func execFlushDB(db *DB, args [][]byte) resp.Reply {
   db. Flush()
   return & reply. OkReply{}
}

func execType(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   entity, exists := db. GetEntity(key)
   if !exists {
      return reply. MakeStatusReply("none")
   }
   switch entity. Data. (type) {
   case []byte:
      return reply. MakeStatusReply("string")
   }
   return & reply. UnknownErrReply{}
}

func execRename(db *DB, args [][]byte) resp.Reply {
   if len(args) != 2 {
      return reply. MakeErrReply("ERR wrong number of arguments for 'rename' command")
   }
   src := string(args[0])
   dest := string(args[1])
   
   entity, ok := db. GetEntity(src)
   if !ok {
      return reply. MakeErrReply("no such key")
   }
   db. PutEntity(dest, entity)
   db. Remove(src)
   return & reply. OkReply{}
}

func execRenameNx(db *DB, args [][]byte) resp.Reply {
   src := string(args[0])
   dest := string(args[1])

   _, exist := db. GetEntity(dest)
   if exist {
      return reply. MakeIntReply(0)
   }

   entity, ok := db. GetEntity(src)
   if !ok {
      return reply. MakeErrReply("no such key")
   }
   db. Removes(src, dest)
   db. PutEntity(dest, entity)
   return reply. MakeIntReply(1)
}

func execKeys(db *DB, args [][]byte) resp.Reply {
   pattern := wildcard. CompilePattern(string(args[0]))
   result := make([][]byte, 0)
   db.data.ForEach(func(key string, val interface{}) bool {
      if pattern. IsMatch(key) {
         result = append(result, []byte(key))
      }
      return true
   })
   return reply. MakeMultiBulkReply(result)
}

func init() {
   RegisterCommand("Del", execDel, -2)
   RegisterCommand("Exists", execExists, -2)
   RegisterCommand("Keys", execKeys, 2)
   RegisterCommand("FlushDB", execFlushDB, -1)
   RegisterCommand("Type", execType, 2)
   RegisterCommand("Rename", execRename, 3)
   RegisterCommand("RenameNx", execRenameNx, 3)
}

keys.go implements the following directives:
execDel: del k1 k2 k3 …
execExists: exist k1 k2 k3 …
execFlushDB:flushdb
execType: type k1
execRename:rename k1 k2
execRenameNx:renamenx k1 k2
execKeys: keys (the tool class wildcard.go that depends on the lib package)

database/string.go

func execGet(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   bytes, err := db. getAsString(key)
   if err != nil {
      return err
   }
   if bytes == nil {
      return & reply.NullBulkReply{}
   }
   return reply. MakeBulkReply(bytes)
}

func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
   entity, ok := db. GetEntity(key)
   if !ok {
      return nil, nil
   }
   bytes, ok := entity.Data.([]byte)
   if !ok {
      return nil, & amp; reply. WrongTypeErrReply{}
   }
   return bytes, nil
}

func execSet(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   value := args[1]
   entity := &database.DataEntity{
      Data: value,
   }
   db. PutEntity(key, entity)
   return & reply. OkReply{}
}

func execSetNX(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   value := args[1]
   entity := &database.DataEntity{
      Data: value,
   }
   result := db.PutIfAbsent(key, entity)
   return reply. MakeIntReply(int64(result))
}

func execGetSet(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   value := args[1]

   entity, exists := db. GetEntity(key)
   db.PutEntity(key, & amp;database.DataEntity{Data: value})
   if !exists {
      return reply. MakeNullBulkReply()
   }
   old := entity.Data.([]byte)
   return reply. MakeBulkReply(old)
}

func execStrLen(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   entity, exists := db. GetEntity(key)
   if !exists {
      return reply. MakeNullBulkReply()
   }
   old := entity.Data.([]byte)
   return reply. MakeIntReply(int64(len(old)))
}

func init() {
   RegisterCommand("Get", execGet, 2)
   RegisterCommand("Set", execSet, -3)
   RegisterCommand("SetNx", execSetNX, 3)
   RegisterCommand("GetSet", execGetSet, 3)
   RegisterCommand("StrLen", execStrLen, 2)
}

string.go implements the following directives:
execGet:get k1
execSet: set k v
execSetNX:setnex k v
execGetSet: getset k v returns old value
execStrLen: strlen k

database/database.go

type Database struct {
   dbSet []*DB
}

func NewDatabase() *Database {
   mdb := &Database{}
   if config.Properties.Databases == 0 {
      config.Properties.Databases = 16
   }
   mdb.dbSet = make([]*DB, config.Properties.Databases)
   for i := range mdb.dbSet {
      singleDB := makeDB()
      singleDB. index = i
      mdb.dbSet[i] = singleDB
   }
   return mdb
}

func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
   defer func() {
      if err := recover(); err != nil {
         logger.Warn(fmt.Sprintf("error occurs: %v\
%s", err, string(debug.Stack())))
      }
   }()

   cmdName := strings.ToLower(string(cmdLine[0]))
   if cmdName == "select" {
      if len(cmdLine) != 2 {
         return reply. MakeArgNumErrReply("select")
      }
      return execSelect(c, mdb, cmdLine[1:])
   }
   dbIndex := c. GetDBIndex()
   selectedDB := mdb.dbSet[dbIndex]
   return selectedDB. Exec(c, cmdLine)
}

func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
   dbIndex, err := strconv.Atoi(string(args[0]))
   if err != nil {
      return reply. MakeErrReply("ERR invalid DB index")
   }
   if dbIndex >= len(mdb.dbSet) {
      return reply. MakeErrReply("ERR DB index is out of range")
   }
   c. SelectDB(dbIndex)
   return reply. MakeOkReply()
}

func (mdb *Database) Close() {
}

func (mdb *Database) AfterClientClose(c resp. Connection) {
}

Database: a collection of db
Exec: Execute switching db commands or other commands
execSelect method: select db (command: select 2)

resp/handler/handler.go

import (
database2 "go-redis/database"
)

func MakeHandler() *RespHandler {
   var db database.Database
   db = database2. NewDatabase()
   return & RespHandler{
      db: db,
   }
}

Modify the database implementation that implements the protocol layer handler

Architecture Summary

The TCP layer serves the TCP connection, and then passes the connection to the handler of the RESP protocol layer. The handler monitors the connection of the client, parses the command and sends it to the pipeline, and the pipeline is transferred to the database layer (database/database.go), and the core layer according to the command type Execute a different method, then return.