Go handwritten Redis (8) database core layer and instruction implementation

Database core layer

The logic of the processor has been implemented before, and now the core data layer has been implemented. The core database is mainly used to execute instructions sent by users and store data

1. Database

The top-level interface definition of the data layer is defined in the interface/database/database.go file,
It defines the Database interface and the DataEntity structure for packaging data, where the interface type means that any type of data can be stored

//Database The business layer of the database
type Database interface {<!-- -->
//Exec The core layer executes the command and returns a response body
Exec(client resp.Connection, args [][]byte) resp.Reply
//Close off
close()
//AfterClientClose Some operations that need to be done after the client is closed
AfterClientClose(c resp. Connection)
}

//DataEntity data structure, used to wrap any data type
type DataEntity struct {<!-- -->
//Data can refer to any data
Data interface{<!-- -->}
}

2. Dict

The data structure of storing dictionaries in redis, we will implement this interface to store data, define the interface in datastruct/dict/dict.go

/**
Dict data structure for storing dictionaries in redis
*/
type Dict interface {<!-- -->
//Get returns the value value, exist represents whether it exists
Get(key string) (val interface{<!-- -->}, exist bool)
//Len The data in the dictionary
Len() int
//Put store data, return the number of stored
Put(key string, value interface{<!-- -->}) (result int)
//PutIfAbsent is set if it does not exist
PutIfAbsent(key string, value interface{<!-- -->}) (result int)
//PutIfExists is set if it exists, and it is not set if it does not exist
PutIfExists(key string, value interface{<!-- -->}) (result int)
//Remove remove
Remove(key string) (result int)
//ForEach traversal method
ForEach(consumer Consumer)
//Keys returns all keys
Keys() []string
//RandomKeys how many keys need to be returned randomly
RandomKeys(limit int) []string
//RandomDistinctKeys returns the specified number of unique keys
RandomDistinctKeys(limit int) []string
//Clear clears the dictionary table
clear()
}

//Consumer custom traversal function
type Consumer func(key string, val interface{<!-- -->}) bool

3. Implementation class

3.1 SyncDict

Implement the Dict interface. Where data is normally stored, we use sync.Map to store data thread-safely

//SyncDict uses a concurrent safe dictionary, why do you need to wrap a layer, because the underlying data interface may be replaced
type SyncDict struct {<!-- -->
//m concurrently safe map
m sync.Map
}

//Get get method
func (dict *SyncDict) Get(key string) (val interface{<!-- -->}, exist bool) {<!-- -->
return dict.m.Load(key)
}

//Len uses the range method of map by default to traverse the number
func (dict *SyncDict) Len() int {<!-- -->
length := 0
dict.m.Range(func(key, value any) bool {<!-- -->
length + +
return true
})
return length
}

//Put to store data
func (dict *SyncDict) Put(key string, value interface{<!-- -->}) (result int) {<!-- -->
_, existed := dict.m.Load(key)
dict.m.Store(key, value)
if existed {<!-- -->
return 0
}
return 1
}

//PutIfAbsent if it does not exist, store the data
func (dict *SyncDict) PutIfAbsent(key string, value interface{<!-- -->}) (result int) {<!-- -->
_, existed := dict.m.Load(key)
if !existed {<!-- -->
dict.m.Store(key, value)
return 1
}
return 0
}

//PutIfExists inserts data if it exists
func (dict *SyncDict) PutIfExists(key string, value interface{<!-- -->}) (result int) {<!-- -->
_, existed := dict.m.Load(key)
if existed {<!-- -->
dict.m.Store(key, value)
return 1
}
return 0
}

//Remove remove data
func (dict *SyncDict) Remove(key string) (result int) {<!-- -->
_, existed := dict.m.Load(key)
dict.m.Delete(key)
if existed {<!-- -->
return 1
}
return 0
}

//ForEach traverses the data
func (dict *SyncDict) ForEach(consumer Consumer) {<!-- -->
dict.m.Range(func(key, value any) bool {<!-- -->
consumer(key.(string), value)
return true
})
}

//Keys get all keys
func (dict *SyncDict) Keys() []string {<!-- -->
//Create a slice whose length is the length of the current dictionary
keys := make([]string, dict. Len())
dict.m.Range(func(key, _ any) bool {<!-- -->
keys = append(keys, key.(string))
return true
})
return keys
}

