[Hyperledger Fabric source code interpretation] orderer

Ordering service startup process

The ordering service node starts from the main() function in cmd/orderer/main.go, which simply calls Main() in orderer/common/server/main.go.

# fabric-release-2.2\cmd\orderer\main.go
package main

import "github.com/hyperledger/fabric/orderer/common/server"

func main() {
	server.Main()
}
  1. Read the local configuration
    The server.Main() function first calls localconfig.Load(), which loads the orderer.yaml configuration file from the specified directory, parses it into the Orderer configuration, completes relative file paths in the configuration with absolute paths, fills unspecified configuration items with their default values, and saves the result in the Orderer configuration object conf (of type TopLevel).
# orderer/common/server/main.go file
func Main() {
	// Parse the user command line
	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

	// "version" command
	if fullCmd == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	// 1. Load the local orderer.yaml configuration file
	conf, err := localconfig.Load()
	if err != nil {
		logger.Error("failed to parse config: ", err)
		os.Exit(1)
	}
···
# orderer/common/localconfig/config.go file
// Load parses the orderer YAML file and environment, producing
// a struct suitable for config use, returning error on failure.
func Load() (*TopLevel, error) {
	return cache.load()
}

// configCache stores marshalled bytes of config structures that produced from
// EnhancedExactUnmarshal. Cache key is the path of the configuration file that was used.
type configCache struct {
	mutex sync.Mutex
	cache map[string][]byte
}

var cache = &configCache{}

// Load will load the configuration and cache it on the first call;
// subsequent calls will return a clone of the configuration that was previously loaded.
func (c *configCache) load() (*TopLevel, error) {
	var uconf TopLevel

	config := viper.New()
	coreconfig.InitViper(config, "orderer")
	config.SetEnvPrefix(Prefix)
	config.AutomaticEnv()
	replacer := strings.NewReplacer(".", "_")
	config.SetEnvKeyReplacer(replacer)

	if err := config.ReadInConfig(); err != nil {
		return nil, fmt.Errorf("Error reading configuration: %s", err)
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()
	serializedConf, ok := c.cache[config.ConfigFileUsed()]
	if !ok {
		err := viperutil.EnhancedExactUnmarshal(config, &uconf)
		if err != nil {
			return nil, fmt.Errorf("Error unmarshaling config into struct: %s", err)
		}

		serializedConf, err = json.Marshal(uconf)
		if err != nil {
			return nil, err
		}

		if c.cache == nil {
			c.cache = map[string][]byte{}
		}
		c.cache[config.ConfigFileUsed()] = serializedConf
	}

	err := json.Unmarshal(serializedConf, &uconf)
	if err != nil {
		return nil, err
	}
	uconf.completeInitialization(filepath.Dir(config.ConfigFileUsed()))

	return &uconf, nil
}
  2. Read environment variable configuration and initialize logging
    The environment variables FABRIC_LOGGING_SPEC and FABRIC_LOGGING_FORMAT are read to initialize the logging module's data structures. Note that logging-related configuration is no longer part of the orderer.yaml configuration file.
# orderer/common/server/main.go file
//2. Read the log environment variables FABRIC_LOGGING_SPEC and FABRIC_LOGGING_FORMAT, initialize the log data structure
initializeLogging()
# orderer/common/server/main.go file

func initializeLogging() {
	loggingSpec := os.Getenv("FABRIC_LOGGING_SPEC")
	loggingFormat := os.Getenv("FABRIC_LOGGING_FORMAT")
	flogging.Init(flogging.Config{
		Format:  loggingFormat,
		Writer:  os.Stderr,
		LogSpec: loggingSpec,
	})
}
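As an illustration of what those two environment variables end up controlling, the call below is roughly equivalent to running with FABRIC_LOGGING_FORMAT=json and FABRIC_LOGGING_SPEC=info:orderer.consensus=debug (the spec value is only an example; it sets the default level to info and raises the orderer.consensus loggers to debug):

# illustrative sketch (not Fabric source)
flogging.Init(flogging.Config{
	Format:  "json",                         // FABRIC_LOGGING_FORMAT
	Writer:  os.Stderr,
	LogSpec: "info:orderer.consensus=debug", // FABRIC_LOGGING_SPEC
})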
  3. Create and start the operations service
    The operations service lets an administrator obtain runtime status information about the node through a RESTful API; it is built mainly around the operations.System structure. The main code is as follows:
# orderer/common/server/main.go file
// 3. Create and start the operations service
opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
metricsProvider := opsSystem.Provider
logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.SetObserver(logObserver)
// initializeServerConfig initializes the gRPC server configuration
serverConfig := initializeServerConfig(conf, metricsProvider)
// initializeGrpcServer initializes the gRPC server instance
grpcServer := initializeGrpcServer(conf, serverConfig)
caMgr := &caManager{
	appRootCAsByChain:     make(map[string][][]byte),
	ordererRootCAsByChain: make(map[string][][]byte),
	clientRootCAs:         serverConfig.SecOpts.ClientRootCAs,
}
# orderer/common/server/main.go file
//The above newOperationsSystem related code
func newOperationsSystem(ops localconfig.Operations, metrics localconfig.Metrics) *operations.System {
	return operations.NewSystem(operations.Options{
		Logger:        flogging.MustGetLogger("orderer.operations"),
		ListenAddress: ops.ListenAddress,
		Metrics: operations.MetricsOptions{
			Provider: metrics.Provider,
			Statsd: &operations.Statsd{
				Network:       metrics.Statsd.Network,
				Address:       metrics.Statsd.Address,
				WriteInterval: metrics.Statsd.WriteInterval,
				Prefix:        metrics.Statsd.Prefix,
			},
		},
		TLS: operations.TLS{
			Enabled:            ops.TLS.Enabled,
			CertFile:           ops.TLS.Certificate,
			KeyFile:            ops.TLS.PrivateKey,
			ClientCertRequired: ops.TLS.ClientAuthRequired,
			ClientCACertFiles:  ops.TLS.ClientRootCAs,
		},
		Version: metadata.Version,
	})
}
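Once the operations service is listening, its endpoints can be probed over HTTP. A minimal sketch, assuming the sample default Operations.ListenAddress of 127.0.0.1:8443 with TLS disabled and the standard /healthz endpoint:

# illustrative sketch (not Fabric source)
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Operations.ListenAddress from orderer.yaml; 127.0.0.1:8443 is the sample default
	resp, err := http.Get("http://127.0.0.1:8443/healthz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body)) // expect 200 OK with a JSON health report
}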
  4. Create the ledger factory structure
    This step is implemented by the method below, which creates the corresponding structure according to the ledger type specified in the configuration; currently only the file-based ledger factory is supported. It mainly calls into orderer/common/server/util.go to build the factory structures, including the structure that operates on local files and the structure that reads and writes the ledger; in addition, index information is generated keyed by block number.
// 4. Initialize the ledger factory structure
lf, _, err := createLedgerFactory(conf, metricsProvider)
if err != nil {
	logger.Panicf("Failed to create ledger factory: %v", err)
}
//orderer\common\server\util.go
func createLedgerFactory(conf *config.TopLevel, metricsProvider metrics.Provider) (blockledger.Factory, string, error) {
	ld := conf.FileLedger.Location
	var err error
	if ld == "" {
		if ld, err = ioutil.TempDir("", conf.FileLedger.Prefix); err != nil {
			logger.Panic("Error creating temp dir:", err)
		}
	}

	logger.Debug("Ledger dir:", ld)
	lf, err := fileledger.New(ld, metricsProvider)
	if err != nil {
		return nil, "", errors.WithMessage(err, "Error in opening ledger factory")
	}
	return lf, ld, nil
}
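The returned blockledger.Factory is what the rest of the startup code uses to enumerate existing channels and to open a per-channel ledger. A rough usage sketch, assuming the Factory methods ChannelIDs() and GetOrCreate() (ChannelIDs() also appears in the bootstrap step below; the channel name is only an example):

# illustrative sketch (not Fabric source)
ids := lf.ChannelIDs() // channel IDs whose ledgers already exist on disk
logger.Infof("existing channels: %v", ids)

rw, err := lf.GetOrCreate("mychannel") // open (or create) the ledger of one channel
if err != nil {
	logger.Panicf("Failed to open ledger: %v", err)
}
logger.Infof("current height of mychannel: %d", rw.Height()) // number of blocks stored so far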
  5. Load the bootstrap block and, if needed, synchronize the local ledger
    When the Orderer service starts, it reads the network configuration from the bootstrap block and validates it (for example, whether it contains consortium information), and then checks whether the local ledger needs to be synchronized.
// 5. If a bootstrap block is specified, parse it and perform the necessary bootstrapping
//    (load the bootstrap block and synchronize the ledger)
var bootstrapBlock *cb.Block
if conf.General.BootstrapMethod == "file" {
	bootstrapBlock = file.New(conf.General.BootstrapFile).GenesisBlock()
	if err := onboarding.ValidateBootstrapBlock(bootstrapBlock, cryptoProvider); err != nil {
		logger.Panicf("Failed validating bootstrap block: %v", err)
	}

	// Are we bootstrapping with a genesis block (i.e. bootstrap block number = 0)?
	// If yes, generate the system channel with a genesis block.
	if len(lf.ChannelIDs()) == 0 && bootstrapBlock.Header.Number == 0 {
		logger.Info("Bootstrapping the system channel")
		// initialize the system channel
		initializeBootstrapChannel(bootstrapBlock, lf)
	} else if len(lf.ChannelIDs()) > 0 {
		logger.Info("Not bootstrapping the system channel because of existing channels")
	} else {
		logger.Infof("Not bootstrapping the system channel because the bootstrap block number is %d (>0), replication is needed", bootstrapBlock.Header.Number)
	}
} else if conf.General.BootstrapMethod != "none" {
	logger.Panicf("Unknown bootstrap method: %s", conf.General.BootstrapMethod)
}
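For orientation, loading the bootstrap block from disk boils down to a protobuf unmarshal of a cb.Block; the helper below is a hypothetical sketch of that idea, not the actual file bootstrapper:

# illustrative sketch (not Fabric source)
// readBootstrapBlock is a hypothetical helper showing what loading conf.General.BootstrapFile amounts to
func readBootstrapBlock(path string) (*cb.Block, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	block := &cb.Block{} // cb = github.com/hyperledger/fabric-protos-go/common
	if err := proto.Unmarshal(data, block); err != nil {
		return nil, err
	}
	return block, nil
}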

  6. Initialize the Registrar structure responsible for consensus:
    • Create the system channel (if this is the first boot);
    • Create the consenter objects (Kafka, etcdraft, etc.)
// 6. Initialize the channel manager
// initializeMultichannelRegistrar initializes the multichannel registrar object
manager := initializeMultichannelRegistrar(
	clusterBootBlock,
	repInitiator,
	clusterDialer,
	clusterServerConfig,
	clusterGRPCServer,
	conf,
	signer,
	metricsProvider,
	opsSystem,
	lf,
	cryptoProvider,
	tlsCallback,
)
  7. Initialize the gRPC service structure, complete the binding and start listening. The main logic is as follows:
  • Use the registrar, the Deliver and Broadcast processors, the metrics service, callbacks, etc. to create the externally facing main service structure (the AtomicBroadcast server)
  • Bind the AtomicBroadcastServer to the gRPC server and start listening (a minimal sketch follows)
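A minimal sketch of the final bind-and-listen step, assuming the AtomicBroadcast registration function generated in fabric-protos-go/orderer and the comm.GRPCServer wrapper (names are close to, but not copied from, the release-2.2 source):

# illustrative sketch (not Fabric source)
// server is the orderer's implementation of the AtomicBroadcast (Broadcast/Deliver) service
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server) // ab = fabric-protos-go/orderer
if err := grpcServer.Start(); err != nil {
	logger.Panicf("gRPC server failed to start: %v", err)
}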

Broadcast call

Consensus

The per-channel consensus component (chain object) on the Orderer server uses a Golang channel (Solo consenter) or a Kafka cluster (Kafka consenter) as its ordering backend: it orders the legal transaction messages that have passed the channel message processor's filters and thereby reaches consensus on the transaction order.

Implementation process

  1. The Orderer server receives transaction broadcast requests through the Broadcast() service interface and hands them to the Handle() method of the Broadcast service handler, which establishes a message-processing loop that receives and processes the normal transaction messages and configuration transaction messages submitted by clients (Envelope type; channel header types such as ENDORSER_TRANSACTION and CONFIG_UPDATE).
  2. After filtering, the messages are sent to the channel-bound consensus component chain object (Solo type, Kafka type, etc.) for ordering (the common chain interface is sketched after this list).
    ------- consensus -------
  3. The ordered transactions are then appended to the locally cached list of pending transaction messages (configuration transaction messages, normal transaction messages, etc.).
  4. New blocks are constructed according to the block-cutting rules and committed to the block data files of the designated channel ledger on the Orderer node. The same component is also responsible for channel management, such as creating new application channels and updating channel configuration.
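Every consenter (Solo, Kafka, etcdraft) plugs into the registrar through a common chain interface; the sketch below paraphrases its core methods (see orderer/consensus/consensus.go for the authoritative definition):

# paraphrased sketch of the consensus chain interface
type Chain interface {
	Order(env *cb.Envelope, configSeq uint64) error        // enqueue a normal transaction for ordering
	Configure(config *cb.Envelope, configSeq uint64) error // enqueue a configuration transaction
	WaitReady() error         // block until the chain can accept new messages
	Errored() <-chan struct{} // closed when the chain hits an unrecoverable error
	Start()                   // start the chain's processing goroutine(s)
	Halt()                    // stop the chain
}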

(Figure: the locally cached list of pending transaction messages)

Solo Consensus

  1. Check and filter legitimate messages
    The chain.main() method of the Solo consensus component's chain object first obtains the latest configuration sequence number seq of the current channel, then blocks waiting for transaction messages on the sendChan channel (a sketch of how messages reach sendChan follows), and processes them as shown in the main() listing further below.
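For context, messages reach sendChan through the chain's Order() and Configure() methods, which the Broadcast handler calls after filtering; the sketch below is close to the release-2.2 solo/consensus.go, so treat the details as approximate:

# sketch based on /orderer/consensus/solo/consensus.go
// Order enqueues a normal transaction message for the main() loop to pick up
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
	select {
	case ch.sendChan <- &message{
		configSeq: configSeq,
		normalMsg: env,
	}:
		return nil
	case <-ch.exitChan:
		return fmt.Errorf("Exiting")
	}
}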
## https://github.com/hyperledger/fabric/blob/release-2.2/orderer/consensus/solo/consensus.go
## Key line: add the message to the cached transaction message list and cut it into a list of batches according to the block-cutting rules
batches, _ := ch.support.BlockCutter().Ordered(msg.normalMsg)
# /orderer/consensus/solo/consensus.go
func (ch *chain) main() {
	var timer <-chan time.Time
	var err error

	for { // message processing loop
		seq := ch.support.Sequence() // get the configuration sequence number of the current channel
		err = nil
		select {
		// check the sendChan channel for messages
		case msg := <-ch.sendChan:
			if msg.configMsg == nil { // normal transaction message
				// NormalMsg
				if msg.configSeq < seq {
					// The configuration sequence number in the message is lower than the channel's
					// current one, which means the channel configuration has been updated since the
					// message was filtered; re-filter and re-validate the message.
					_, err = ch.support.ProcessNormalMsg(msg.normalMsg)
					if err != nil {
						// on error, discard the message and continue the loop
						logger.Warningf("Discarding bad normal message: %s", err)
						continue
					}
				}

				// Add the message to the cached transaction message list and cut it into a list of
				// batches according to the block-cutting rules
				batches, pending := ch.support.BlockCutter().Ordered(msg.normalMsg)

				// iterate over the batches waiting to be packed into blocks
				for _, batch := range batches {
					block := ch.support.CreateNextBlock(batch) // create a new block
					ch.support.WriteBlock(block, nil)          // write the block into the ledger
				}

				switch {
				case timer != nil && !pending:
					// Timer is already running but there are no messages pending, stop the timer
					timer = nil
				case timer == nil && pending:
					// Timer is not already running and there are messages pending, so start it
					timer = time.After(ch.support.SharedConfig().BatchTimeout())
					logger.Debugf("Just began %s batch timer", ch.support.SharedConfig().BatchTimeout().String())
				default:
					// Do nothing when:
					// 1. Timer is already running and there are messages pending
					// 2. Timer is not set and there are no messages pending
				}

			} else { // configuration transaction message: create a new application channel or update channel configuration
				// ConfigMsg
				if msg.configSeq < seq {
					msg.configMsg, _, err = ch.support.ProcessConfigMsg(msg.configMsg)
					if err != nil {
						logger.Warningf("Discarding bad config message: %s", err)
						continue
					}
				}
				batch := ch.support.BlockCutter().Cut()
				if batch != nil {
					block := ch.support.CreateNextBlock(batch)
					ch.support.WriteBlock(block, nil)
				}

				block := ch.support.CreateNextBlock([]*cb.Envelope{msg.configMsg})
				ch.support.WriteConfigBlock(block, nil)
				timer = nil
			}

		// block generation timeout
		case <-timer:
			// clear the timer
			timer = nil

			batch := ch.support.BlockCutter().Cut()
			if len(batch) == 0 {
				logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
				continue
			}
			logger.Debugf("Batch timer expired, creating block")
			block := ch.support.CreateNextBlock(batch)
			ch.support.WriteBlock(block, nil)

		case <-ch.exitChan: // on an exit message, leave the message processing loop
			logger.Debugf("Exiting")
			return
		}
	}
}
# orderer/common/blockcutter/blockcutter.go file (full listing; an annotated excerpt follows in the next subsection)
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
	if len(r.pendingBatch) == 0 {
		// We are beginning a new batch, mark the time
		r.PendingBatchStartTime = time.Now()
	}

	ordererConfig, ok := r.sharedConfigFetcher.OrdererConfig()
	if !ok {
		logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
	}

	batchSize := ordererConfig.BatchSize()

	messageSizeBytes := messageSizeBytes(msg)
	if messageSizeBytes > batchSize.PreferredMaxBytes {
		logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, batchSize.PreferredMaxBytes)

		// cut pending batch, if it has any messages
		if len(r.pendingBatch) > 0 {
			messageBatch := r.Cut()
			messageBatches = append(messageBatches, messageBatch)
		}

		// create new batch with single message
		messageBatches = append(messageBatches, []*cb.Envelope{msg})

		// Record that this batch took no time to fill
		r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)

		return
	}

	messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > batchSize.PreferredMaxBytes

	if messageWillOverflowBatchSizeBytes {
		logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
		logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
		messageBatch := r.Cut()
		r.PendingBatchStartTime = time.Now()
		messageBatches = append(messageBatches, messageBatch)
	}

	logger.Debugf("Enqueuing message into batch")
	r.pendingBatch = append(r.pendingBatch, msg)
	r.pendingBatchSizeBytes += messageSizeBytes
	pending = true

	if uint32(len(r.pendingBatch)) >= batchSize.MaxMessageCount {
		logger.Debugf("Batch size met, cutting batch")
		messageBatch := r.Cut()
		messageBatches = append(messageBatches, messageBatch)
		pending = false
	}

	return
}

  2. Pack transactions into blocks and submit them to the ledger
    The chain.main() method calls ch.support.BlockCutter().Ordered(msg.normalMsg), i.e. receiver.Ordered(), shown in Listing 2-37: the message cutting component (receiver) appends the currently received normal transaction message to the cached transaction message list and cuts it into a list of batches ([][]*cb.Envelope) according to the block-cutting rules. batches contains at most two batches, and the second batch contains at most one transaction.

# orderer/common/blockcutter/blockcutter.go file (annotated excerpt)
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
	messageSizeBytes := messageSizeBytes(msg) // the size of the message in bytes
	// check whether the current message exceeds the preferred maximum message size
	if messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {
		...
		// if the cached transaction message list is non-empty, cut it into a batch
		if len(r.pendingBatch) > 0 {
			messageBatch := r.Cut()                                // cut a batch of transactions
			messageBatches = append(messageBatches, messageBatch)  // add it to the messageBatches list
		}

		// wrap msg as a batch of its own and add it to the messageBatches list
		messageBatches = append(messageBatches, []*cb.Envelope{msg})
		return // return to the message processing loop
	}
	// if adding msg would push the pending batch past the preferred maximum byte size,
	// cut the currently cached transaction message list first
	messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes
	if messageWillOverflowBatchSizeBytes {
		...
		messageBatch := r.Cut() // cut a batch of transactions
		// add it to the messageBatches list
		messageBatches = append(messageBatches, messageBatch)
	}
	...
	r.pendingBatch = append(r.pendingBatch, msg) // append msg to the cached transaction message list
	r.pendingBatchSizeBytes += messageSizeBytes  // adjust the byte count of the cached list
	pending = true
	// check whether the adjusted cached transaction message list has reached the configured maximum message count
	if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount {
		logger.Debugf("Batch size met, cutting batch")
		messageBatch := r.Cut()                               // cut a batch of transactions
		messageBatches = append(messageBatches, messageBatch) // add it to the batch list
		pending = false
	}
	return
}
  • receiver.Ordered() first computes the byte size of the normal transaction message msg. If it exceeds PreferredMaxBytes, the preferred maximum message size configured for the Orderer (512 KB in the sample configuration), the message is isolated: if the cached transaction message list is non-empty (len(r.pendingBatch) > 0), r.Cut() clears the list and cuts it into the first batch messageBatch ([]*cb.Envelope), which is appended to messageBatches; msg itself is then wrapped as a separate batch []*cb.Envelope{msg} and appended as well, so in this case up to two batches are returned.
  • Otherwise, if adding msg to the pending batch would exceed PreferredMaxBytes, r.Cut() is called first and the resulting batch is appended to messageBatches (this forms the first batch). msg is then appended to pendingBatch, and if the resulting message count reaches MaxMessageCount, the maximum configured for the Orderer (10 in the sample configuration), r.Cut() is called again and the batch is appended and returned to the batches variable in chain.main(); this forms the second batch (or the only one, if no overflow cut happened).
  • chain.main() then checks the pending flag: if nothing was cut but messages remain cached and no timer is set, it starts a timer with the Orderer's configured BatchTimeout (2 seconds in the sample configuration); when the timer fires, the pending batch is cut and written as a block even if it is not full (in the Kafka consenter the analogous mechanism is a TIMETOCUT message).
  • Finally, chain.main() iterates over each batch in batches, calls CreateNextBlock(batch) to create a new block and WriteBlock(block, nil) to append it to the block data file of the current channel ledger, and cancels the timer (sets it to nil) when no messages remain pending. At this point the processing of a normal transaction message ends and control returns to the message processing loop. (A small worked sketch of the cutting rules follows this list.)
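To make the cutting rules concrete, the following self-contained sketch (hypothetical toy code, not Fabric) mirrors the three cases of receiver.Ordered() with small illustrative limits:

# illustrative sketch (not Fabric source)
package main

import "fmt"

// toy stand-ins for BatchSize.MaxMessageCount and BatchSize.PreferredMaxBytes
const (
	maxMessageCount   = 3
	preferredMaxBytes = 100
)

type cutter struct {
	pending      []int // message sizes stand in for real envelopes
	pendingBytes int
}

func (c *cutter) cut() []int {
	batch := c.pending
	c.pending, c.pendingBytes = nil, 0
	return batch
}

// ordered mimics receiver.Ordered(): it returns the batches to pack into blocks
// and whether messages remain cached afterwards
func (c *cutter) ordered(size int) (batches [][]int, pending bool) {
	if size > preferredMaxBytes { // oversized message: isolate it in its own batch
		if len(c.pending) > 0 {
			batches = append(batches, c.cut())
		}
		batches = append(batches, []int{size})
		return
	}
	if c.pendingBytes+size > preferredMaxBytes { // adding it would overflow: cut first
		batches = append(batches, c.cut())
	}
	c.pending = append(c.pending, size)
	c.pendingBytes += size
	pending = true
	if len(c.pending) >= maxMessageCount { // message-count limit reached: cut again
		batches = append(batches, c.cut())
		pending = false
	}
	return
}

func main() {
	c := &cutter{}
	for _, size := range []int{40, 40, 150, 30, 30, 30} {
		batches, pending := c.ordered(size)
		fmt.Printf("msg %3d -> batches %v, pending %v\n", size, batches, pending)
	}
}

In this toy run the 150-byte message produces two batches at once (the cached [40 40] plus the isolated [150]), while the third 30-byte message triggers a cut purely on the message count.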

Deliver() block distribution service

The Orderer server exposes the Deliver() block distribution service interface and hands each received request to the Handle() method of the Deliver service handler, which establishes a message-processing loop responsible for receiving and processing block request messages submitted by clients (Envelope type; channel header type DELIVER_SEEK_INFO, etc.). Such a request encapsulates block seek information (SeekInfo type) describing the requested range of blocks. The Deliver service handler then repeatedly fetches block data from the local ledger and sends the blocks to the requesting node (for example, a Leader peer) in order. If a requested block has not yet been produced, the Deliver handler by default blocks and waits until the block has been created and committed to the ledger before replying to the requesting node.
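For reference, the seek information a client places into such an envelope looks roughly like the sketch below, using the fabric-protos-go orderer types (the block range is only an example):

# illustrative sketch (not Fabric source)
// request blocks 0 through 9; BLOCK_UNTIL_READY gives the default "block and wait" behavior described above
seekInfo := &ab.SeekInfo{ // ab = github.com/hyperledger/fabric-protos-go/orderer
	Start: &ab.SeekPosition{
		Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}},
	},
	Stop: &ab.SeekPosition{
		Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 9}},
	},
	Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
}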