// Source path: go_study/fabric-main/orderer/common/follower/follower_chain.go

/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package follower
import (
"bytes"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
// ErrChainStopped is returned when the chain is stopped during execution.
var ErrChainStopped = errors.New("chain stopped")

//go:generate counterfeiter -o mocks/ledger_resources.go -fake-name LedgerResources . LedgerResources

// LedgerResources defines some of the interfaces of ledger & config resources needed by the follower.Chain.
type LedgerResources interface {
	// ChannelID returns the channel ID.
	ChannelID() string

	// Block returns a block with the given number,
	// or nil if such a block doesn't exist.
	Block(number uint64) *common.Block

	// Height returns the number of blocks in the chain this channel is associated with.
	Height() uint64

	// Append appends a new block to the ledger in its raw form.
	Append(block *common.Block) error
}

// TimeAfter has the signature of time.After and allows tests to provide an alternative implementation to it.
type TimeAfter func(d time.Duration) <-chan time.Time

//go:generate counterfeiter -o mocks/block_puller_factory.go -fake-name BlockPullerFactory . BlockPullerFactory

// BlockPullerFactory creates a ChannelPuller on demand, and exposes a method to update the block signature verifier
// linked to that ChannelPuller.
type BlockPullerFactory interface {
	// BlockPuller creates a ChannelPuller from the given config block. The follower passes its stop channel, or
	// nil when it only checks that a puller can be created from a join-block.
	BlockPuller(configBlock *common.Block, stopChannel chan struct{}) (ChannelPuller, error)

	// UpdateVerifierFromConfigBlock updates the block signature verifier using the given config block.
	UpdateVerifierFromConfigBlock(configBlock *common.Block) error
}

//go:generate counterfeiter -o mocks/chain_creator.go -fake-name ChainCreator . ChainCreator

// ChainCreator defines a function that creates a new consensus.Chain for this channel, to replace the current
// follower.Chain. This interface is meant to be implemented by the multichannel.Registrar.
type ChainCreator interface {
	// SwitchFollowerToChain replaces the follower.Chain of the named channel with a new consensus.Chain.
	SwitchFollowerToChain(chainName string)
}

//go:generate counterfeiter -o mocks/channel_participation_metrics_reporter.go -fake-name ChannelParticipationMetricsReporter . ChannelParticipationMetricsReporter

// ChannelParticipationMetricsReporter reports the consensus relation and status of a channel as metrics.
type ChannelParticipationMetricsReporter interface {
	// ReportConsensusRelationAndStatusMetrics publishes the given relation and status for channelID.
	ReportConsensusRelationAndStatusMetrics(channelID string, relation types.ConsensusRelation, status types.Status)
}
// Chain implements a component that allows the orderer to follow a specific channel when it is not a cluster
// member, that is, be a "follower" of the cluster. It also allows the orderer to perform "onboarding" for
// channels it is joining as a member, with a join-block.
//
// When an orderer is following a channel, it means that the current orderer is not a member of the consenters set
// of the channel, and is only pulling blocks from other orderers. In this mode, the follower inspects config
// blocks as they are pulled and, if it discovers that it was introduced into the consenters set, it triggers the
// creation of a regular etcdraft.Chain, that is, it turns into a "member" of the cluster.
//
// A follower is also used to onboard a channel when joining as a member with a join-block that has number >0. In
// this mode the follower pulls blocks up until join-block.number, and then triggers the creation of a regular
// etcdraft.Chain.
//
// The follower is started in one of two ways: 1) following an API Join request with a join-block that has
// block number >0, or 2) when the orderer was a cluster member (i.e. was running an etcdraft.Chain) and was removed
// from the consenters set.
//
// The follower is in status "onboarding" when it pulls blocks below the join-block number, or "active" when it
// pulls blocks equal to or above the join-block number.
//
// The follower reports consensus relation types.ConsensusRelationConsenter when the join-block indicates the
// orderer is in the consenters set, i.e. the follower is performing onboarding for an etcdraft.Chain. Otherwise,
// it reports types.ConsensusRelationFollower.
type Chain struct {
	// mutex protects the start/stop flags & channels, consensusRelation & status. All the rest are immutable or
	// accessed only by the go-routine.
	mutex    sync.Mutex
	started  bool          // Start once.
	stopped  bool          // Stop once.
	stopChan chan struct{} // A 'closer' signals the go-routine to stop by closing this channel.
	doneChan chan struct{} // The go-routine signals the 'closer' that it is done by closing this channel.

	consensusRelation types.ConsensusRelation // Guarded by mutex; returned by StatusReport.
	status            types.Status            // Guarded by mutex; returned by StatusReport.

	ledgerResources  LedgerResources            // ledger & config resources
	clusterConsenter consensus.ClusterConsenter // detects whether a block indicates channel membership
	options          Options
	logger           *flogging.FabricLogger
	timeAfter        TimeAfter // time.After by default, or an alternative from Options.

	joinBlock   *common.Block // The join-block the follower was started with; nil when started without one.
	lastConfig  *common.Block // The last config block from the ledger. Loaded by NewChain, then accessed only by the go-routine.
	firstHeight uint64        // The ledger height at the time the follower was created.

	// Creates a block puller on demand, and allows the update of the block signature verifier with each incoming
	// config block.
	blockPullerFactory BlockPullerFactory
	// A block puller instance, created either from the join-block or last-config-block. When pulling blocks using
	// the last-config-block, the endpoints are updated with each incoming config block.
	blockPuller ChannelPuller
	// Creates a new consensus.Chain for this channel, to replace the current follower.Chain.
	chainCreator ChainCreator

	cryptoProvider                      bccsp.BCCSP // Cryptographic services
	channelParticipationMetricsReporter ChannelParticipationMetricsReporter
}
// NewChain constructs a follower.Chain object.
//
// If the ledger is not empty, the last config block is loaded from it and used to update the block signature
// verifier held by blockPullerFactory. A non-nil joinBlock must have both a header and data, and a block puller
// must be creatable from it; the puller created for that check is closed immediately.
//
// Initial status: "active" when there is no join-block or the ledger already extends past it, otherwise
// "onboarding". Initial consensus relation: "consenter" when the join-block (or, without one, the last config
// block) indicates channel membership, otherwise "follower". Membership-check errors are ignored here and simply
// leave the relation as "follower". The initial values are reported to channelParticipationMetricsReporter.
func NewChain(
	ledgerResources LedgerResources,
	clusterConsenter consensus.ClusterConsenter,
	joinBlock *common.Block,
	options Options,
	blockPullerFactory BlockPullerFactory,
	chainCreator ChainCreator,
	cryptoProvider bccsp.BCCSP,
	channelParticipationMetricsReporter ChannelParticipationMetricsReporter,
) (*Chain, error) {
	options.applyDefaults()

	c := &Chain{
		stopChan:                            make(chan struct{}),
		doneChan:                            make(chan struct{}),
		consensusRelation:                   types.ConsensusRelationFollower,
		status:                              types.StatusOnBoarding,
		ledgerResources:                     ledgerResources,
		clusterConsenter:                    clusterConsenter,
		joinBlock:                           joinBlock,
		firstHeight:                         ledgerResources.Height(),
		options:                             options,
		logger:                              options.Logger.With("channel", ledgerResources.ChannelID()),
		timeAfter:                           options.TimeAfter,
		blockPullerFactory:                  blockPullerFactory,
		chainCreator:                        chainCreator,
		cryptoProvider:                      cryptoProvider,
		channelParticipationMetricsReporter: channelParticipationMetricsReporter,
	}

	// A non-empty ledger already holds a config block; seed the signature verifier from it.
	if ledgerResources.Height() > 0 {
		if err := c.loadLastConfig(); err != nil {
			return nil, err
		}
		if err := blockPullerFactory.UpdateVerifierFromConfigBlock(c.lastConfig); err != nil {
			return nil, err
		}
	}

	if joinBlock != nil {
		switch {
		case joinBlock.Header == nil:
			return nil, errors.New("block header is nil")
		case joinBlock.Data == nil:
			return nil, errors.New("block data is nil")
		}

		// Probe the block puller factory once, up front, so that a join-block whose endpoints cannot be
		// extracted is rejected before the follower starts.
		probe, err := blockPullerFactory.BlockPuller(joinBlock, nil)
		if err != nil {
			return nil, errors.WithMessage(err, "error creating a block puller from join-block")
		}
		probe.Close()

		if c.joinBlock.Header.Number < c.ledgerResources.Height() {
			c.status = types.StatusActive
		}
		if member, _ := c.clusterConsenter.IsChannelMember(c.joinBlock); member {
			c.consensusRelation = types.ConsensusRelationConsenter
		}
		c.logger.Infof("Created with join-block number: %d, ledger height: %d", joinBlock.Header.Number, c.firstHeight)
	} else {
		c.status = types.StatusActive
		if member, _ := c.clusterConsenter.IsChannelMember(c.lastConfig); member {
			c.consensusRelation = types.ConsensusRelationConsenter
		}
		c.logger.Infof("Created with a nil join-block, ledger height: %d", c.firstHeight)
	}

	c.logger.Debugf("Options are: %v", c.options)
	c.channelParticipationMetricsReporter.ReportConsensusRelationAndStatusMetrics(ledgerResources.ChannelID(), c.consensusRelation, c.status)

	return c, nil
}
// Start launches the block-pulling go-routine. It does nothing if the chain was already started or has already
// been halted.
func (c *Chain) Start() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	canStart := !c.started && !c.stopped
	if !canStart {
		c.logger.Debugf("Not starting because: started=%v, stopped=%v", c.started, c.stopped)
		return
	}

	c.started = true
	go c.run()
	c.logger.Info("Started")
}
// Halt signals the Chain to stop and waits for the internal go-routine to exit.
// It is safe to call Halt on a chain that was never started; in that case it returns immediately.
func (c *Chain) Halt() {
	c.halt()
	<-c.doneChan
}

