[Hyperledger Fabric source code interpretation] sever.go

/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package server

import (
"fmt"
"io/ioutil"
"os"
"runtime/debug"
"time"

"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric-protos-go/common"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/broadcast"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)

type broadcastSupport struct {<!-- -->
*multichannel. Registrar
}

func (bs broadcastSupport) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, broadcast.ChannelSupport, error) {<!-- -->
return bs.Registrar.BroadcastChannelSupport(msg)
}

type deliverSupport struct {<!-- -->
*multichannel. Registrar
}

func (ds deliverSupport) GetChain(chainID string) deliver.Chain {<!-- -->
chain := ds.Registrar.GetChain(chainID)
if chain == nil {<!-- -->
return nil
}
return chain
}

type server struct {<!-- -->
bh *broadcast.Handler // transaction collection
dh *deliver.Handler // block diffusion
debug *localconfig.Debug
*multichannel. Registrar
}

type responseSender struct {<!-- -->
ab.AtomicBroadcast_DeliverServer
}

func (rs *responseSender) SendStatusResponse(status cb.Status) error {<!-- -->
reply := &ab.DeliverResponse{<!-- -->
Type: &ab.DeliverResponse_Status{<!-- -->Status: status},
}
return rs.Send(reply)
}

// SendBlockResponse sends block data and ignores pvtDataMap.
func (rs *responseSender) SendBlockResponse(
block *cb.Block,
channelID string,
chain deliver. Chain,
signedData *protoutil.SignedData,
) error {<!-- -->
response := &ab.DeliverResponse{<!-- -->
Type: &ab.DeliverResponse_Block{<!-- -->Block: block},
}
return rs.Send(response)
}

func (rs *responseSender) DataType() string {<!-- -->
return "block"
}

// NewServer creates an ab. AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewServer(
r *multichannel.Registrar,
metricsProvider metrics. Provider,
debug *localconfig.Debug,
timeWindow time.Duration,
mutualTLS bool,
expirationCheckDisabled bool,
) ab.AtomicBroadcastServer {<!-- -->
s := &server{<!-- -->
dh: deliver.NewHandler(deliverSupport{<!-- -->Registrar: r}, timeWindow, mutualTLS, deliver.NewMetrics(metricsProvider), expirationCheckDisabled),
bh: & amp; broadcast.Handler{<!-- -->
SupportRegistrar: broadcastSupport{<!-- -->Registrar: r},
Metrics: broadcast. NewMetrics(metricsProvider),
},
debug: debug,
Registrar: r,
}
returns
}

type msgTracer struct {<!-- -->
function string
debug *localconfig.Debug
}

func (mt *msgTracer) trace(traceDir string, msg *cb.Envelope, err error) {<!-- -->
if err != nil {<!-- -->
return
}

now := time.Now().UnixNano()
path := fmt.Sprintf("%s%c%d_%p.%s", traceDir, os.PathSeparator, now, msg, mt.function)
logger.Debugf("Writing %s request trace to %s", mt.function, path)
go func() {<!-- -->
pb, err := proto. Marshal(msg)
if err != nil {<!-- -->
logger.Debugf("Error marshaling trace msg for %s: %s", path, err)
return
}
err = ioutil.WriteFile(path, pb, 0660)
if err != nil {<!-- -->
logger.Debugf("Error writing trace msg for %s: %s", path, err)
}
}()
}

type broadcastMsgTracer struct {<!-- -->
ab.AtomicBroadcast_BroadcastServer
msgTracer
}

func (bmt *broadcastMsgTracer) Recv() (*cb. Envelope, error) {<!-- -->
msg, err := bmt.AtomicBroadcast_BroadcastServer.Recv()
if traceDir := bmt.debug.BroadcastTraceDir; traceDir != "" {<!-- -->
bmt.trace(bmt.debug.BroadcastTraceDir, msg, err)
}
return msg, err
}

type deliverMsgTracer struct {<!-- -->
deliver. Receiver
msgTracer
}

func (dmt *deliverMsgTracer) Recv() (*cb. Envelope, error) {<!-- -->
msg, err := dmt. Receiver. Recv()
if traceDir := dmt.debug.DeliverTraceDir; traceDir != "" {<!-- -->
dmt.trace(traceDir, msg, err)
}
return msg, err
}

// Broadcast receives a stream of messages from a client for ordering
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {<!-- -->
logger.Debugf("Starting new Broadcast handler")
defer func() {<!-- -->
if r := recover(); r != nil {<!-- -->
logger.Criticalf("Broadcast client triggered panic: %s\\
%s", r, debug.Stack())
}
logger.Debugf("Closing Broadcast stream")
}()
return s.bh.Handle( & amp;broadcastMsgTracer{<!-- -->
AtomicBroadcast_BroadcastServer: srv,
msgTracer: msgTracer{<!-- -->
debug: s.debug,
function: "Broadcast",
},
})
}

// Deliver sends a stream of blocks to a client after ordering
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {<!-- -->
logger. Debugf("Starting new Deliver handler")
defer func() {<!-- -->
if r := recover(); r != nil {<!-- -->
logger.Criticalf("Deliver client triggered panic: %s\\
%s", r, debug.Stack())
}
logger.Debugf("Closing Deliver stream")
}()

policyChecker := func(env *cb.Envelope, channelID string) error {<!-- -->
chain := s. GetChain(channelID)
if chain == nil {<!-- -->
return errors.Errorf("channel %s not found", channelID)
}
// In maintenance mode, we typically require the signature of /Channel/Orderer/Readers.
// This will block Deliver requests from peers (which normally satisfy /Channel/Readers).
sf := msgprocessor.NewSigFilter(policies.ChannelReaders, policies.ChannelOrdererReaders, chain)
return sf. Apply(env)
}
deliverServer := &deliver.Server{<!-- -->
PolicyChecker: deliver.PolicyCheckerFunc(policyChecker),
Receiver: &deliverMsgTracer{<!-- -->
Receiver: srv,
msgTracer: msgTracer{<!-- -->
debug: s.debug,
function: "Deliver",
},
},
ResponseSender: &responseSender{<!-- -->
AtomicBroadcast_DeliverServer: srv,
},
}
return s.dh.Handle(srv.Context(), deliverServer)
}