//RandomKeys randomly returns the specified key value
func (dict *SyncDict) RandomKeys(limit int) []string {<!-- -->
keys := make([]string, limit)
//Traversing the number of times passed in, each entry into the dict only acts on one key, because each traversal of the map is unordered
for i := 0; i < limit; i + + {<!-- -->
dict.m.Range(func(key, _ any) bool {<!-- -->
keys[i] = key.(string)
//Directly return false to close the current traversal
return false
})
}

return keys
}

//RandomDistinctKeys randomly takes non-repeating values
func (dict *SyncDict) RandomDistinctKeys(limit int) []string {<!-- -->
keys := make([]string, limit)
i := 0
dict.m.Range(func(key, _ any) bool {<!-- -->
keys[i] = key.(string)
i + +
if i == limit {<!-- -->
//Directly return false to close the current traversal
return false
}
return true
})

return keys
}

//clear just replace it with a new one
func (dict *SyncDict) Clear() {<!-- -->
dict = MakeSyncDict()
}

func MakeSyncDict() *SyncDict {<!-- -->
return &SyncDict{<!-- -->}
}

3.2 DB

As the database storage layer supported by the bottom layer, DB wraps the Dict of data storage to provide external support

Field Description

  • index: The index of the database, each database will have a corresponding index id for switching marks
  • data: where data is stored, here we define a Dict structure for underlying data storage
  • addAof: an anonymous function defined for aof operation, which will be implemented later
//DB Each sub-database of redis
type DB struct {<!-- -->
// index of the database
index int
//Implementation of data storage
data dict. Dict
//Assign an addAof method, which allows data to be written when the instruction is executed
addAof func(line CmdLine)
}

//Close closes the database and clears it
func (db *DB) Close() {<!-- -->
db.data.Clear()
}

//AfterClientClose closes the database and needs to do
func (db *DB) AfterClientClose(c resp.Connection) {<!-- -->

}

//ExecFunc All subsequent instruction execution must implement the current function
type ExecFunc func(db *DB, args [][]byte) resp.Reply

//CmdLine takes an alias for the two-dimensional byte group
type CmdLine = [][]byte

//makeDB create database
func makeDB() *DB {<!-- -->
return &DB{<!-- -->
data: dict. MakeSyncDict(),
addAof: func(line CmdLine) {<!-- -->}, //Set an empty default method to prevent problems at the beginning of initialization
}
}

//Exec executes on the database
func (db *DB) Exec(c resp.Connection, args [][]byte) resp.Reply {<!-- -->
//Obtain a piece of data, the storage should be an instruction For example: setnx or set; here we unify and process according to the lowercase instruction
cmdName := strings. ToLower(string(args[0]))
//Get the executor
cmd, ok := cmdTable[cmdName]
if !ok {<!-- -->
// If this command is not implemented, recover an error
return reply. MakeStandardErrReply("ERR unknown command " + cmdName)
}
//Check whether the number of parameters is legal, for example: set k, value is missing
if !validateArity(cmd.arity, args) {<!-- -->
return reply. MakeArgNumErrReply(cmdName)
}
//Get the executor
fun := cmd.executor
//set k,v --> The parameters passed in here are k,v
return fun(db, args[1:])
}

//validateArity validation parameters, the parameters are divided into two situations: set K v --> arity = 3 ; if exists k1 k2 k3 ---> arity requires at least 2 parameters (belonging to variable-length instructions), unified Defined as -2, the minus sign executes the variable-length mark
func validateArity(arity int, cmdLine CmdLine) bool {<!-- -->
argNum := len(cmdLine)
//The command is fixed length
if arity >= 0 {<!-- -->
return argNum == arity
}
//Here, negative arity can convert the number of variable-length parameters into positive numbers for judgment, and specify the variable-length parameters when registering the command
return argNum >= -arity
}

//GetEntity public method to get data, db wraps dict with a layer
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {<!-- -->
// Get the original value
raw, ok := db. data. Get(key)
if !ok {<!-- -->
return nil, false
}
//The original format of raw needs to be converted to DataEntity
dataEntity, _ := raw.(*database.DataEntity)
return dataEntity, true
}

//PutEntity is stored in the entity object
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {<!-- -->
return db.data.Put(key, entity)
}

