go_study/fabric-main/orderer/consensus/etcdraft/util.go

492 lines
16 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"crypto/x509"
"encoding/pem"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
)
// RaftPeers converts a list of consenter (Raft node) IDs into the raft.Peer
// values expected by the Raft library. The peers are returned in the same
// order as the IDs; only the ID field of each peer is populated. An empty
// input yields a nil slice.
func RaftPeers(consenterIDs []uint64) []raft.Peer {
	var result []raft.Peer
	for i := range consenterIDs {
		result = append(result, raft.Peer{ID: consenterIDs[i]})
	}
	return result
}
// ConsentersMap is a set of consenters keyed by the raw bytes of their client
// TLS certificate (converted to a string).
type ConsentersMap map[string]struct{}

// Exists reports whether the set contains an entry for the client TLS
// certificate of the given consenter. The consenter must not be nil.
func (c ConsentersMap) Exists(consenter *etcdraft.Consenter) bool {
	if _, found := c[string(consenter.ClientTlsCert)]; found {
		return true
	}
	return false
}
// ConsentersToMap builds a ConsentersMap from the given consenters, using each
// consenter's client TLS certificate as the set key. Consenters sharing the
// same client certificate collapse into a single entry. Every element of
// consenters must be non-nil.
func ConsentersToMap(consenters []*etcdraft.Consenter) ConsentersMap {
	result := make(ConsentersMap)
	for i := range consenters {
		result[string(consenters[i].ClientTlsCert)] = struct{}{}
	}
	return result
}
// MetadataHasDuplication returns an error if the metadata contains duplicated
// consenters, and nil otherwise.
//
// Two consenters are duplicates when any TLS certificate of one (server or
// client) equals any TLS certificate of the other, irrespective of which role
// the certificate plays. A single consenter that uses the same certificate for
// both roles is not a duplicate of itself.
//
// An error is also returned when md is nil or contains a nil consenter.
func MetadataHasDuplication(md *etcdraft.ConfigMetadata) error {
	if md == nil {
		return errors.New("nil metadata")
	}

	// First pass: reject nil entries anywhere in the list before looking at
	// any certificate, so a nil consenter is always reported first.
	for i := range md.Consenters {
		if md.Consenters[i] == nil {
			return errors.New("nil consenter in metadata")
		}
	}

	// Second pass: every certificate seen so far, regardless of its role.
	certs := map[string]struct{}{}
	known := func(key string) bool {
		_, ok := certs[key]
		return ok
	}

	for _, c := range md.Consenters {
		server, client := string(c.ServerTlsCert), string(c.ClientTlsCert)
		if known(server) || known(client) {
			return errors.Errorf("duplicate consenter: server cert: %s, client cert: %s", server, client)
		}
		// Record both certificates only after the check, so that a consenter
		// reusing one certificate for both roles does not trip over itself.
		certs[server] = struct{}{}
		certs[client] = struct{}{}
	}

	return nil
}
// MetadataFromConfigValue decodes the etcdraft configuration metadata carried
// by a ConsensusType config value.
//
// The value bytes are first unmarshalled as an orderer.ConsensusType, and the
// Metadata field of that message is then unmarshalled as an
// etcdraft.ConfigMetadata. configValue must not be nil.
func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, error) {
	var consensusType orderer.ConsensusType
	if err := proto.Unmarshal(configValue.Value, &consensusType); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal consensusType config update")
	}

	metadata := new(etcdraft.ConfigMetadata)
	if err := proto.Unmarshal(consensusType.Metadata, metadata); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
	}

	return metadata, nil
}
// MetadataFromConfigUpdate extracts the etcdraft consensus metadata from a
// config update.
//
// The metadata is read from the "ConsensusType" value of the "Orderer" group
// in the write set. It returns (nil, nil) when the write set carries no such
// value, or when its version equals the version of the same value in the read
// set (a missing read-set value counts as version 0): only a version change is
// treated as an update of the consensus type.
func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, error) {
	// consensusTypeOf looks up Orderer/ConsensusType in a read or write set.
	consensusTypeOf := func(set *common.ConfigGroup) (*common.ConfigValue, bool) {
		if set == nil || set.Groups == nil {
			return nil, false
		}
		ordererGroup, found := set.Groups["Orderer"]
		if !found {
			return nil, false
		}
		value, found := ordererGroup.Values["ConsensusType"]
		return value, found
	}

	var readVersion uint64
	if readValue, found := consensusTypeOf(update.ReadSet); found {
		readVersion = readValue.Version
	}

	writeValue, found := consensusTypeOf(update.WriteSet)
	if !found || writeValue.Version == readVersion {
		// Either nothing is written, or the version is unchanged with respect
		// to the read set: not an update of the consensus type.
		return nil, nil
	}

	return MetadataFromConfigValue(writeValue)
}
// ConfigChannelHeader expects a config block and returns the channel header
// of the envelope at index 0 of that block. The Type field of the returned
// header tells the caller what kind of envelope it is, e.g.
// HeaderType_ORDERER_TRANSACTION or HeaderType_CONFIG.
//
// An error is returned when block is nil, when the envelope cannot be
// extracted, or when the channel header cannot be read from the envelope.
func ConfigChannelHeader(block *common.Block) (hdr *common.ChannelHeader, err error) {
	// Reject a nil block up front, as ConfigEnvelopeFromBlock does, instead of
	// handing it to the extraction helpers.
	if block == nil {
		return nil, errors.New("nil block")
	}

	envelope, err := protoutil.ExtractEnvelope(block, 0)
	if err != nil {
		return nil, errors.Wrap(err, "failed to extract envelope from the block")
	}

	channelHeader, err := protoutil.ChannelHeader(envelope)
	if err != nil {
		return nil, errors.Wrap(err, "cannot extract channel header")
	}

	return channelHeader, nil
}
// ConfigEnvelopeFromBlock returns the configuration envelope stored at index 0
// of the given block.
//
// Only envelopes whose channel header type is HeaderType_CONFIG are accepted.
// HeaderType_ORDERER_TRANSACTION is rejected as an unsupported legacy system
// channel type, and any other type is rejected as unexpected. A nil block is
// an error.
func ConfigEnvelopeFromBlock(block *common.Block) (*common.Envelope, error) {
	if block == nil {
		return nil, errors.New("nil block")
	}

	env, err := protoutil.ExtractEnvelope(block, 0)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to extract envelope from the block")
	}

	chdr, err := protoutil.ChannelHeader(env)
	if err != nil {
		return nil, errors.Wrap(err, "cannot extract channel header")
	}

	headerType := chdr.Type
	if headerType == int32(common.HeaderType_CONFIG) {
		return env, nil
	}
	if headerType == int32(common.HeaderType_ORDERER_TRANSACTION) {
		return nil, errors.Errorf("unsupported legacy system channel header type: %v", headerType)
	}
	return nil, errors.Errorf("unexpected header type: %v", headerType)
}
// ConsensusMetadataFromConfigBlock reads the etcdraft consensus metadata
// update carried by a configuration block.
//
// The block must be non-nil and must be a config block. Its config envelope is
// unwrapped down to the config update, which is then passed to
// MetadataFromConfigUpdate; the result of that call (which may be nil, nil
// when the consensus type is not updated) is returned as is.
func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, error) {
	switch {
	case block == nil:
		return nil, errors.New("nil block")
	case !protoutil.IsConfigBlock(block):
		return nil, errors.New("not a config block")
	}

	envelope, err := ConfigEnvelopeFromBlock(block)
	if err != nil {
		return nil, errors.Wrap(err, "cannot read config update")
	}

	envelopePayload, err := protoutil.UnmarshalPayload(envelope.Payload)
	if err != nil {
		return nil, errors.Wrap(err, "failed to extract payload from config envelope")
	}

	// The payload wraps the config update that holds the read and write sets.
	update, err := configtx.UnmarshalConfigUpdateFromPayload(envelopePayload)
	if err != nil {
		return nil, errors.Wrap(err, "could not read config update")
	}

	return MetadataFromConfigUpdate(update)
}
// VerifyConfigMetadata validates Raft config metadata.
//
// The checks, in order, are:
//   - metadata and metadata.Options are non-nil;
//   - HeartbeatTick, ElectionTick and MaxInflightBlocks are all non-zero;
//   - ElectionTick is strictly greater than HeartbeatTick;
//   - TickInterval parses as a time.Duration and is not zero;
//   - the consenter set is non-empty and contains no nil entry;
//   - both TLS certificates of every consenter verify against verifyOpts
//     (certificate expiration is ignored);
//   - no certificate is shared between two consenters.
//
// The first failing check is returned as an error; nil means the metadata is
// valid.
func VerifyConfigMetadata(metadata *etcdraft.ConfigMetadata, verifyOpts x509.VerifyOptions) error {
	if metadata == nil {
		// Defensive: callers are expected to pass non-nil metadata.
		return errors.Errorf("nil Raft config metadata")
	}

	options := metadata.Options
	if options == nil {
		return errors.Errorf("nil Raft config metadata options")
	}

	// SnapshotIntervalSize is intentionally not checked here: a zero value
	// means DefaultSnapshotIntervalSize is used.
	heartbeat, election, inflight := options.HeartbeatTick, options.ElectionTick, options.MaxInflightBlocks
	if heartbeat == 0 || election == 0 || inflight == 0 {
		return errors.Errorf("none of HeartbeatTick (%d), ElectionTick (%d) and MaxInflightBlocks (%d) can be zero",
			heartbeat, election, inflight)
	}

	if election <= heartbeat {
		return errors.Errorf("ElectionTick (%d) must be greater than HeartbeatTick (%d)",
			election, heartbeat)
	}

	tickInterval, err := time.ParseDuration(options.TickInterval)
	if err != nil {
		return errors.Errorf("failed to parse TickInterval (%s) to time duration: %s", options.TickInterval, err)
	}
	if tickInterval == 0 {
		return errors.Errorf("TickInterval cannot be zero")
	}

	if len(metadata.Consenters) == 0 {
		return errors.Errorf("empty consenter set")
	}

	// Certificates must chain to the configured CAs; expiration is ignored.
	for _, consenter := range metadata.Consenters {
		if consenter == nil {
			return errors.Errorf("metadata has nil consenter")
		}
		if err := validateConsenterTLSCerts(consenter, verifyOpts, true); err != nil {
			return errors.WithMessagef(err, "consenter %s:%d has invalid certificate", consenter.Host, consenter.Port)
		}
	}

	return MetadataHasDuplication(metadata)
}
// parseCertificateFromBytes decodes the first PEM block found in cert and
// parses its contents as an X.509 certificate.
//
// On any failure (no PEM block found, or the block is not a valid
// certificate) it returns a nil certificate together with the error, so a
// caller can never mistake an empty placeholder for a parsed certificate.
func parseCertificateFromBytes(cert []byte) (*x509.Certificate, error) {
	// Only the first PEM block is used; trailing data is ignored.
	pemBlock, _ := pem.Decode(cert)
	if pemBlock == nil {
		return nil, errors.Errorf("no PEM data found in cert[% x]", cert)
	}

	certificate, err := x509.ParseCertificate(pemBlock.Bytes)
	if err != nil {
		return nil, errors.Errorf("%s TLS certificate has invalid ASN1 structure %s", err, string(pemBlock.Bytes))
	}

	return certificate, nil
}
// parseCertificateListFromBytes parses every PEM-encoded certificate in certs
// with parseCertificateFromBytes and returns them in input order.
//
// Parsing stops at the first certificate that fails; in that case the error is
// returned with a nil slice rather than the partially built list. An empty
// input yields a nil slice and a nil error.
func parseCertificateListFromBytes(certs [][]byte) ([]*x509.Certificate, error) {
	var certificateList []*x509.Certificate

	for _, cert := range certs {
		certificate, err := parseCertificateFromBytes(cert)
		if err != nil {
			return nil, err
		}
		certificateList = append(certificateList, certificate)
	}

	return certificateList, nil
}
// createX509VerifyOptions is the unexported spelling of
// CreateX509VerifyOptions. It is kept so that existing package-internal
// callers continue to compile, and it delegates to the exported function so
// that the two implementations cannot drift apart.
func createX509VerifyOptions(ordererConfig channelconfig.Orderer) (x509.VerifyOptions, error) {
	return CreateX509VerifyOptions(ordererConfig)
}
// validateConsenterTLSCerts parses the PEM-encoded client and server TLS
// certificates of consenter c and verifies each of them against opts.
//
// Both certificates are parsed before either is verified; the client
// certificate is handled first in both phases. When ignoreExpiration is true,
// a verification failure whose only reason is that the certificate has
// expired is tolerated; every other failure is returned as an error.
func validateConsenterTLSCerts(c *etcdraft.Consenter, opts x509.VerifyOptions, ignoreExpiration bool) error {
	client, err := parseCertificateFromBytes(c.ClientTlsCert)
	if err != nil {
		return errors.Wrapf(err, "parsing tls client cert of %s:%d", c.Host, c.Port)
	}

	server, err := parseCertificateFromBytes(c.ServerTlsCert)
	if err != nil {
		return errors.Wrapf(err, "parsing tls server cert of %s:%d", c.Host, c.Port)
	}

	// check verifies one certificate; role only labels the error message.
	check := func(role string, cert *x509.Certificate) error {
		_, verifyErr := cert.Verify(opts)
		if verifyErr == nil {
			return nil
		}

		invalid, isInvalid := verifyErr.(x509.CertificateInvalidError)
		if ignoreExpiration && isInvalid && invalid.Reason == x509.Expired {
			// Expiration is the one failure the caller asked to overlook.
			return nil
		}

		return errors.Wrapf(verifyErr, "verifying tls %s cert with serial number %d", role, cert.SerialNumber)
	}

	if err := check("client", client); err != nil {
		return err
	}
	return check("server", server)
}
// ConsenterCertificate denotes a TLS certificate of a consenter, bundled with
// the dependencies its IsConsenterOfChannel method uses.
type ConsenterCertificate struct {
// ConsenterCertificate holds the certificate bytes; IsConsenterOfChannel
// PEM-decodes them and fails if they are not valid PEM.
ConsenterCertificate []byte
// CryptoProvider is passed to channelconfig.NewBundleFromEnvelope when the
// config block is turned into a channel config bundle.
CryptoProvider bccsp.BCCSP
// Logger receives the debug messages emitted while matching certificates.
Logger *flogging.FabricLogger
}
// IsConsenterOfChannel reports whether the holder of this certificate is a
// consenter of the channel described by the given configuration block.
//
// It returns nil when the public key of the certificate matches the public key
// of the server or the client TLS certificate of any consenter listed in the
// block's etcdraft metadata. It returns cluster.ErrNotInChannel when no
// consenter matches, and a different error when the block, the config it
// carries, or any of the certificates involved cannot be decoded.
func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *common.Block) error {
	if configBlock == nil || configBlock.Header == nil {
		return errors.New("nil block or nil header")
	}

	configEnvelope, err := protoutil.ExtractEnvelope(configBlock, 0)
	if err != nil {
		return err
	}

	configBundle, err := channelconfig.NewBundleFromEnvelope(configEnvelope, conCert.CryptoProvider)
	if err != nil {
		return err
	}

	ordererConfig, found := configBundle.OrdererConfig()
	if !found {
		return errors.New("no orderer config in bundle")
	}

	metadata := &etcdraft.ConfigMetadata{}
	if err := proto.Unmarshal(ordererConfig.ConsensusMetadata(), metadata); err != nil {
		return err
	}

	ownBlock, _ := pem.Decode(conCert.ConsenterCertificate)
	if ownBlock == nil {
		return errors.Errorf("my consenter certificate %s is not a valid PEM", string(conCert.ConsenterCertificate))
	}
	ownDER := ownBlock.Bytes

	// Client certificates of the consenters that did not match, for logging.
	var mismatched []string

	for _, candidate := range metadata.Consenters {
		serverBlock, _ := pem.Decode(candidate.ServerTlsCert)
		if serverBlock == nil {
			return errors.Errorf("candidate server certificate %s is not a valid PEM", string(candidate.ServerTlsCert))
		}
		serverErr := crypto.CertificatesWithSamePublicKey(ownDER, serverBlock.Bytes)

		clientBlock, _ := pem.Decode(candidate.ClientTlsCert)
		if clientBlock == nil {
			return errors.Errorf("candidate client certificate %s is not a valid PEM", string(candidate.ClientTlsCert))
		}
		clientErr := crypto.CertificatesWithSamePublicKey(ownDER, clientBlock.Bytes)

		// A match on either certificate is enough.
		if serverErr == nil || clientErr == nil {
			return nil
		}

		conCert.Logger.Debugf("I am not %s:%d because %s, %s", candidate.Host, candidate.Port, serverErr, clientErr)
		mismatched = append(mismatched, string(candidate.ClientTlsCert))
	}

	conCert.Logger.Debugf("Failed matching our certificate %s against certificates encoded in config block %d: %v",
		string(conCert.ConsenterCertificate),
		configBlock.Header.Number,
		mismatched)

	return cluster.ErrNotInChannel
}
// NodeExists returns true if the node ID is present in the slice and false
// otherwise. A nil or empty slice never contains the ID.
func NodeExists(id uint64, nodes []uint64) bool {
	for i := 0; i < len(nodes); i++ {
		if nodes[i] == id {
			return true
		}
	}
	return false
}
// ConfChange computes the Raft configuration change that moves the current
// Raft configuration state towards the consenter IDs stored in the block
// metadata.
//
// When the Raft state has fewer voters than the metadata has consenter IDs,
// the change is a ConfChangeAddNode for a consenter ID that is not yet a
// voter. Otherwise it is a ConfChangeRemoveNode for a voter that is no longer
// listed in the metadata. If several IDs qualify, the last one in iteration
// order is used; if none qualifies, NodeID is left at zero.
func ConfChange(blockMetadata *etcdraft.BlockMetadata, confState *raftpb.ConfState) *raftpb.ConfChange {
	// lastMissing returns the last ID of candidates that is absent from
	// known, or zero when every candidate is known.
	lastMissing := func(candidates, known []uint64) uint64 {
		var missing uint64
		for _, id := range candidates {
			if !NodeExists(id, known) {
				missing = id
			}
		}
		return missing
	}

	if len(confState.Voters) < len(blockMetadata.ConsenterIds) {
		// The metadata lists more nodes than Raft knows about: add one.
		return &raftpb.ConfChange{
			Type:   raftpb.ConfChangeAddNode,
			NodeID: lastMissing(blockMetadata.ConsenterIds, confState.Voters),
		}
	}

	// Otherwise a voter has to go.
	return &raftpb.ConfChange{
		Type:   raftpb.ConfChangeRemoveNode,
		NodeID: lastMissing(confState.Voters, blockMetadata.ConsenterIds),
	}
}
// CreateConsentersMap creates a map from Raft node ID to consenter.
//
// The consenter at position i of configMetadata.Consenters is paired with the
// ID at position i of blockMetadata.ConsenterIds, so ConsenterIds must hold at
// least as many entries as Consenters; a shorter ID list makes the indexing
// panic.
func CreateConsentersMap(blockMetadata *etcdraft.BlockMetadata, configMetadata *etcdraft.ConfigMetadata) map[uint64]*etcdraft.Consenter {
	byID := make(map[uint64]*etcdraft.Consenter)
	for idx := range configMetadata.Consenters {
		nodeID := blockMetadata.ConsenterIds[idx]
		byID[nodeID] = configMetadata.Consenters[idx]
	}
	return byID
}
// CreateX509VerifyOptions builds the x509.VerifyOptions used to validate
// consenter TLS certificates against the given orderer configuration.
//
// The TLS root certificates and TLS intermediate certificates of the MSP of
// every orderer organization are parsed and collected into the Roots and
// Intermediates pools respectively. The accepted extended key usages are
// client authentication and server authentication. If any certificate fails
// to parse, empty options are returned together with the error.
func CreateX509VerifyOptions(ordererConfig channelconfig.Orderer) (x509.VerifyOptions, error) {
	roots, intermediates := x509.NewCertPool(), x509.NewCertPool()

	// fill parses pemCerts and adds every certificate to pool; failure is the
	// message used to wrap a parsing error.
	fill := func(pool *x509.CertPool, pemCerts [][]byte, failure string) error {
		parsed, err := parseCertificateListFromBytes(pemCerts)
		if err != nil {
			return errors.Wrap(err, failure)
		}
		for _, certificate := range parsed {
			pool.AddCert(certificate)
		}
		return nil
	}

	for _, org := range ordererConfig.Organizations() {
		if err := fill(roots, org.MSP().GetTLSRootCerts(), "parsing tls root certs"); err != nil {
			return x509.VerifyOptions{}, err
		}
		if err := fill(intermediates, org.MSP().GetTLSIntermediateCerts(), "parsing tls intermediate certs"); err != nil {
			return x509.VerifyOptions{}, err
		}
	}

	options := x509.VerifyOptions{
		Roots:         roots,
		Intermediates: intermediates,
		KeyUsages: []x509.ExtKeyUsage{
			x509.ExtKeyUsageClientAuth,
			x509.ExtKeyUsageServerAuth,
		},
	}
	return options, nil
}