Database core layer
The logic of the processor has been implemented before, and now the core data layer has been implemented. The core database is mainly used to execute instructions sent by users and store data
1. Database
The top-level interface definition of the data layer is defined in the interface/database/database.go file,
It defines the Database interface and the DataEntity structure for packaging data, where the interface type means that any type of data can be stored
//Database The business layer of the database type Database interface {<!-- --> //Exec The core layer executes the command and returns a response body Exec(client resp.Connection, args [][]byte) resp.Reply //Close off close() //AfterClientClose Some operations that need to be done after the client is closed AfterClientClose(c resp. Connection) } //DataEntity data structure, used to wrap any data type type DataEntity struct {<!-- --> //Data can refer to any data Data interface{<!-- -->} }
2. Dict
The data structure of storing dictionaries in redis, we will implement this interface to store data, define the interface in datastruct/dict/dict.go
/** Dict data structure for storing dictionaries in redis */ type Dict interface {<!-- --> //Get returns the value value, exist represents whether it exists Get(key string) (val interface{<!-- -->}, exist bool) //Len The data in the dictionary Len() int //Put store data, return the number of stored Put(key string, value interface{<!-- -->}) (result int) //PutIfAbsent is set if it does not exist PutIfAbsent(key string, value interface{<!-- -->}) (result int) //PutIfExists is set if it exists, and it is not set if it does not exist PutIfExists(key string, value interface{<!-- -->}) (result int) //Remove remove Remove(key string) (result int) //ForEach traversal method ForEach(consumer Consumer) //Keys returns all keys Keys() []string //RandomKeys how many keys need to be returned randomly RandomKeys(limit int) []string //RandomDistinctKeys returns the specified number of unique keys RandomDistinctKeys(limit int) []string //Clear clears the dictionary table clear() } //Consumer custom traversal function type Consumer func(key string, val interface{<!-- -->}) bool
3. Implementation class
3.1 SyncDict
Implement the Dict interface. Where data is normally stored, we use sync.Map to store data thread-safely
//SyncDict uses a concurrent safe dictionary, why do you need to wrap a layer, because the underlying data interface may be replaced type SyncDict struct {<!-- --> //m concurrently safe map m sync.Map } //Get get method func (dict *SyncDict) Get(key string) (val interface{<!-- -->}, exist bool) {<!-- --> return dict.m.Load(key) } //Len uses the range method of map by default to traverse the number func (dict *SyncDict) Len() int {<!-- --> length := 0 dict.m.Range(func(key, value any) bool {<!-- --> length + + return true }) return length } //Put to store data func (dict *SyncDict) Put(key string, value interface{<!-- -->}) (result int) {<!-- --> _, existed := dict.m.Load(key) dict.m.Store(key, value) if existed {<!-- --> return 0 } return 1 } //PutIfAbsent if it does not exist, store the data func (dict *SyncDict) PutIfAbsent(key string, value interface{<!-- -->}) (result int) {<!-- --> _, existed := dict.m.Load(key) if !existed {<!-- --> dict.m.Store(key, value) return 1 } return 0 } //PutIfExists inserts data if it exists func (dict *SyncDict) PutIfExists(key string, value interface{<!-- -->}) (result int) {<!-- --> _, existed := dict.m.Load(key) if existed {<!-- --> dict.m.Store(key, value) return 1 } return 0 } //Remove remove data func (dict *SyncDict) Remove(key string) (result int) {<!-- --> _, existed := dict.m.Load(key) dict.m.Delete(key) if existed {<!-- --> return 1 } return 0 } //ForEach traverses the data func (dict *SyncDict) ForEach(consumer Consumer) {<!-- --> dict.m.Range(func(key, value any) bool {<!-- --> consumer(key.(string), value) return true }) } //Keys get all keys func (dict *SyncDict) Keys() []string {<!-- --> //Create a slice whose length is the length of the current dictionary keys := make([]string, dict. Len()) dict.m.Range(func(key, _ any) bool {<!-- --> keys = append(keys, key.(string)) return true }) return keys } //RandomKeys randomly returns the specified key value func (dict *SyncDict) RandomKeys(limit int) []string {<!-- --> keys := make([]string, limit) //Traversing the number of times passed in, each entry into the dict only acts on one key, because each traversal of the map is unordered for i := 0; i < limit; i + + {<!-- --> dict.m.Range(func(key, _ any) bool {<!-- --> keys[i] = key.(string) //Directly return false to close the current traversal return false }) } return keys } //RandomDistinctKeys randomly takes non-repeating values func (dict *SyncDict) RandomDistinctKeys(limit int) []string {<!-- --> keys := make([]string, limit) i := 0 dict.m.Range(func(key, _ any) bool {<!-- --> keys[i] = key.(string) i + + if i == limit {<!-- --> //Directly return false to close the current traversal return false } return true }) return keys } //clear just replace it with a new one func (dict *SyncDict) Clear() {<!-- --> dict = MakeSyncDict() } func MakeSyncDict() *SyncDict {<!-- --> return &SyncDict{<!-- -->} }
3.2 DB
As the database storage layer supported by the bottom layer, DB wraps the Dict of data storage to provide external support
Field Description
- index: The index of the database, each database will have a corresponding index id for switching marks
- data: where data is stored, here we define a Dict structure for underlying data storage
- addAof: an anonymous function defined for aof operation, which will be implemented later
//DB Each sub-database of redis type DB struct {<!-- --> // index of the database index int //Implementation of data storage data dict. Dict //Assign an addAof method, which allows data to be written when the instruction is executed addAof func(line CmdLine) } //Close closes the database and clears it func (db *DB) Close() {<!-- --> db.data.Clear() } //AfterClientClose closes the database and needs to do func (db *DB) AfterClientClose(c resp.Connection) {<!-- --> } //ExecFunc All subsequent instruction execution must implement the current function type ExecFunc func(db *DB, args [][]byte) resp.Reply //CmdLine takes an alias for the two-dimensional byte group type CmdLine = [][]byte //makeDB create database func makeDB() *DB {<!-- --> return &DB{<!-- --> data: dict. MakeSyncDict(), addAof: func(line CmdLine) {<!-- -->}, //Set an empty default method to prevent problems at the beginning of initialization } } //Exec executes on the database func (db *DB) Exec(c resp.Connection, args [][]byte) resp.Reply {<!-- --> //Obtain a piece of data, the storage should be an instruction For example: setnx or set; here we unify and process according to the lowercase instruction cmdName := strings. ToLower(string(args[0])) //Get the executor cmd, ok := cmdTable[cmdName] if !ok {<!-- --> // If this command is not implemented, recover an error return reply. MakeStandardErrReply("ERR unknown command " + cmdName) } //Check whether the number of parameters is legal, for example: set k, value is missing if !validateArity(cmd.arity, args) {<!-- --> return reply. MakeArgNumErrReply(cmdName) } //Get the executor fun := cmd.executor //set k,v --> The parameters passed in here are k,v return fun(db, args[1:]) } //validateArity validation parameters, the parameters are divided into two situations: set K v --> arity = 3 ; if exists k1 k2 k3 ---> arity requires at least 2 parameters (belonging to variable-length instructions), unified Defined as -2, the minus sign executes the variable-length mark func validateArity(arity int, cmdLine CmdLine) bool {<!-- --> argNum := len(cmdLine) //The command is fixed length if arity >= 0 {<!-- --> return argNum == arity } //Here, negative arity can convert the number of variable-length parameters into positive numbers for judgment, and specify the variable-length parameters when registering the command return argNum >= -arity } //GetEntity public method to get data, db wraps dict with a layer func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {<!-- --> // Get the original value raw, ok := db. data. Get(key) if !ok {<!-- --> return nil, false } //The original format of raw needs to be converted to DataEntity dataEntity, _ := raw.(*database.DataEntity) return dataEntity, true } //PutEntity is stored in the entity object func (db *DB) PutEntity(key string, entity *database.DataEntity) int {<!-- --> return db.data.Put(key, entity) } //PutIfExists is stored in the entity object, if it exists func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {<!-- --> return db.data.PutIfExists(key, entity) } //PutIfAbsent If it does not exist, deposit it func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {<!-- --> return db.data.PutIfAbsent(key, entity) } //Remove remove func (db *DB) Remove(key string) int {<!-- --> return db.data.Remove(key) } //Removes batch removal func (db *DB) Removes(keys ... string) int {<!-- --> result := 0 for _, key := range keys {<!-- --> _, ok := db.data.Get(key) if ok {<!-- --> result + = db. Remove(key) } } return result } //Flush empty func (db *DB) Flush() {<!-- --> db.data.Clear() }
3.3 Database
A Database structure defined in the database/database.go file mainly encapsulates DB
Field Description
- dbSet: the created database, created according to the number of initialized databases in the configuration file
- aofHandler: A custom AofHandler interface, which is subsequently used for message persistence
type Database struct {<!-- --> // Store multiple databases dbSet []*DB //aof processor aofHandler *aof.AofHandler } //NewDatabase creates a db database kernel, initializes data, and initializes according to the number of databases in the configuration file func NewDatabase() *Database {<!-- --> database := &Database{<!-- -->} if config.Properties.Databases == 0 {<!-- --> config.Properties.Databases = 16 } //Initialize 16 databases db database.dbSet = make([]*DB, config.Properties.Databases) for i := range database.dbSet {<!-- --> db := makeDB() db.index = i database.dbSet[i] = db } //Determine whether to enable the function of message placement if config.Properties.AppendOnly {<!-- --> aofHandler, err := aof.NewAofHandler(database) if err != nil {<!-- --> panic(err) } database.aofHandler = aofHandler //The method of initializing the anonymous processor in db for _, db := range database.dbSet {<!-- --> tmpDb := db db.addAof = func(line CmdLine) {<!-- --> /** Anonymous functions will have closure problems, if you use db.index will always be 15 index */ database.aofHandler.AddAof(tmpDb.index, line) } } } return database } //Exec kernel layer for execution func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {<!-- --> defer func() {<!-- --> if err := recover(); err != nil {<!-- --> logger. Error(err) } }() //Need to process select 1 command to switch database, the following db core database does not need to process select cmdName := strings. ToLower(string(args[0])) if cmdName == "select" {<!-- --> //select command if len(args) != 2 {<!-- --> return reply. MakeArgNumErrReply("select") } return execSelect(client, d, args) } dbIndex := client. GetDBIndex() db := d.dbSet[dbIndex] if db == nil {<!-- --> return reply. MakeStandardErrReply("ERR DB is nil") } return db. Exec(client, args) } //Close closes the database func (d *Database) Close() {<!-- --> for _, db := range d.dbSet {<!-- --> db. Close() } } //AfterClientClose The callback that needs to be executed after closing the connection func (d *Database) AfterClientClose(c resp. Connection) {<!-- --> } //execSelect Modify the index of the database according to the instruction sent by the user, for example: select 1 func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {<!-- --> // parameter is converted to a number dbIndex, err := strconv. Atoi(string(args[1])) if err != nil {<!-- --> return reply. MakeStandardErrReply("ERR invalid DB index") } if dbIndex >= len(database.dbSet) {<!-- --> return reply. MakeStandardErrReply("ERR DB index is out of range") } c. SelectDB(dbIndex) return reply. MakeOkReply() }
The above is the implementation of the database layer, mainly a hierarchical structure such as Database –> DB –> Dict. Database obtains the database in which the user needs to execute instructions, and then hands it to the corresponding index The DB executes, and the DB then goes back to the corresponding instruction executor to execute. The following is the implementation of the instruction
4. Instruction implementation
4.1 command
A command command structure is defined here, which is mainly used to wrap the function that executes the command
Parameter Description
- executor : The executor function, which is defined in database/db.go is the function in the DB structure file defined above, and we only need to add The corresponding function can be easily retrieved by registering it in a map and associating with the command.
//ExecFunc All subsequent instruction execution must implement the current function type ExecFunc func(db *DB, args [][]byte) resp.Reply
- arity: The current command requires several parameters, for example: set key value, which requires 3 parameters; the arity here can be positive or negative, negative numbers represent variable length commands, for example: keys key1 key2, then the arity is -2 that is It is said that the command has at least two parameters
Method Description
- RegisterCommand: Provides a public method to register the executed function into the map and associate it with the command
//used to record the relationship between all instructions and command var cmdTable = make(map[string]*command) //command is used to distinguish the type and execution mode of the instruction type command struct {<!-- --> \t//Actuator executor ExecFunc //requires several parameters arity int } //RegisterCommand registers the execution method of the instruction, when it is stored, one instruction corresponds to one executor func RegisterCommand(name string, executor ExecFunc, arity int) {<!-- --> name = strings. ToLower(name) cmdTable[name] = &command{<!-- --> executor: executor, arity: arity, } }
4.2 keys instruction set
In database/keys.go, we define the implementation of the keys related instruction set, where the method name is defined freely, mainly in the following init() method for registration, When go starts, it will automatically execute the init() method in each file, so that the executed functions and commands can be associated with each other. Each method needs two parameters *DB and args [][]bytes, the above said DB is for Dict< /strong> is encapsulated, and Dict is where the data is actually stored, so the command execution method only needs to call the corresponding method in DB
/** Implement the following keys instruction set: DEL EXISTS KEYS FLUSHDB TYPE RENAME RENAMEX */ //execDel DEL k1 k2 k3, DEL has been cut off outside func execDel(db *DB, args [][]byte) resp.Reply {<!-- --> keys := make([]string, len(args)) for i, v := range keys {<!-- --> keys[i] = string(v) } deleted := db. Removes(keys...) if deleted > 0 {<!-- --> //The previous command was cut off, it needs to be restored here db.addAof(utils.ToCmdLine2("del", args...)) } return reply. MakeIntReply(int64(deleted)) } //execExists Exists exist several keys func execExists(db *DB, args [][]byte) resp.Reply {<!-- --> keys := make([]string, len(args)) result := int64(0) for _, v := range keys {<!-- --> _, exists := db. GetEntity(v) if exists {<!-- --> result ++ } } return reply. MakeIntReply(result) } //execKeys keys k1 k2 k3 func execKeys(db *DB, args [][]byte) resp.Reply {<!-- --> // Get whether the first parameter is a wildcard pattern := wildcard. CompilePattern(string(args[0])) // for all matching keys result := make([][]byte, 0) db.data.ForEach(func(key string, val interface{<!-- -->}) bool {<!-- --> match := pattern.IsMatch(key) if match {<!-- --> result = append(result, []byte(key)) } return true }) return reply. MakeMultiBulkReply(result) } //execType query key type, for example: type key1 func execType(db *DB, args [][]byte) resp.Reply {<!-- --> key := string(args[0]) entity, ok := db. GetEntity(key) if !ok {<!-- --> return reply.MakeStatusReply("none") //Inside the tcp message is :none\r\ } // type assertion switch entity.Data.(type) {<!-- --> case []byte: return reply. MakeStatusReply("string") //todo: judge other subsequent types } return reply. MakeUnknownErrReply() } //execRename rename, for example: rename old newKey func execRename(db *DB, args [][]byte) resp.Reply {<!-- --> old := string(args[0]) newKey := string(args[1]) entity, exists := db. GetEntity(old) if !exists {<!-- --> return reply. MakeStandardErrReply("no such key") } // store the new key in db.PutEntity(newKey, entity) //delete the old key db. Remove(old) db.addAof(utils.ToCmdLine2("rename", args...)) return reply. MakeOkReply() } //execRenameNx renamenx: When changing to a new name, judge whether the new name will kill the existing key, for example: renamenx K1 K2, you need to judge whether the original K2 exists func execRenameNx(db *DB, args [][]byte) resp.Reply {<!-- --> old := string(args[0]) newKey := string(args[1]) //Judge whether the new key exists, if it exists, do nothing _, ok := db. GetEntity(newKey) if ok {<!-- --> // Return 0 if nothing is done return reply. MakeIntReply(0) } //Continue to judge the original logic entity, exists := db. GetEntity(old) if !exists {<!-- --> return reply. MakeStandardErrReply("no such key") } // store the new key in db.PutEntity(newKey, entity) //delete the old key db. Remove(old) db.addAof(utils.ToCmdLine2("renamenx", args...)) //If executed, return a 1 return reply. MakeIntReply(1) } //execFlushDb func execFlushDb(db *DB, args [][]byte) resp.Reply {<!-- --> db. Flush() db.addAof(utils.ToCmdLine2("flushdb", args...)) return reply. MakeOkReply() } //init initialization registration command func init() {<!-- --> RegisterCommand("del", execDel, -2) RegisterCommand("exists", execExists, -2) RegisterCommand("flushdb", execFlushDb, -1) //-1 parameter is ignored regardless of what follows flushdb RegisterCommand("type", execType, 2) RegisterCommand("rename", execRename, 3) RegisterCommand("renamenx", execRenameNx, 3) RegisterCommand("keys", execKeys, -2) }
4.3 string instruction set
The string command set is defined in database\string.go and is implemented in the same way as the keys set above
/** Implement the instruction set of string type GET SET SETNX GETSET STRLEN */ //init The go language will execute the init method when it starts func init() {<!-- --> RegisterCommand("get", execGet, 2) RegisterCommand("set", execSet, 3) RegisterCommand("setnx", execSetnx, 3) RegisterCommand("getset", execGetset, 3) RegisterCommand("strlen", execStrLen, 2) } //execGet get data func execGet(db *DB, args [][]byte) resp.Reply {<!-- --> key := string(args[0]) entity, ok := db. GetEntity(key) if !ok {<!-- --> return reply. MakeNullBulkReply() } //Currently stored data is stored in []byte bytes, b := entity.Data.([]byte) //If there are other stored types, it is necessary to judge whether the conversion is successful if !b {<!-- --> //todo: convert other types } return reply. MakeBulkReply(bytes) } //execSet set key value func execSet(db *DB, args [][]byte) resp.Reply {<!-- --> key := string(args[0]) //Store data in byte array value := args[1] data := &database.DataEntity{<!-- --> Data: value, //Data storage is stored in the form of byte array } result := db.PutEntity(key, data) db.addAof(utils.ToCmdLine2("set", args...)) return reply. MakeIntReply(int64(result)) } //execSetnx setnx key value func execSetnx(db *DB, args [][]byte) resp.Reply {<!-- --> key := string(args[0]) //Store data in byte array value := args[1] data := &database.DataEntity{<!-- --> Data: value, //Data storage is stored in the form of byte array } result := db.PutIfAbsent(key, data) db.addAof(utils.ToCmdLine2("setnx", args...)) return reply. MakeIntReply(int64(result)) } //execGetset getset key value func execGetset(db *DB, args [][]byte) resp.Reply {<!-- --> key := string(args[0]) value := args[1] // Get the original value first, then set the current value entity, exists := db. GetEntity(key) db.PutEntity(key, & amp;database.DataEntity{<!-- -->Data: value}) if !exists {<!-- --> return reply. MakeNullBulkReply() } db.addAof(utils.ToCmdLine2("getset", args...)) return reply.MakeBulkReply(entity.Data.([]byte)) } //execStrLen strlen key The value length of the key obtained func execStrLen(db *DB, args [][]byte) resp.Reply {<!-- --> key := string(args[0]) // Get the original value first, then set the current value entity, exists := db. GetEntity(key) if !exists {<!-- --> return reply. MakeNullBulkReply() } return reply.MakeIntReply(int64(len(entity.Data.([]byte)))) }
4.4 ping command
The ping command is relatively simple to implement
//Ping ping command func Ping(db *DB, args [][]byte) resp.Reply {<!-- --> return reply. MakePongReply() } //init is written under any package, and the go language will call this method when it starts func init() {<!-- --> RegisterCommand("ping", Ping, 1) }