//PutIfExists is stored in the entity object, if it exists
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {<!-- -->
return db.data.PutIfExists(key, entity)
}

//PutIfAbsent If it does not exist, deposit it
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {<!-- -->
return db.data.PutIfAbsent(key, entity)
}

//Remove remove
func (db *DB) Remove(key string) int {<!-- -->
return db.data.Remove(key)
}

//Removes batch removal
func (db *DB) Removes(keys ... string) int {<!-- -->
result := 0
for _, key := range keys {<!-- -->
_, ok := db.data.Get(key)
if ok {<!-- -->
result + = db. Remove(key)
}
}
return result
}

//Flush empty
func (db *DB) Flush() {<!-- -->
db.data.Clear()
}

3.3 Database

A Database structure defined in the database/database.go file mainly encapsulates DB

Field Description

  • dbSet: the created database, created according to the number of initialized databases in the configuration file
  • aofHandler: A custom AofHandler interface, which is subsequently used for message persistence
type Database struct {<!-- -->
// Store multiple databases
dbSet []*DB
//aof processor
aofHandler *aof.AofHandler
}

//NewDatabase creates a db database kernel, initializes data, and initializes according to the number of databases in the configuration file
func NewDatabase() *Database {<!-- -->
database := &Database{<!-- -->}
if config.Properties.Databases == 0 {<!-- -->
config.Properties.Databases = 16
}
//Initialize 16 databases db
database.dbSet = make([]*DB, config.Properties.Databases)
for i := range database.dbSet {<!-- -->
db := makeDB()
db.index = i
database.dbSet[i] = db
}
//Determine whether to enable the function of message placement
if config.Properties.AppendOnly {<!-- -->
aofHandler, err := aof.NewAofHandler(database)
if err != nil {<!-- -->
panic(err)
}
database.aofHandler = aofHandler
//The method of initializing the anonymous processor in db
for _, db := range database.dbSet {<!-- -->
tmpDb := db
db.addAof = func(line CmdLine) {<!-- -->
/**
Anonymous functions will have closure problems, if you use db.index will always be 15 index
*/
database.aofHandler.AddAof(tmpDb.index, line)
}
}
}
return database
}

//Exec kernel layer for execution
func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {<!-- -->
defer func() {<!-- -->
if err := recover(); err != nil {<!-- -->
logger. Error(err)
}
}()
//Need to process select 1 command to switch database, the following db core database does not need to process select
cmdName := strings. ToLower(string(args[0]))
if cmdName == "select" {<!-- -->
//select command
if len(args) != 2 {<!-- -->
return reply. MakeArgNumErrReply("select")
}
return execSelect(client, d, args)
}
dbIndex := client. GetDBIndex()
db := d.dbSet[dbIndex]
if db == nil {<!-- -->
return reply. MakeStandardErrReply("ERR DB is nil")
}
return db. Exec(client, args)
}

//Close closes the database
func (d *Database) Close() {<!-- -->
for _, db := range d.dbSet {<!-- -->
db. Close()
}
}

//AfterClientClose The callback that needs to be executed after closing the connection
func (d *Database) AfterClientClose(c resp. Connection) {<!-- -->

}

//execSelect Modify the index of the database according to the instruction sent by the user, for example: select 1
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {<!-- -->
// parameter is converted to a number
dbIndex, err := strconv. Atoi(string(args[1]))
if err != nil {<!-- -->
return reply. MakeStandardErrReply("ERR invalid DB index")
}
if dbIndex >= len(database.dbSet) {<!-- -->
return reply. MakeStandardErrReply("ERR DB index is out of range")
}
c. SelectDB(dbIndex)
return reply. MakeOkReply()
}

The above is the implementation of the database layer, mainly a hierarchical structure such as Database –> DB –> Dict. Database obtains the database in which the user needs to execute instructions, and then hands it to the corresponding index The DB executes, and the DB then goes back to the corresponding instruction executor to execute. The following is the implementation of the instruction

4. Instruction implementation

4.1 command

A command command structure is defined here, which is mainly used to wrap the function that executes the command

Parameter Description

  • executor : The executor function, which is defined in database/db.go is the function in the DB structure file defined above, and we only need to add The corresponding function can be easily retrieved by registering it in a map and associating with the command.