// halt signals the go-routine to stop by closing stopChan, at most once. It does not wait for the go-routine.
//
// If the go-routine was never started, nobody else will ever close doneChan: Start refuses to run once stopped
// is set. halt therefore closes doneChan itself, so that Halt does not block forever.
func (c *Chain) halt() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.stopped {
		c.logger.Debug("Already stopped")
		return
	}
	c.stopped = true
	close(c.stopChan)
	if !c.started {
		close(c.doneChan)
	}
	c.logger.Info("Stopped")
}
// StatusReport returns the follower's current ConsensusRelation and Status, read atomically under the chain's mutex.
func (c *Chain) StatusReport() (types.ConsensusRelation, types.Status) {
	c.mutex.Lock()
	relation, status := c.consensusRelation, c.status
	c.mutex.Unlock()

	return relation, status
}
// setStatus records a new status and reports the current (relation, status) pair as metrics.
// The report is made while the mutex is held, so it cannot interleave with setConsensusRelation.
func (c *Chain) setStatus(status types.Status) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.status = status
	channelID := c.ledgerResources.ChannelID()
	c.channelParticipationMetricsReporter.ReportConsensusRelationAndStatusMetrics(channelID, c.consensusRelation, status)
}
// setConsensusRelation records a new consensus relation and reports the current (relation, status) pair as metrics.
// The report is made while the mutex is held, so it cannot interleave with setStatus.
func (c *Chain) setConsensusRelation(clusterRelation types.ConsensusRelation) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.consensusRelation = clusterRelation
	channelID := c.ledgerResources.ChannelID()
	c.channelParticipationMetricsReporter.ReportConsensusRelationAndStatusMetrics(channelID, clusterRelation, c.status)
}
// Height returns the current height of the channel's ledger.
func (c *Chain) Height() uint64 {
	ledger := c.ledgerResources
	return ledger.Height()
}
// IsRunning reports whether the pulling go-routine has been started and has not yet exited.
func (c *Chain) IsRunning() bool {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if !c.started {
		return false
	}

	// doneChan is closed by the go-routine on exit; a non-blocking receive tells whether that has happened.
	select {
	case <-c.doneChan:
		return false
	default:
		return true
	}
}
// run is the body of the follower go-routine. It pulls blocks until pulling finishes or fails. On exit it always
// closes doneChan, which releases callers waiting in Halt.
func (c *Chain) run() {
	c.logger.Debug("The follower.Chain puller goroutine is starting")
	defer func() {
		close(c.doneChan)
		c.logger.Debug("The follower.Chain puller goroutine is exiting")
	}()

	err := c.pull()
	if err == nil {
		return
	}
	c.logger.Warnf("Pull failed, follower chain stopped, error: %s", err)
	// TODO set the status to StatusError (see FAB-18106)
}
// increaseRetryInterval multiplies *retryInterval by 1.5, capping the result at upperLimit.
// It leaves the value untouched, and logs nothing, when the interval already equals upperLimit.
func (c *Chain) increaseRetryInterval(retryInterval *time.Duration, upperLimit time.Duration) {
	current := *retryInterval
	if current == upperLimit {
		return
	}

	// Assumes no int64 overflow: upperLimit is expected to stay below MaxInt64/2.
	next := time.Duration(1.5 * float64(current))
	if next > upperLimit {
		next = upperLimit
	}
	*retryInterval = next
	c.logger.Debugf("retry interval increased to: %v", next)
}
// resetRetryInterval sets *retryInterval back to lowerLimit, logging only when the value actually changes.
func (c *Chain) resetRetryInterval(retryInterval *time.Duration, lowerLimit time.Duration) {
	if *retryInterval != lowerLimit {
		*retryInterval = lowerLimit
		c.logger.Debugf("retry interval reset to: %v", *retryInterval)
	}
}
// decreaseRetryInterval lowers *retryInterval by lowerLimit (a linear step down), never going below lowerLimit.
// It leaves the value untouched, and logs nothing, when the interval already equals lowerLimit.
func (c *Chain) decreaseRetryInterval(retryInterval *time.Duration, lowerLimit time.Duration) {
	if *retryInterval == lowerLimit {
		return
	}

	next := *retryInterval - lowerLimit
	if next < lowerLimit {
		next = lowerLimit
	}
	*retryInterval = next
	c.logger.Debugf("retry interval decreased to: %v", next)
}
// pull drives the follower's work in two phases:
//  1. if a join-block was given, pull blocks up to it (onboarding);
//  2. keep pulling until a config block shows that this orderer became a member of the cluster.
//
// On success it halts this follower, asks the ChainCreator to switch the channel to a consensus.Chain and returns
// nil. It returns an error only when the chain is stopped or on unrecoverable errors.
func (c *Chain) pull() error {
	if c.joinBlock != nil {
		if err := c.pullUpToJoin(); err != nil {
			return errors.WithMessage(err, "failed to pull up to join block")
		}
		c.logger.Info("Onboarding finished successfully, pulled blocks up to join-block")
	}

	if err := c.pullAfterJoin(); err != nil {
		return errors.WithMessage(err, "failed to pull after join block")
	}

	c.logger.Info("Block pulling finished successfully, going to switch from follower to a consensus.Chain")
	// Stop this follower before handing the channel over to the new consensus.Chain.
	c.halt()
	c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID())
	return nil
}
// pullUpToJoin pulls blocks until the ledger contains the join-block (height join-block.number+1). It does not
// inspect membership on the config blocks it fetches, and it checks between blocks whether the chain was stopped.
// It returns immediately if the ledger is already at or past that height.
func (c *Chain) pullUpToJoin() error {
	targetHeight := c.joinBlock.Header.Number + 1
	if c.ledgerResources.Height() >= targetHeight {
		c.logger.Infof("Target height according to join block (%d) is <= to our ledger height (%d), no need to pull up to join block",
			targetHeight, c.ledgerResources.Height())
		return nil
	}

	// The puller takes its endpoints from the join-block.
	var err error
	if c.blockPuller, err = c.blockPullerFactory.BlockPuller(c.joinBlock, c.stopChan); err != nil {
		// Not expected: NewChain already verified that a puller can be built from the join-block.
		return errors.WithMessagef(err, "error creating block puller")
	}
	defer c.blockPuller.Close()

	// Endpoints come from the join-block, so the older config blocks pulled on the way must not override them.
	if err = c.pullUntilLatestWithRetry(targetHeight, false); err != nil {
		return err
	}

	c.logger.Infof("Pulled blocks from %d until %d", c.firstHeight, targetHeight-1)
	return nil
}
// pullAfterJoin pulls blocks continuously, inspecting the fetched config blocks for membership. The block puller is
// created from the last config block, and its endpoints are updated from every config block pulled afterwards.
//
// It returns nil once the last config block indicates that this orderer is a member of the cluster (after
// recording the "consenter" relation). It returns ErrChainStopped if the chain is stopped. It returns a wrapped
// error if the last config cannot be loaded, the puller cannot be created, or membership cannot be determined.
func (c *Chain) pullAfterJoin() error {
	c.logger.Infof("Pulling after join")
	defer c.logger.Infof("Pulled after join")

	c.setStatus(types.StatusActive)

	err := c.loadLastConfig()
	if err != nil {
		return errors.WithMessage(err, "failed to load last config block")
	}

	c.blockPuller, err = c.blockPullerFactory.BlockPuller(c.lastConfig, c.stopChan)
	if err != nil {
		return errors.WithMessage(err, "error creating block puller")
	}
	defer c.blockPuller.Close()

	heightPollInterval := c.options.HeightPollMinInterval
	for {
		// Check membership against the latest config block; pullUntilTarget updates c.lastConfig as it goes.
		isMember, errMem := c.clusterConsenter.IsChannelMember(c.lastConfig)
		if errMem != nil {
			// Wrap errMem, not err: err is nil at this point, and errors.WithMessage(nil, ...) returns nil. That
			// would make pull() treat a membership failure as success and switch to a consensus.Chain.
			return errors.WithMessage(errMem, "failed to determine channel membership from last config")
		}
		if isMember {
			c.setConsensusRelation(types.ConsensusRelationConsenter)
			return nil
		}

		// Poll for latest network height to advance beyond ledger height.
		var latestNetworkHeight uint64
	heightPollLoop:
		for {
			endpoint, networkHeight, errHeight := cluster.LatestHeightAndEndpoint(c.blockPuller)
			if errHeight != nil {
				c.logger.Errorf("Failed to get latest height and endpoint, error: %s", errHeight)
			} else {
				c.logger.Infof("Orderer endpoint %s has the biggest ledger height: %d", endpoint, networkHeight)
			}

			if networkHeight > c.ledgerResources.Height() {
				// On success, slowly decrease the polling interval
				c.decreaseRetryInterval(&heightPollInterval, c.options.HeightPollMinInterval)
				latestNetworkHeight = networkHeight
				break heightPollLoop
			}

			c.logger.Infof("My height: %d, latest network height: %d; going to wait %v for latest height to grow",
				c.ledgerResources.Height(), networkHeight, heightPollInterval)
			select {
			case <-c.stopChan:
				c.logger.Debug("Received a stop signal")
				return ErrChainStopped
			case <-c.timeAfter(heightPollInterval):
				// Exponential back-off, to avoid calling LatestHeightAndEndpoint too often.
				c.increaseRetryInterval(&heightPollInterval, c.options.HeightPollMaxInterval)
			}
		}

		// Pull to latest height or chain stop signal
		err = c.pullUntilLatestWithRetry(latestNetworkHeight, true)
		if err != nil {
			return err
		}
	}
}
// pullUntilLatestWithRetry keeps calling pullUntilTarget until the ledger reaches latestNetworkHeight, then returns nil.
// It returns an error (ErrChainStopped) only if the chain is stopped.
//
// Between failed attempts it waits with exponential back-off, bounded by the PullRetry{Min,Max}Interval options.
// Any attempt that makes progress resets the back-off to the minimum. When updateEndpoints is true, the block
// puller's endpoints are updated from every incoming config block.
func (c *Chain) pullUntilLatestWithRetry(latestNetworkHeight uint64, updateEndpoints bool) error {
	backoff := c.options.PullRetryMinInterval
	for {
		pulled, err := c.pullUntilTarget(latestNetworkHeight, updateEndpoints)
		if pulled > 0 {
			c.resetRetryInterval(&backoff, c.options.PullRetryMinInterval)
		}
		if err == nil {
			c.logger.Debugf("Pulled %d blocks until latest network height: %d", pulled, latestNetworkHeight)
			return nil
		}

		c.logger.Debugf("Error while trying to pull to latest height: %d; going to try again in %v; error: %s",
			latestNetworkHeight, backoff, err)
		select {
		case <-c.stopChan:
			c.logger.Debug("Received a stop signal")
			return ErrChainStopped
		case <-c.timeAfter(backoff):
			c.increaseRetryInterval(&backoff, c.options.PullRetryMaxInterval)
		}
	}
}
// pullUntilTarget pulls blocks from the current ledger height up to (but excluding) targetHeight and appends them
// to the ledger. It returns the number of blocks appended in this call together with a nil error once the target is
// reached. It may return early with an error, in which case the count still reflects every block appended so far.
//
// Each pulled block is checked before it is appended:
//   - its PreviousHash must match the header hash of the preceding block;
//   - if it has the join-block's number, its header must equal the join-block's header.
//
// For every pulled config block, c.lastConfig and the block-puller-factory's signature verifier are updated. When
// updateEndpoints is true, the block puller's endpoints are updated as well.
func (c *Chain) pullUntilTarget(targetHeight uint64, updateEndpoints bool) (uint64, error) {
	firstBlockToPull := c.ledgerResources.Height()
	if firstBlockToPull >= targetHeight {
		c.logger.Debugf("Target height (%d) is <= to our ledger height (%d), skipping pulling", targetHeight, firstBlockToPull)
		return 0, nil
	}

	// Seed the hash chain with the header hash of the last block already in the ledger.
	var actualPrevHash []byte
	if firstBlockToPull > 0 {
		prevBlock := c.ledgerResources.Block(firstBlockToPull - 1)
		if prevBlock == nil {
			return 0, errors.Errorf("cannot retrieve previous block %d", firstBlockToPull-1)
		}
		actualPrevHash = protoutil.BlockHeaderHash(prevBlock.Header)
	}

	for seq := firstBlockToPull; seq < targetHeight; seq++ {
		n := seq - firstBlockToPull // blocks appended so far in this call

		select {
		case <-c.stopChan:
			c.logger.Debug("Received a stop signal")
			return n, ErrChainStopped
		default:
		}

		nextBlock := c.blockPuller.PullBlock(seq)
		if nextBlock == nil {
			return n, errors.WithMessagef(cluster.ErrRetryCountExhausted, "failed to pull block %d", seq)
		}
		if nextBlock.Header == nil {
			// Guard the header dereferences below; treated like any other pull error, so the caller retries.
			return n, errors.Errorf("pulled block %d has a nil header", seq)
		}

		reportedPrevHash := nextBlock.Header.PreviousHash
		if (nextBlock.Header.Number > 0) && !bytes.Equal(reportedPrevHash, actualPrevHash) {
			return n, errors.Errorf("block header previous hash mismatch on sequence %d, expected %x, got %x",
				nextBlock.Header.Number, actualPrevHash, reportedPrevHash)
		}

		if c.joinBlock != nil && c.joinBlock.Header.Number == nextBlock.Header.Number {
			// We don't need to verify the block.Data because we verify the join-block's DataHash against the
			// hash(join-block.Data) when we verify it during the `Join` REST API call
			if !proto.Equal(nextBlock.Header, c.joinBlock.Header) {
				c.logger.Errorf("Block header mismatch between the block we pulled and the block we joined with, sequence %d", c.joinBlock.Header.Number)
				return n, errors.Errorf("block header mismatch between the block we pulled and the block we joined with, sequence %d", c.joinBlock.Header.Number)
			}
		}

		actualPrevHash = protoutil.BlockHeaderHash(nextBlock.Header)
		if err := c.ledgerResources.Append(nextBlock); err != nil {
			return n, errors.WithMessagef(err, "failed to append block %d to the ledger", nextBlock.Header.Number)
		}
		// nextBlock is now in the ledger, so it counts as pulled even if processing it as a config block fails
		// below. This lets pullUntilLatestWithRetry see the progress and reset its back-off.
		appended := n + 1

		if !protoutil.IsConfigBlock(nextBlock) {
			continue
		}

		c.logger.Debugf("Pulled blocks from %d to %d, last block is config", firstBlockToPull, nextBlock.Header.Number)
		c.lastConfig = nextBlock
		if err := c.blockPullerFactory.UpdateVerifierFromConfigBlock(nextBlock); err != nil {
			return appended, errors.WithMessagef(err, "failed to update verifier from last config, block number: %d", nextBlock.Header.Number)
		}
		if updateEndpoints {
			endpoints, err := cluster.EndpointconfigFromConfigBlock(nextBlock, c.cryptoProvider)
			if err != nil {
				return appended, errors.WithMessagef(err, "failed to extract endpoints from last config, block number: %d", nextBlock.Header.Number)
			}
			c.blockPuller.UpdateEndpoints(endpoints)
		}
	}

	c.logger.Debugf("Pulled blocks from %d to %d", firstBlockToPull, targetHeight)
	return targetHeight - firstBlockToPull, nil
}
// loadLastConfig reads the ledger's latest block, follows the last-config index it carries, and stores the
// referenced config block in c.lastConfig.
//
// It returns an error if the ledger is empty, if the latest block or the config block it points to cannot be
// retrieved, or if the latest block does not carry a properly encoded last-config index. c.lastConfig is left
// unchanged on error.
func (c *Chain) loadLastConfig() error {
	height := c.ledgerResources.Height()
	if height == 0 {
		return errors.New("ledger is empty")
	}

	lastBlock := c.ledgerResources.Block(height - 1)
	if lastBlock == nil {
		// Block returns nil for a missing block; report it rather than handing a nil block to protoutil.
		return errors.Errorf("could not retrieve last block %d", height-1)
	}

	index, err := protoutil.GetLastConfigIndexFromBlock(lastBlock)
	if err != nil {
		return errors.WithMessage(err, "chain does not have appropriately encoded last config in its latest block")
	}

	lastConfig := c.ledgerResources.Block(index)
	if lastConfig == nil {
		return errors.Errorf("could not retrieve config block from index %d", index)
	}

	c.lastConfig = lastConfig
	return nil
}