go_study/fabric-main/orderer/common/multichannel/registrar.go

928 lines
30 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
// Package multichannel tracks the channel resources for the orderer. It initially
// loads the set of existing channels, and provides an interface for users of these
// channels to retrieve them, or create new ones.
package multichannel
import (
"path/filepath"
"sync"
"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/filerepo"
"github.com/hyperledger/fabric/orderer/common/follower"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
const (
msgVersion = int32(0)
epoch = 0
)
var logger = flogging.MustGetLogger("orderer.common.multichannel")
// Registrar serves as a point of access and control for the individual channel resources.
type Registrar struct {
config localconfig.TopLevel
lock sync.RWMutex
chains map[string]*ChainSupport
followers map[string]*follower.Chain
// existence indicates removal is in-progress or failed
// when failed, the status will indicate failed all other states
// denote an in-progress removal
pendingRemoval map[string]consensus.StaticStatusReporter
consenters map[string]consensus.Consenter
ledgerFactory blockledger.Factory
signer identity.SignerSerializer
blockcutterMetrics *blockcutter.Metrics
callbacks []channelconfig.BundleActor
bccsp bccsp.BCCSP
clusterDialer *cluster.PredicateDialer
channelParticipationMetrics *Metrics
joinBlockFileRepo *filerepo.Repo
}
// ConfigBlockOrPanic retrieves the last configuration block from the given ledger.
// Panics on failure.
func ConfigBlockOrPanic(reader blockledger.Reader) *cb.Block {
lastBlock, err := blockledger.GetBlockByNumber(reader, reader.Height()-1)
if err != nil {
logger.Panicw("Failed to retrieve block", "blockNum", reader.Height()-1, "error", err)
}
index, err := protoutil.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Panicw("Chain did not have appropriately encoded last config in its latest block", "error", err)
}
configBlock, err := blockledger.GetBlockByNumber(reader, index)
if err != nil {
logger.Panicw("Failed to retrieve config block", "blockNum", index, "error", err)
}
return configBlock
}
func configTx(reader blockledger.Reader) *cb.Envelope {
return protoutil.ExtractEnvelopeOrPanic(ConfigBlockOrPanic(reader), 0)
}
// NewRegistrar produces an instance of a *Registrar.
func NewRegistrar(
config localconfig.TopLevel,
ledgerFactory blockledger.Factory,
signer identity.SignerSerializer,
metricsProvider metrics.Provider,
bccsp bccsp.BCCSP,
clusterDialer *cluster.PredicateDialer,
callbacks ...channelconfig.BundleActor) *Registrar {
r := &Registrar{
config: config,
chains: make(map[string]*ChainSupport),
followers: make(map[string]*follower.Chain),
pendingRemoval: make(map[string]consensus.StaticStatusReporter),
ledgerFactory: ledgerFactory,
signer: signer,
blockcutterMetrics: blockcutter.NewMetrics(metricsProvider),
callbacks: callbacks,
bccsp: bccsp,
clusterDialer: clusterDialer,
channelParticipationMetrics: NewMetrics(metricsProvider),
}
var err error
r.joinBlockFileRepo, err = InitJoinBlockFileRepo(&r.config)
if err != nil {
logger.Panicf("Error initializing joinblock file repo: %s", err)
}
return r
}
// InitJoinBlockFileRepo initialize the channel participation API joinblock file repo. This creates
// the fileRepoDir on the filesystem if it does not already exist.
func InitJoinBlockFileRepo(config *localconfig.TopLevel) (*filerepo.Repo, error) {
fileRepoDir := filepath.Join(config.FileLedger.Location, "pendingops")
logger.Infof("Channel Participation API enabled, registrar initializing with file repo %s", fileRepoDir)
joinBlockFileRepo, err := filerepo.New(fileRepoDir, "join")
if err != nil {
return nil, err
}
return joinBlockFileRepo, nil
}
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
r.init(consenters)
r.lock.Lock()
defer r.lock.Unlock()
r.startChannels()
}
func (r *Registrar) init(consenters map[string]consensus.Consenter) {
r.consenters = consenters
// Discover and load join-blocks. If there is a join-block, there must be a ledger; if there is none, create it.
// channelsWithJoinBlock maps channelID to a join-block.
channelsWithJoinBlock := r.loadJoinBlocks()
// Discover all ledgers. This should already include all channels with join blocks as well.
// Make sure there are no empty ledgers without a corresponding join-block.
existingChannels := r.discoverLedgers(channelsWithJoinBlock)
// Initialize application channels, by creating either a consensus.Chain or a follower.Chain.
r.initAppChannels(existingChannels, channelsWithJoinBlock)
}
// startChannels starts internal go-routines in chains and followers.
// Since these go-routines may call-back on the Registrar, this must be protected with a lock.
func (r *Registrar) startChannels() {
for _, chainSupport := range r.chains {
chainSupport.start()
}
for _, fChain := range r.followers {
fChain.Start()
}
logger.Infof("Registrar initializing without a system channel, number of application channels: %d, with %d consensus.Chain(s) and %d follower.Chain(s)",
len(r.chains)+len(r.followers), len(r.chains), len(r.followers))
}
func (r *Registrar) discoverLedgers(channelsWithJoinBlock map[string]*cb.Block) []string {
// Discover all ledgers. This should already include all channels with join blocks as well.
existingChannels := r.ledgerFactory.ChannelIDs()
for _, channelID := range existingChannels {
rl, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Ledger factory reported channelID %s but could not retrieve it: %s", channelID, err)
}
// Prune empty ledgers without a join block
if rl.Height() == 0 {
if _, ok := channelsWithJoinBlock[channelID]; !ok {
logger.Warnf("Channel '%s' has an empty ledger without a join-block, removing it", channelID)
if err := r.ledgerFactory.Remove(channelID); err != nil {
logger.Panicf("Ledger factory failed to remove empty ledger '%s', error: %s", channelID, err)
}
}
}
}
return r.ledgerFactory.ChannelIDs()
}
// initAppChannels initializes application channels, assuming that the system channel does NOT exist.
// This implies that the orderer is using the channel participation API for joins (channel creation).
func (r *Registrar) initAppChannels(existingChannels []string, channelsWithJoinBlock map[string]*cb.Block) {
// init app channels with join-blocks
for channelID, joinBlock := range channelsWithJoinBlock {
ledgerRes, clusterConsenter, err := r.initLedgerResourcesClusterConsenter(joinBlock)
if err != nil {
logger.Panicf("Error: %s, channel: %s", err, channelID)
}
isMember, err := clusterConsenter.IsChannelMember(joinBlock)
if err != nil {
logger.Panicf("Failed to determine cluster membership from join-block, channel: %s, error: %s", channelID, err)
}
if joinBlock.Header.Number == 0 && isMember {
if _, _, err := r.createAsMember(ledgerRes, joinBlock, channelID); err != nil {
logger.Panicf("Failed to createAsMember, error: %s", err)
}
if err := r.removeJoinBlock(channelID); err != nil {
logger.Panicf("Failed to remove join-block, channel: %s, error: %s", channelID, err)
}
} else {
if _, _, err = r.createFollower(ledgerRes, clusterConsenter, joinBlock, channelID); err != nil {
logger.Panicf("Failed to createFollower, error: %s", err)
}
}
}
// init app channels without join-blocks
for _, channelID := range existingChannels {
if _, withJoinBlock := channelsWithJoinBlock[channelID]; withJoinBlock {
continue // Skip channels with join-blocks, since they were already initialized above.
}
rl, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Ledger factory reported channelID %s but could not retrieve it: %s", channelID, err)
}
configBlock := ConfigBlockOrPanic(rl)
configTx := protoutil.ExtractEnvelopeOrPanic(configBlock, 0)
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
ledgerRes, err := r.newLedgerResources(configTx)
if err != nil {
logger.Panicf("Error creating ledger resources: %s", err)
}
ordererConfig, _ := ledgerRes.OrdererConfig()
consenter, foundConsenter := r.consenters[ordererConfig.ConsensusType()]
if !foundConsenter {
logger.Panicf("Failed to find a consenter for consensus type: %s", ordererConfig.ConsensusType())
}
clusterConsenter, ok := consenter.(consensus.ClusterConsenter)
if !ok {
logger.Panic("clusterConsenter is not a consensus.ClusterConsenter")
}
isMember, err := clusterConsenter.IsChannelMember(configBlock)
if err != nil {
logger.Panicf("Failed to determine cluster membership from config-block, error: %s", err)
}
if isMember {
chainSupport, err := newChainSupport(r, ledgerRes, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
if err != nil {
logger.Panicf("Failed to create chain support for channel '%s', error: %s", channelID, err)
}
r.chains[channelID] = chainSupport
} else {
_, _, err := r.createFollower(ledgerRes, clusterConsenter, nil, channelID)
if err != nil {
logger.Panicf("Failed to create follower for channel '%s', error: %s", channelID, err)
}
}
}
}
func (r *Registrar) initLedgerResourcesClusterConsenter(configBlock *cb.Block) (*ledgerResources, consensus.ClusterConsenter, error) {
configEnv, err := protoutil.ExtractEnvelope(configBlock, 0)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed extracting config envelope from block")
}
ledgerRes, err := r.newLedgerResources(configEnv)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed creating ledger resources")
}
ordererConfig, _ := ledgerRes.OrdererConfig()
consenter, foundConsenter := r.consenters[ordererConfig.ConsensusType()]
if !foundConsenter {
return nil, nil, errors.Errorf("failed to find a consenter for consensus type: %s", ordererConfig.ConsensusType())
}
clusterConsenter, ok := consenter.(consensus.ClusterConsenter)
if !ok {
return nil, nil, errors.New("failed cast: clusterConsenter is not a consensus.ClusterConsenter")
}
return ledgerRes, clusterConsenter, nil
}
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
chdr, err := protoutil.ChannelHeader(msg)
if err != nil {
return nil, false, nil, errors.WithMessage(err, "could not determine channel ID")
}
cs := r.GetChain(chdr.ChannelId)
// Used to be new channel creation with the system channel, but now channels are created with the channel
// participation API only, so it is just a wrong channel name.
if cs == nil {
return chdr, false, nil, types.ErrChannelNotExist
}
isConfig := false
switch cs.ClassifyMsg(chdr) {
case msgprocessor.ConfigUpdateMsg:
isConfig = true
case msgprocessor.ConfigMsg:
return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
case msgprocessor.UnsupportedMsg:
return chdr, false, nil, errors.New("message is of type that is no longer supported")
default:
}
return chdr, isConfig, cs, nil
}
// GetConsensusChain retrieves the consensus.Chain of the channel, if it exists.
func (r *Registrar) GetConsensusChain(chainID string) consensus.Chain {
r.lock.RLock()
defer r.lock.RUnlock()
cs, exists := r.chains[chainID]
if !exists {
return nil
}
return cs.Chain
}
// GetChain retrieves the chain support for a chain if it exists.
func (r *Registrar) GetChain(chainID string) *ChainSupport {
r.lock.RLock()
defer r.lock.RUnlock()
return r.chains[chainID]
}
// GetFollower retrieves the follower.Chain if it exists.
func (r *Registrar) GetFollower(chainID string) *follower.Chain {
r.lock.RLock()
defer r.lock.RUnlock()
return r.followers[chainID]
}
func (r *Registrar) newLedgerResources(configTx *cb.Envelope) (*ledgerResources, error) {
payload, err := protoutil.UnmarshalPayload(configTx.Payload)
if err != nil {
return nil, errors.WithMessage(err, "error umarshaling envelope to payload")
}
if payload.Header == nil {
return nil, errors.New("missing channel header")
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, errors.WithMessage(err, "error unmarshalling channel header")
}
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.WithMessage(err, "error umarshaling config envelope from payload data")
}
bundle, err := channelconfig.NewBundle(chdr.ChannelId, configEnvelope.Config, r.bccsp)
if err != nil {
return nil, errors.WithMessage(err, "error creating channelconfig bundle")
}
err = checkResources(bundle)
if err != nil {
return nil, errors.WithMessagef(err, "error checking bundle for channel: %s", chdr.ChannelId)
}
ledger, err := r.ledgerFactory.GetOrCreate(chdr.ChannelId)
if err != nil {
return nil, errors.WithMessagef(err, "error getting ledger for channel: %s", chdr.ChannelId)
}
policy, exists := bundle.PolicyManager().GetPolicy(policies.BlockValidation)
if !exists {
return nil, errors.New("no policies in config block")
}
bftEnabled := bundle.ChannelConfig().Capabilities().ConsensusTypeBFT()
var consenters []*cb.Consenter
if bftEnabled {
cfg, ok := bundle.OrdererConfig()
if !ok {
return nil, errors.New("no orderer section in config block")
}
consenters = cfg.Consenters()
}
signatureVerifier := protoutil.BlockSignatureVerifier(bftEnabled, consenters, policy)
return &ledgerResources{
configResources: &configResources{
mutableResources: channelconfig.NewBundleSource(bundle, r.callbacks...),
bccsp: r.bccsp,
},
ReadWriter: ledger,
signatureVerifier: signatureVerifier,
}, nil
}
// CreateChain makes the Registrar create a consensus.Chain with the given name.
func (r *Registrar) CreateChain(chainName string) {
lf, err := r.ledgerFactory.GetOrCreate(chainName)
if err != nil {
logger.Panicf("Failed obtaining ledger factory for %s: %v", chainName, err)
}
chain := r.GetChain(chainName)
if chain != nil {
logger.Infof("A chain of type %T for channel %s already exists. "+
"Halting it.", chain.Chain, chainName)
r.lock.Lock()
chain.Halt()
delete(r.chains, chainName)
r.lock.Unlock()
}
r.newChain(configTx(lf))
}
func (r *Registrar) newChain(configtx *cb.Envelope) {
r.lock.Lock()
defer r.lock.Unlock()
channelName, err := channelNameFromConfigTx(configtx)
if err != nil {
logger.Warnf("Failed extracting channel name: %v", err)
return
}
// fixes https://github.com/hyperledger/fabric/issues/2931
if existingChain, exists := r.chains[channelName]; exists {
if _, isRaftChain := existingChain.Chain.(*etcdraft.Chain); isRaftChain {
logger.Infof("Channel %s already created, skipping its creation", channelName)
return
}
}
cs := r.createNewChain(configtx)
cs.start()
logger.Infof("Created and started new channel %s", cs.ChannelID())
}
func (r *Registrar) createNewChain(configtx *cb.Envelope) *ChainSupport {
ledgerResources, err := r.newLedgerResources(configtx)
if err != nil {
logger.Panicf("Error creating ledger resources: %s", err)
}
// If we have no blocks, we need to create the genesis block ourselves.
if ledgerResources.Height() == 0 {
if err := ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})); err != nil {
logger.Panicf("Error appending genesis block to ledger: %s", err)
}
}
cs, err := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
if err != nil {
logger.Panicf("Error creating chain support: %s", err)
}
chainID := ledgerResources.ConfigtxValidator().ChannelID()
r.chains[chainID] = cs
return cs
}
// SwitchFollowerToChain creates a consensus.Chain from the tip of the ledger, and removes the follower.
// It is called when a follower detects a config block that indicates cluster membership and halts, transferring
// execution to the consensus.Chain.
func (r *Registrar) SwitchFollowerToChain(channelID string) {
r.lock.Lock()
defer r.lock.Unlock()
lf, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Failed obtaining ledger factory for channel %s: %v", channelID, err)
}
if _, chainExists := r.chains[channelID]; chainExists {
logger.Panicf("Programming error, channel already exists: %s", channelID)
}
delete(r.followers, channelID)
logger.Debugf("Removed follower for channel %s", channelID)
cs := r.createNewChain(configTx(lf))
if err := r.removeJoinBlock(channelID); err != nil {
logger.Panicf("Failed removing join-block for channel: %s: %v", channelID, err)
}
cs.start()
logger.Infof("Created and started channel %s", cs.ChannelID())
}
// SwitchChainToFollower creates a follower.Chain from the tip of the ledger and removes the consensus.Chain.
// It is called when an etcdraft.Chain detects it was evicted from the cluster (i.e. removed from the consenters set)
// and halts, transferring execution to the follower.Chain.
func (r *Registrar) SwitchChainToFollower(channelName string) {
r.lock.Lock()
defer r.lock.Unlock()
if _, chainExists := r.chains[channelName]; !chainExists {
logger.Infof("Channel %s consenter was removed", channelName)
return
}
if _, followerExists := r.followers[channelName]; followerExists {
logger.Panicf("Programming error, both a follower.Chain and a consensus.Chain exist, channel: %s", channelName)
}
rl, err := r.ledgerFactory.GetOrCreate(channelName)
if err != nil {
logger.Panicf("Failed obtaining ledger factory for %s: %v", channelName, err)
}
configBlock := ConfigBlockOrPanic(rl)
ledgerRes, clusterConsenter, err := r.initLedgerResourcesClusterConsenter(configBlock)
if err != nil {
logger.Panicf("Error initializing ledgerResources & clusterConsenter: %s", err)
}
delete(r.chains, channelName)
logger.Debugf("Removed consensus.Chain for channel %s", channelName)
fChain, _, err := r.createFollower(ledgerRes, clusterConsenter, nil, channelName)
if err != nil {
logger.Panicf("Failed to create follower.Chain for channel '%s', error: %s", channelName, err)
}
fChain.Start()
logger.Infof("Created and started a follower.Chain for channel %s", channelName)
}
// ChannelsCount returns the count of the current total number of channels.
func (r *Registrar) ChannelsCount() int {
r.lock.RLock()
defer r.lock.RUnlock()
return len(r.chains) + len(r.followers)
}
// ChannelList returns a slice of ChannelInfoShort containing all application channels (excluding the system
// channel), and ChannelInfoShort of the system channel (nil if does not exist).
// The URL fields are empty, and are to be completed by the caller.
func (r *Registrar) ChannelList() types.ChannelList {
r.lock.RLock()
defer r.lock.RUnlock()
list := types.ChannelList{}
for name := range r.chains {
list.Channels = append(list.Channels, types.ChannelInfoShort{Name: name})
}
for name := range r.followers {
list.Channels = append(list.Channels, types.ChannelInfoShort{Name: name})
}
for c := range r.pendingRemoval {
list.Channels = append(list.Channels, types.ChannelInfoShort{
Name: c,
})
}
return list
}
// ChannelInfo provides extended status information about a channel.
// The URL field is empty, and is to be completed by the caller.
func (r *Registrar) ChannelInfo(channelID string) (types.ChannelInfo, error) {
r.lock.RLock()
defer r.lock.RUnlock()
info := types.ChannelInfo{Name: channelID}
if c, ok := r.chains[channelID]; ok {
info.Height = c.Height()
info.ConsensusRelation, info.Status = c.StatusReport()
return info, nil
}
if f, ok := r.followers[channelID]; ok {
info.Height = f.Height()
info.ConsensusRelation, info.Status = f.StatusReport()
return info, nil
}
status, ok := r.pendingRemoval[channelID]
if ok {
return types.ChannelInfo{
Name: channelID,
ConsensusRelation: status.ConsensusRelation,
Status: status.Status,
}, nil
}
return types.ChannelInfo{}, types.ErrChannelNotExist
}
// JoinChannel instructs the orderer to create a channel and join it with the provided config block.
// The URL field is empty, and is to be completed by the caller.
func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block) (info types.ChannelInfo, err error) {
r.lock.Lock()
defer r.lock.Unlock()
if status, ok := r.pendingRemoval[channelID]; ok {
if status.Status == types.StatusFailed {
return types.ChannelInfo{}, types.ErrChannelRemovalFailure
}
return types.ChannelInfo{}, types.ErrChannelPendingRemoval
}
if _, ok := r.chains[channelID]; ok {
return types.ChannelInfo{}, types.ErrChannelAlreadyExists
}
if _, ok := r.followers[channelID]; ok {
return types.ChannelInfo{}, types.ErrChannelAlreadyExists
}
defer func() {
if err != nil {
if err2 := r.ledgerFactory.Remove(channelID); err2 != nil {
logger.Warningf("Failed to cleanup ledger: %v", err2)
}
}
}()
ledgerRes, clusterConsenter, err := r.initLedgerResourcesClusterConsenter(configBlock)
if err != nil {
return types.ChannelInfo{}, err
}
blockBytes, err := proto.Marshal(configBlock)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "failed marshaling joinblock")
}
if err := r.joinBlockFileRepo.Save(channelID, blockBytes); err != nil {
return types.ChannelInfo{}, errors.WithMessagef(err, "failed saving joinblock to file repo for channel %s", channelID)
}
defer func() {
if err != nil {
if err2 := r.removeJoinBlock(channelID); err2 != nil {
logger.Warningf("Failed to cleanup joinblock for channel %s: %v", channelID, err2)
}
}
}()
isMember, err := clusterConsenter.IsChannelMember(configBlock)
if err != nil {
return types.ChannelInfo{}, errors.WithMessage(err, "failed to determine cluster membership from join-block")
}
if configBlock.Header.Number == 0 && isMember {
chain, info, err := r.createAsMember(ledgerRes, configBlock, channelID)
if err == nil {
if err := r.removeJoinBlock(channelID); err != nil {
return types.ChannelInfo{}, err
}
chain.start()
}
return info, err
}
fChain, info, err := r.createFollower(ledgerRes, clusterConsenter, configBlock, channelID)
if err != nil {
return info, errors.WithMessage(err, "failed to create follower")
}
fChain.Start()
logger.Infof("Joining channel: %v", info)
return info, err
}
func (r *Registrar) createAsMember(ledgerRes *ledgerResources, configBlock *cb.Block, channelID string) (*ChainSupport, types.ChannelInfo, error) {
if ledgerRes.Height() == 0 {
if err := ledgerRes.Append(configBlock); err != nil {
return nil, types.ChannelInfo{}, errors.WithMessage(err, "failed to append join block to the ledger")
}
}
chain, err := newChainSupport(
r,
ledgerRes,
r.consenters,
r.signer,
r.blockcutterMetrics,
r.bccsp,
)
if err != nil {
return nil, types.ChannelInfo{}, errors.WithMessage(err, "failed to create chain support")
}
info := types.ChannelInfo{
Name: channelID,
URL: "",
Height: ledgerRes.Height(),
}
info.ConsensusRelation, info.Status = chain.StatusReport()
r.chains[channelID] = chain
logger.Infof("Joining channel: %v", info)
return chain, info, nil
}
// createFollower created a follower.Chain, puts it in the map, but does not start it.
func (r *Registrar) createFollower(
ledgerRes *ledgerResources,
clusterConsenter consensus.ClusterConsenter,
joinBlock *cb.Block,
channelID string,
) (*follower.Chain, types.ChannelInfo, error) {
fLog := flogging.MustGetLogger("orderer.common.follower")
blockPullerCreator, err := follower.NewBlockPullerCreator(
channelID, fLog, r.signer, r.clusterDialer, r.config.General.Cluster, r.bccsp)
if err != nil {
return nil, types.ChannelInfo{}, errors.WithMessagef(err, "failed to create BlockPullerFactory for channel %s", channelID)
}
fChain, err := follower.NewChain(
ledgerRes,
clusterConsenter,
joinBlock,
follower.Options{
Logger: fLog,
},
blockPullerCreator,
r,
r.bccsp,
r,
)
if err != nil {
return nil, types.ChannelInfo{}, errors.WithMessagef(err, "failed to create follower for channel %s", channelID)
}
clusterRelation, status := fChain.StatusReport()
info := types.ChannelInfo{
Name: channelID,
URL: "",
Height: ledgerRes.Height(),
ConsensusRelation: clusterRelation,
Status: status,
}
r.followers[channelID] = fChain
logger.Debugf("Created follower.Chain: %v", info)
return fChain, info, nil
}
// RemoveChannel instructs the orderer to remove a channel.
func (r *Registrar) RemoveChannel(channelID string) error {
r.lock.Lock()
defer r.lock.Unlock()
status, ok := r.pendingRemoval[channelID]
if ok && status.Status != types.StatusFailed {
return types.ErrChannelPendingRemoval
}
cs, ok := r.chains[channelID]
if ok {
cs.Halt()
r.removeMember(channelID, cs)
return nil
}
fChain, ok := r.followers[channelID]
if ok {
fChain.Halt()
return r.removeFollower(channelID, fChain)
}
return types.ErrChannelNotExist
}
func (r *Registrar) removeMember(channelID string, cs *ChainSupport) {
relation, status := cs.StatusReport()
r.pendingRemoval[channelID] = consensus.StaticStatusReporter{ConsensusRelation: relation, Status: status}
r.removeLedgerAsync(channelID)
delete(r.chains, channelID)
logger.Infof("Removed channel: %s", channelID)
}
func (r *Registrar) removeFollower(channelID string, follower *follower.Chain) error {
// join block may still exist if the follower is:
// 1) still onboarding
// 2) active but not yet called registrar.SwitchFollowerToChain()
// NOTE: if the join block does not exist, os.RemoveAll returns nil
// so there is no harm attempting to remove a non-existent join block.
if err := r.removeJoinBlock(channelID); err != nil {
return err
}
relation, status := follower.StatusReport()
r.pendingRemoval[channelID] = consensus.StaticStatusReporter{ConsensusRelation: relation, Status: status}
r.removeLedgerAsync(channelID)
delete(r.followers, channelID)
logger.Infof("Removed channel: %s", channelID)
return nil
}
func (r *Registrar) loadJoinBlocks() map[string]*cb.Block {
channelToBlockMap := make(map[string]*cb.Block)
if !r.config.ChannelParticipation.Enabled {
return channelToBlockMap
}
channelsList, err := r.joinBlockFileRepo.List()
if err != nil {
logger.Panicf("Error listing join block file repo: %s", err)
}
logger.Debugf("Loading join-blocks for %d channels: %s", len(channelsList), channelsList)
for _, fileName := range channelsList {
channelName := r.joinBlockFileRepo.FileToBaseName(fileName)
blockBytes, err := r.joinBlockFileRepo.Read(channelName)
if err != nil {
logger.Panicf("Error reading join block file: '%s', error: %s", fileName, err)
}
block, err := protoutil.UnmarshalBlock(blockBytes)
if err != nil {
logger.Panicf("Error unmarshalling join block file: '%s', error: %s", fileName, err)
}
channelToBlockMap[channelName] = block
}
logger.Debug("Reconciling join-blocks and ledger by creating any missing ledger")
for channelID := range channelToBlockMap {
if _, err := r.ledgerFactory.GetOrCreate(channelID); err != nil {
logger.Panicf("Failed to create a ledger for channel: '%s', error: %s", channelID, err)
}
}
return channelToBlockMap
}
func (r *Registrar) removeJoinBlock(channelID string) error {
if err := r.joinBlockFileRepo.Remove(channelID); err != nil {
return errors.WithMessagef(err, "failed removing joinblock for channel %s", channelID)
}
return nil
}
func (r *Registrar) removeLedgerAsync(channelID string) {
go func() {
err := r.ledgerFactory.Remove(channelID)
r.lock.Lock()
defer r.lock.Unlock()
if err != nil {
r.pendingRemoval[channelID] = consensus.StaticStatusReporter{ConsensusRelation: r.pendingRemoval[channelID].ConsensusRelation, Status: types.StatusFailed}
r.channelParticipationMetrics.reportStatus(channelID, types.StatusFailed)
logger.Errorf("ledger factory failed to remove empty ledger '%s', error: %s", channelID, err)
return
}
delete(r.pendingRemoval, channelID)
}()
}
func (r *Registrar) ReportConsensusRelationAndStatusMetrics(channelID string, relation types.ConsensusRelation, status types.Status) {
r.channelParticipationMetrics.reportConsensusRelation(channelID, relation)
r.channelParticipationMetrics.reportStatus(channelID, status)
}
func channelNameFromConfigTx(configtx *cb.Envelope) (string, error) {
payload, err := protoutil.UnmarshalPayload(configtx.Payload)
if err != nil {
return "", errors.WithMessage(err, "error umarshaling envelope to payload")
}
if payload.Header == nil {
return "", errors.New("missing channel header")
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return "", errors.WithMessage(err, "error unmarshalling channel header")
}
return chdr.ChannelId, nil
}
func (r *Registrar) ApplyFilters(channel string, env *cb.Envelope) error {
r.lock.RLock()
cs, exists := r.chains[channel]
r.lock.RUnlock()
if !exists {
return errors.Errorf("channel %s doesn't exist", channel)
}
return msgprocessor.CreateStandardChannelFilters(cs, r.config).Apply(env)
}
func (r *Registrar) ProposeConfigUpdate(channel string, configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
r.lock.RLock()
cs, exists := r.chains[channel]
r.lock.RUnlock()
if !exists {
return nil, errors.Errorf("channel %s doesn't exist", channel)
}
return cs.ProposeConfigUpdate(configtx)
}