//ExecFunc All subsequent instruction execution must implement the current function
type ExecFunc func(db *DB, args [][]byte) resp.Reply
  • arity: The current command requires several parameters, for example: set key value, which requires 3 parameters; the arity here can be positive or negative, negative numbers represent variable length commands, for example: keys key1 key2, then the arity is -2 that is It is said that the command has at least two parameters

Method Description

  • RegisterCommand: Provides a public method to register the executed function into the map and associate it with the command
//used to record the relationship between all instructions and command
var cmdTable = make(map[string]*command)

//command is used to distinguish the type and execution mode of the instruction
type command struct {<!-- -->
\t//Actuator
executor ExecFunc
//requires several parameters
arity int
}

//RegisterCommand registers the execution method of the instruction, when it is stored, one instruction corresponds to one executor
func RegisterCommand(name string, executor ExecFunc, arity int) {<!-- -->
name = strings. ToLower(name)
cmdTable[name] = &command{<!-- -->
executor: executor,
arity: arity,
}
}

4.2 keys instruction set

In database/keys.go, we define the implementation of the keys related instruction set, where the method name is defined freely, mainly in the following init() method for registration, When go starts, it will automatically execute the init() method in each file, so that the executed functions and commands can be associated with each other. Each method needs two parameters *DB and args [][]bytes, the above said DB is for Dict< /strong> is encapsulated, and Dict is where the data is actually stored, so the command execution method only needs to call the corresponding method in DB

/**
Implement the following keys instruction set:
DEL
EXISTS
KEYS
FLUSHDB
TYPE
RENAME
RENAMEX
*/

//execDel DEL k1 k2 k3, DEL has been cut off outside
func execDel(db *DB, args [][]byte) resp.Reply {<!-- -->
keys := make([]string, len(args))
for i, v := range keys {<!-- -->
keys[i] = string(v)
}
deleted := db. Removes(keys...)
if deleted > 0 {<!-- -->
//The previous command was cut off, it needs to be restored here
db.addAof(utils.ToCmdLine2("del", args...))
}
return reply. MakeIntReply(int64(deleted))
}

//execExists Exists exist several keys
func execExists(db *DB, args [][]byte) resp.Reply {<!-- -->
keys := make([]string, len(args))
result := int64(0)
for _, v := range keys {<!-- -->
_, exists := db. GetEntity(v)
if exists {<!-- -->
result ++
}
}
return reply. MakeIntReply(result)
}

//execKeys keys k1 k2 k3
func execKeys(db *DB, args [][]byte) resp.Reply {<!-- -->
// Get whether the first parameter is a wildcard
pattern := wildcard. CompilePattern(string(args[0]))
// for all matching keys
result := make([][]byte, 0)
db.data.ForEach(func(key string, val interface{<!-- -->}) bool {<!-- -->
match := pattern.IsMatch(key)
if match {<!-- -->
result = append(result, []byte(key))
}
return true
})
return reply. MakeMultiBulkReply(result)
}

//execType query key type, for example: type key1
func execType(db *DB, args [][]byte) resp.Reply {<!-- -->
key := string(args[0])
entity, ok := db. GetEntity(key)
if !ok {<!-- -->
return reply.MakeStatusReply("none") //Inside the tcp message is :none\r\

}
// type assertion
switch entity.Data.(type) {<!-- -->
case []byte:
return reply. MakeStatusReply("string")
//todo: judge other subsequent types
}
return reply. MakeUnknownErrReply()
}

//execRename rename, for example: rename old newKey
func execRename(db *DB, args [][]byte) resp.Reply {<!-- -->
old := string(args[0])
newKey := string(args[1])
entity, exists := db. GetEntity(old)
if !exists {<!-- -->
return reply. MakeStandardErrReply("no such key")
}
// store the new key in
db.PutEntity(newKey, entity)
//delete the old key
db. Remove(old)
db.addAof(utils.ToCmdLine2("rename", args...))
return reply. MakeOkReply()
}

