// Source: go_study/fabric-main/orderer/consensus/smartbft/consenter.go
//
// 273 lines, 8.6 KiB, Go

/*
*
* Copyright IBM Corp. All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package smartbft
import (
"bytes"
"encoding/pem"
"path"
"reflect"
"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/msp"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/orderer/smartbft"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protoutil"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
// CreateChainCallback is a callback that creates a new chain. It takes no
// arguments and returns nothing.
//
// NOTE(review): this type is not referenced in the visible part of this file;
// Consenter.CreateChain is declared as func(chainName string) instead.
// Confirm whether it is still needed.
type CreateChainCallback func()
// ChainGetter obtains instances of ChainSupport for the given channel.
type ChainGetter interface {
// GetChain obtains the ChainSupport for the given channel.
// Returns nil when the ChainSupport for the given channel
// isn't found.
GetChain(chainID string) *multichannel.ChainSupport
}
// PolicyManagerRetriever is a function that returns the policies.Manager
// for the channel with the given name.
type PolicyManagerRetriever func(channel string) policies.Manager
// Consenter is the SmartBFT-based implementation of an ordering service
// consenter. It holds the dependencies shared by all the chains it creates
// through HandleChain.
type Consenter struct {
// CreateChain creates the chain with the given name; New sets it to the
// registrar's CreateChain method.
CreateChain func(chainName string)
// GetPolicyManager returns the policy manager of a channel; HandleChain
// passes its result to NewChain.
GetPolicyManager PolicyManagerRetriever
// Logger is the logger used by the consenter.
Logger *flogging.FabricLogger
// Identity holds the IdBytes of the local node's serialized identity.
// IsChannelMember compares it against sanitized consenter certificates.
Identity []byte
// Comm is the cluster communication manager handed to every chain
// created by HandleChain.
Comm *cluster.AuthCommMgr
// Chains is used by ReceiverByChain to look up the ChainSupport of a channel.
Chains ChainGetter
// SignerSerializer signs on behalf of the local node and serializes
// its identity.
SignerSerializer SignerSerializer
// Registrar is used as both the filters provider and the config update
// proposer of the ConfigBlockValidator built in HandleChain.
Registrar *multichannel.Registrar
// WALBaseDir is the base directory of the write-ahead logs; each chain
// uses the sub-directory named after its channel ID.
WALBaseDir string
// ClusterDialer is passed to newBlockPuller when a chain is created.
ClusterDialer *cluster.PredicateDialer
// Conf is the local orderer configuration; its General.Cluster section
// is passed to newBlockPuller.
Conf *localconfig.TopLevel
// Metrics is passed to every chain created by HandleChain.
Metrics *Metrics
// BCCSP is the crypto service provider used when creating block pullers,
// chains and channel configuration bundles.
BCCSP bccsp.BCCSP
// ClusterService is the cluster gRPC service registered by New;
// HandleChain configures it with the consenters of each new chain.
ClusterService *cluster.ClusterService
}
// New creates a Consenter of type SmartBFT.
//
// It decodes the WAL configuration from conf.Consensus, derives the local node
// identity from signerSerializer, builds the cluster communication manager and
// the cluster service, and registers the cluster service on srv.
//
// New panics (through the logger) when the consensus configuration cannot be
// decoded, or when the local identity cannot be serialized or unmarshaled.
//
// srvConf is currently unused; it is kept so that existing callers keep working.
func New(
	pmr PolicyManagerRetriever,
	signerSerializer SignerSerializer,
	clusterDialer *cluster.PredicateDialer,
	conf *localconfig.TopLevel,
	srvConf comm.ServerConfig, // TODO why is this not used?
	srv *comm.GRPCServer,
	r *multichannel.Registrar,
	metricsProvider metrics.Provider,
	clusterMetrics *cluster.Metrics,
	BCCSP bccsp.BCCSP,
) *Consenter {
	logger := flogging.MustGetLogger("orderer.consensus.smartbft")

	var walConfig WALConfig
	err := mapstructure.Decode(conf.Consensus, &walConfig)
	if err != nil {
		logger.Panicf("Failed to decode consensus configuration: %s", err)
	}

	logger.Infof("WAL Directory is %s", walConfig.WALDir)

	consenter := &Consenter{
		Registrar:        r,
		GetPolicyManager: pmr,
		Conf:             conf,
		ClusterDialer:    clusterDialer,
		Logger:           logger,
		Chains:           r,
		SignerSerializer: signerSerializer,
		WALBaseDir:       walConfig.WALDir,
		Metrics:          NewMetrics(metricsProvider),
		CreateChain:      r.CreateChain,
		BCCSP:            BCCSP,
	}

	// A serialization failure must not be ignored: unmarshaling an empty
	// buffer succeeds and would leave the node with an empty identity that
	// never matches any consenter.
	identity, err := signerSerializer.Serialize()
	if err != nil {
		logger.Panicf("failed serializing identity: %s", err)
	}

	sID := &msp.SerializedIdentity{}
	if err := proto.Unmarshal(identity, sID); err != nil {
		logger.Panicf("failed unmarshaling identity: %s", err)
	}

	consenter.Identity = sID.IdBytes

	consenter.Comm = &cluster.AuthCommMgr{
		Logger:         flogging.MustGetLogger("orderer.common.cluster"),
		Metrics:        clusterMetrics,
		SendBufferSize: conf.General.Cluster.SendBufferSize,
		Chan2Members:   make(cluster.MembersByChannel),
		Connections:    cluster.NewConnectionMgr(clusterDialer.Config),
		Signer:         signerSerializer,
		NodeIdentity:   sID.IdBytes,
	}

	consenter.ClusterService = &cluster.ClusterService{
		StreamCountReporter: &cluster.StreamCountReporter{
			Metrics: clusterMetrics,
		},
		Logger:                           flogging.MustGetLogger("orderer.common.cluster"),
		StepLogger:                       flogging.MustGetLogger("orderer.common.cluster.step"),
		MinimumExpirationWarningInterval: cluster.MinimumExpirationWarningInterval,
		CertExpWarningThreshold:          conf.General.Cluster.CertExpirationWarningThreshold,
		MembershipByChannel:              make(map[string]*cluster.ChannelMembersConfig),
		NodeIdentity:                     sID.IdBytes,
		RequestHandler: &Ingress{
			Logger:        logger,
			ChainSelector: consenter,
		},
	}

	// Expose the cluster service on the orderer's gRPC server.
	ab.RegisterClusterNodeServiceServer(srv.Server(), consenter.ClusterService)

	return consenter
}
// ReceiverByChain looks up the chain of the given channel and returns it as a
// MessageReceiver. It returns nil when the channel is unknown or when its
// chain is not a *BFTChain. A known channel with a nil chain is treated as a
// programming error and triggers a panic through the logger.
func (c *Consenter) ReceiverByChain(channelID string) MessageReceiver {
	support := c.Chains.GetChain(channelID)
	if support == nil {
		return nil
	}

	if support.Chain == nil {
		c.Logger.Panicf("Programming error - Chain %s is nil although it exists in the mapping", channelID)
	}

	bftChain, ok := support.Chain.(*BFTChain)
	if !ok {
		c.Logger.Warningf("Chain %s is of type %v and not smartbft.Chain", channelID, reflect.TypeOf(support.Chain))
		return nil
	}

	return bftChain
}
// HandleChain returns a new Chain instance or an error upon failure.
//
// It unmarshals the channel's consensus metadata into SmartBFT options,
// determines the local node's consenter ID, creates a block puller and a
// BFTChain for the channel, and configures the cluster service with the
// channel's consenters.
//
// The metadata parameter is not used by this implementation.
//
// An error is returned when the consensus metadata cannot be unmarshaled, when
// the local node is not found among the consenters, when the SmartBFT
// configuration cannot be parsed, or when the chain cannot be created. A
// failure to initialize the block puller triggers a panic through the logger.
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
	configOptions := &smartbft.Options{}
	consenters := support.SharedConfig().Consenters()
	if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), configOptions); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
	}

	selfID, err := c.detectSelfID(consenters)
	if err != nil {
		return nil, errors.Wrap(err, "without a system channel, a follower should have been created")
	}
	c.Logger.Infof("Local consenter id is %d", selfID)

	puller, err := newBlockPuller(support, c.ClusterDialer, c.Conf.General.Cluster, c.BCCSP)
	if err != nil {
		// Include the cause so the reason for the crash is visible in the logs.
		c.Logger.Panicf("Failed initializing block puller: %s", err)
	}

	config, err := configFromMetadataOptions((uint64)(selfID), configOptions)
	if err != nil {
		return nil, errors.Wrap(err, "failed parsing smartbft configuration")
	}
	c.Logger.Debugf("SmartBFT-Go config: %+v", config)

	configValidator := &ConfigBlockValidator{
		ValidatingChannel:    support.ChannelID(),
		Filters:              c.Registrar,
		ConfigUpdateProposer: c.Registrar,
		Logger:               c.Logger,
	}

	// Each channel keeps its WAL in its own sub-directory of WALBaseDir.
	chain, err := NewChain(configValidator, (uint64)(selfID), config, path.Join(c.WALBaseDir, support.ChannelID()), puller, c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), support, c.Metrics, c.BCCSP)
	if err != nil {
		return nil, errors.Wrap(err, "failed creating a new BFTChain")
	}

	// refresh cluster service with updated consenters
	c.ClusterService.ConfigureNodeCerts(chain.Channel, consenters)
	chain.clusterService = c.ClusterService

	return chain, nil
}
// IsChannelMember reports whether the local node is one of the consenters
// listed in the orderer configuration carried by joinBlock.
//
// The configuration is read from the first envelope of the block. Membership
// is decided by comparing the sanitized certificate of every consenter with
// c.Identity. An error is returned when the block is nil, when the envelope or
// the configuration bundle cannot be extracted, when the bundle has no orderer
// configuration, or when a consenter certificate cannot be sanitized.
func (c *Consenter) IsChannelMember(joinBlock *cb.Block) (bool, error) {
	if joinBlock == nil {
		return false, errors.New("nil block")
	}

	env, err := protoutil.ExtractEnvelope(joinBlock, 0)
	if err != nil {
		return false, err
	}

	bundle, err := channelconfig.NewBundleFromEnvelope(env, c.BCCSP)
	if err != nil {
		return false, err
	}

	ordererConfig, found := bundle.OrdererConfig()
	if !found {
		return false, errors.New("no orderer config in bundle")
	}

	for _, candidate := range ordererConfig.Consenters() {
		cert, err := crypto.SanitizeX509Cert(candidate.Identity)
		if err != nil {
			return false, err
		}
		if bytes.Equal(c.Identity, cert) {
			return true, nil
		}
	}

	return false, nil
}
// TargetChannel returns the channel named in the given message.
// Only *ab.ConsensusRequest and *ab.SubmitRequest are recognized; any other
// message type yields an empty string.
func (c *Consenter) TargetChannel(message proto.Message) string {
	if consensusReq, ok := message.(*ab.ConsensusRequest); ok {
		return consensusReq.Channel
	}
	if submitReq, ok := message.(*ab.SubmitRequest); ok {
		return submitReq.Channel
	}
	return ""
}
// pemToDER decodes the first PEM block found in pemBytes and returns the
// decoded bytes of that block. When no PEM block can be decoded, the offending
// input is logged and an error is returned. id and certType are only used to
// build the log message.
func pemToDER(pemBytes []byte, id uint64, certType string, logger *flogging.FabricLogger) ([]byte, error) {
	if block, _ := pem.Decode(pemBytes); block != nil {
		return block.Bytes, nil
	}

	logger.Errorf("Rejecting PEM block of %s TLS cert for node %d, offending PEM is: %s", certType, id, string(pemBytes))
	return nil, errors.Errorf("invalid PEM block")
}
// detectSelfID returns the ID of the consenter whose sanitized certificate
// equals the node identity held by c.Comm. It returns
// cluster.ErrNotInChannel (after logging a warning) when no consenter matches,
// or the sanitization error when a consenter certificate cannot be sanitized.
func (c *Consenter) detectSelfID(consenters []*cb.Consenter) (uint32, error) {
	for idx := range consenters {
		candidate := consenters[idx]

		sanitized, err := crypto.SanitizeX509Cert(candidate.Identity)
		if err != nil {
			return 0, err
		}

		if !bytes.Equal(c.Comm.NodeIdentity, sanitized) {
			continue
		}
		return candidate.Id, nil
	}

	c.Logger.Warning("Could not find the node in channel consenters set")
	return 0, cluster.ErrNotInChannel
}