//execRenameNx renamenx: When changing to a new name, judge whether the new name will kill the existing key, for example: renamenx K1 K2, you need to judge whether the original K2 exists
func execRenameNx(db *DB, args [][]byte) resp.Reply {<!-- -->
old := string(args[0])
newKey := string(args[1])
//Judge whether the new key exists, if it exists, do nothing
_, ok := db. GetEntity(newKey)
if ok {<!-- -->
// Return 0 if nothing is done
return reply. MakeIntReply(0)
}
//Continue to judge the original logic
entity, exists := db. GetEntity(old)
if !exists {<!-- -->
return reply. MakeStandardErrReply("no such key")
}
// store the new key in
db.PutEntity(newKey, entity)
//delete the old key
db. Remove(old)
db.addAof(utils.ToCmdLine2("renamenx", args...))
//If executed, return a 1
return reply. MakeIntReply(1)
}

//execFlushDb
func execFlushDb(db *DB, args [][]byte) resp.Reply {<!-- -->
db. Flush()
db.addAof(utils.ToCmdLine2("flushdb", args...))
return reply. MakeOkReply()
}

//init initialization registration command
func init() {<!-- -->
RegisterCommand("del", execDel, -2)
RegisterCommand("exists", execExists, -2)
RegisterCommand("flushdb", execFlushDb, -1) //-1 parameter is ignored regardless of what follows flushdb
RegisterCommand("type", execType, 2)
RegisterCommand("rename", execRename, 3)
RegisterCommand("renamenx", execRenameNx, 3)
RegisterCommand("keys", execKeys, -2)
}

4.3 string instruction set

The string command set is defined in database\string.go and is implemented in the same way as the keys set above

/**
Implement the instruction set of string type
GET
SET
SETNX
GETSET
STRLEN
*/

//init The go language will execute the init method when it starts
func init() {<!-- -->
RegisterCommand("get", execGet, 2)
RegisterCommand("set", execSet, 3)
RegisterCommand("setnx", execSetnx, 3)
RegisterCommand("getset", execGetset, 3)
RegisterCommand("strlen", execStrLen, 2)
}

//execGet get data
func execGet(db *DB, args [][]byte) resp.Reply {<!-- -->
key := string(args[0])
entity, ok := db. GetEntity(key)
if !ok {<!-- -->
return reply. MakeNullBulkReply()
}
//Currently stored data is stored in []byte
bytes, b := entity.Data.([]byte)
//If there are other stored types, it is necessary to judge whether the conversion is successful
if !b {<!-- -->
//todo: convert other types
}
return reply. MakeBulkReply(bytes)
}

//execSet set key value
func execSet(db *DB, args [][]byte) resp.Reply {<!-- -->
key := string(args[0])
//Store data in byte array
value := args[1]
data := &database.DataEntity{<!-- -->
Data: value, //Data storage is stored in the form of byte array
}
result := db.PutEntity(key, data)
db.addAof(utils.ToCmdLine2("set", args...))
return reply. MakeIntReply(int64(result))
}

//execSetnx setnx key value
func execSetnx(db *DB, args [][]byte) resp.Reply {<!-- -->
key := string(args[0])
//Store data in byte array
value := args[1]
data := &database.DataEntity{<!-- -->
Data: value, //Data storage is stored in the form of byte array
}
result := db.PutIfAbsent(key, data)
db.addAof(utils.ToCmdLine2("setnx", args...))
return reply. MakeIntReply(int64(result))
}

//execGetset getset key value
func execGetset(db *DB, args [][]byte) resp.Reply {<!-- -->
key := string(args[0])
value := args[1]
// Get the original value first, then set the current value
entity, exists := db. GetEntity(key)
db.PutEntity(key, & amp;database.DataEntity{<!-- -->Data: value})
if !exists {<!-- -->
return reply. MakeNullBulkReply()
}
db.addAof(utils.ToCmdLine2("getset", args...))
return reply.MakeBulkReply(entity.Data.([]byte))
}

//execStrLen strlen key The value length of the key obtained
func execStrLen(db *DB, args [][]byte) resp.Reply {<!-- -->
key := string(args[0])
// Get the original value first, then set the current value
entity, exists := db. GetEntity(key)
if !exists {<!-- -->
return reply. MakeNullBulkReply()
}
return reply.MakeIntReply(int64(len(entity.Data.([]byte))))
}

4.4 ping command

The ping command is relatively simple to implement

//Ping ping command
func Ping(db *DB, args [][]byte) resp.Reply {<!-- -->
return reply. MakePongReply()
}

//init is written under any package, and the go language will call this method when it starts
func init() {<!-- -->
RegisterCommand("ping", Ping, 1)
}
syntaxbug.com © 2021 All Rights Reserved.