// go_study/fabric-main/orderer/consensus/etcdraft/chain.go
// 1600 lines, 48 KiB, Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"context"
"encoding/pem"
"fmt"
"sync"
"sync/atomic"
"time"
"code.cloudfoundry.org/clock"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/wal"
)
// Byte-size units. Each constant is the previous one shifted left by 10 bits,
// i.e. successive powers of 1024: BYTE = 1, KILOBYTE = 1024, MEGABYTE = 1024^2,
// GIGABYTE = 1024^3, TERABYTE = 1024^4.
const (
BYTE = 1 << (10 * iota)
KILOBYTE
MEGABYTE
GIGABYTE
TERABYTE
)
const (
// DefaultSnapshotCatchUpEntries is the default number of entries
// to preserve in memory when a snapshot is taken. This is for
// slow followers to catch up.
DefaultSnapshotCatchUpEntries = uint64(4)
// DefaultSnapshotIntervalSize is the default snapshot interval, in bytes.
// It is used when SnapshotIntervalSize is not provided (is zero) in the
// channel config options, so that a snapshot interval is always set.
DefaultSnapshotIntervalSize = 16 * MEGABYTE
// DefaultEvictionSuspicion is the threshold after which a node starts
// suspecting its own eviction, if it has been leaderless for this
// period of time.
DefaultEvictionSuspicion = time.Minute * 10
// DefaultLeaderlessCheckInterval is the interval at which a chain checks
// its own leadership status. It is used when Options.LeaderCheckInterval
// is zero.
DefaultLeaderlessCheckInterval = time.Second * 10
// AbdicationMaxAttempts is the maximum number of leadership abdication
// attempts made for a reconfiguration transaction that removes this node.
AbdicationMaxAttempts = 5
// EvictionConfigTxForwardingTimeOut is how long we keep trying to forward, to the new leader,
// the config-Tx that evicted the current node while it was leader, after abdication.
EvictionConfigTxForwardingTimeOut = time.Second * 5
)
//go:generate counterfeiter -o mocks/configurator.go . Configurator
// Configurator is used to configure the communication layer
// when the chain starts.
type Configurator interface {
// Configure applies the given set of remote nodes to the communication
// layer of the named channel.
Configure(channel string, newNodes []cluster.RemoteNode)
}
//go:generate counterfeiter -o mocks/mock_rpc.go . RPC
// RPC is used to mock the transport layer in tests.
type RPC interface {
// SendConsensus sends a consensus (raft) message to the node with ID dest.
SendConsensus(dest uint64, msg *orderer.ConsensusRequest) error
// SendSubmit sends a transaction submission to the node with ID dest.
// report is a callback that receives the outcome of the send; callers in
// this file treat a nil argument as success.
// NOTE(review): whether report is invoked when SendSubmit itself returns a
// non-nil error is implementation-defined — confirm against the implementation.
SendSubmit(dest uint64, request *orderer.SubmitRequest, report func(err error)) error
}
//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller
// BlockPuller is used to pull blocks from other OSNs (ordering service nodes).
type BlockPuller interface {
// PullBlock fetches the block with the given sequence number.
// Callers in this file treat a nil result as a failure to fetch.
PullBlock(seq uint64) *common.Block
// HeightsByEndpoints returns a map keyed by endpoint with a height per entry.
// NOTE(review): presumably the ledger height reported by each endpoint — confirm.
HeightsByEndpoints() (map[string]uint64, error)
// Close releases the puller; callers defer it once they are done pulling.
Close()
}
// CreateBlockPuller is a function that creates a BlockPuller on demand.
// It is passed into the chain constructor so that tests can mock it.
type CreateBlockPuller func() (BlockPuller, error)
// Options contains all the configurations relevant to the chain.
type Options struct {
// RPCTimeout bounds how long forwarding a transaction to the leader may take.
RPCTimeout time.Duration
// RaftID is the raft node ID of this chain instance.
RaftID uint64
// Clock is the time source used by the chain; tests can inject a fake clock.
Clock clock.Clock
// WALDir and SnapDir are the directories handed to CreateStorage for the
// raft write-ahead log and snapshots respectively.
WALDir string
SnapDir string
// SnapshotIntervalSize is the snapshot interval in bytes. When zero,
// DefaultSnapshotIntervalSize is used.
SnapshotIntervalSize uint32
// This is configurable mainly for testing purpose. Users are not
// expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used.
SnapshotCatchUpEntries uint64
// MemoryStorage is handed to CreateStorage and used as the raft Storage.
MemoryStorage MemoryStorage
Logger *flogging.FabricLogger
// TickInterval, ElectionTick, HeartbeatTick and MaxSizePerMsg are passed
// through to the raft node / raft.Config.
TickInterval time.Duration
ElectionTick int
HeartbeatTick int
MaxSizePerMsg uint64
// MaxInflightBlocks limits the number of blocks proposed but not yet
// committed; it is also used as raft.Config.MaxInflightMsgs.
MaxInflightBlocks int
// BlockMetadata and Consenters should only be modified while under lock
// of raftMetadataLock
BlockMetadata *etcdraft.BlockMetadata
Consenters map[uint64]*etcdraft.Consenter
// MigrationInit is set when the node starts right after consensus-type migration
MigrationInit bool
// Metrics holds the metric providers; NewChain derives per-channel
// instances from them.
Metrics *Metrics
// NOTE(review): Cert and EvictionSuspicion are not used in the visible part
// of this file; presumably this node's TLS certificate and the leaderless
// duration after which eviction is suspected — confirm.
Cert []byte
EvictionSuspicion time.Duration
// LeaderCheckInterval is the interval of the periodic leadership check.
// When zero, DefaultLeaderlessCheckInterval is used.
LeaderCheckInterval time.Duration
}
// submit is the message sent over Chain.submitC for each incoming request.
type submit struct {
// req is the request to be ordered.
req *orderer.SubmitRequest
// leader is the channel on which the run loop replies with the ID of the
// current leader (raft.None if there is none), so that the submitter can
// decide whether the request must be forwarded.
leader chan uint64
}
// gc is the message sent over Chain.gcC to signal that a snapshot should be taken.
// NOTE(review): the consumer of this type is outside the visible part of the
// file; presumably index is the raft index to snapshot at, state is the
// ConfState to persist with it, and data is the snapshot payload — confirm.
type gc struct {
index uint64
state raftpb.ConfState
data []byte
}
// Chain implements consensus.Chain interface.
type Chain struct {
configurator Configurator
rpc RPC
raftID uint64
channelID string
// lastKnownLeader is read and written with sync/atomic operations.
lastKnownLeader uint64
// ActiveNodes holds a []uint64 of the currently active node IDs.
ActiveNodes atomic.Value
// submitC carries incoming requests to the run loop; a nil value is used
// by WaitReady as a readiness probe.
submitC chan *submit
// applyC carries committed entries and soft-state changes to the run loop.
applyC chan apply
observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
doneC chan struct{} // Closes when the chain halts
startC chan struct{} // Closes when the node is started
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot
gcC chan *gc // Signal to take snapshot
// errorCLock guards replacement of errorC.
errorCLock sync.RWMutex
errorC chan struct{} // returned by Errored()
raftMetadataLock sync.RWMutex
confChangeInProgress *raftpb.ConfChange
justElected bool // this is true when node has just been elected
configInflight bool // this is true when there is config block or ConfChange in flight
blockInflight int // number of in flight blocks
clock clock.Clock // Tests can inject a fake clock
support consensus.ConsenterSupport
// lastBlock is the most recent block written by this chain.
lastBlock *common.Block
// appliedIndex is the raft index up to which entries have been applied;
// it guards against re-applying already written blocks.
appliedIndex uint64
// needed by snapshotting
sizeLimit uint32 // SnapshotIntervalSize in bytes
accDataSize uint32 // accumulative data size since last snapshot
lastSnapBlockNum uint64
confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
createPuller CreateBlockPuller // func used to create BlockPuller on demand
fresh bool // indicate if this is a fresh raft node
// this is exported so that test can use `Node.Status()` to get raft node status.
Node *node
opts Options
Metrics *Metrics
logger *flogging.FabricLogger
periodicChecker *PeriodicCheck
// haltCallback, if non-nil, is invoked by halt() after the chain has stopped.
haltCallback func()
// statusReportMutex guards consensusRelation and status.
statusReportMutex sync.Mutex
consensusRelation types.ConsensusRelation
status types.Status
// BCCSP instance
CryptoProvider bccsp.BCCSP
// leadershipTransferInProgress is a 0/1 flag manipulated with sync/atomic;
// it is 1 while an abdication-and-forward goroutine is running.
leadershipTransferInProgress uint32
}
// NewChain constructs a chain object.
//
// It restores persisted raft state through CreateStorage, determines the block
// number and ConfState of the latest snapshot (if any), loads the last block
// of the ledger, and wires up the Chain together with its raft node. The chain
// is not running until Start is called.
//
// An error is returned if the persisted raft data cannot be restored or if the
// last block cannot be read from the ledger.
func NewChain(
	support consensus.ConsenterSupport,
	opts Options,
	conf Configurator,
	rpc RPC,
	cryptoProvider bccsp.BCCSP,
	f CreateBlockPuller,
	haltCallback func(),
	observeC chan<- raft.SoftState,
) (*Chain, error) {
	chainLogger := opts.Logger.With("channel", support.ChannelID(), "node", opts.RaftID)

	// WAL existence is probed before CreateStorage is invoked.
	isFresh := !wal.Exist(opts.WALDir)
	storage, err := CreateStorage(chainLogger, opts.WALDir, opts.SnapDir, opts.MemoryStorage)
	if err != nil {
		return nil, errors.Errorf("failed to restore persisted raft data: %s", err)
	}

	// Fall back to the defaults for anything left unset in the options.
	storage.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
	if opts.SnapshotCatchUpEntries != 0 {
		storage.SnapshotCatchUpEntries = opts.SnapshotCatchUpEntries
	}

	snapshotInterval := opts.SnapshotIntervalSize
	if snapshotInterval == 0 {
		snapshotInterval = DefaultSnapshotIntervalSize
	}

	// get block number in last snapshot, if exists
	var (
		snapshotBlockNum uint64
		confState        raftpb.ConfState
	)
	if snap := storage.Snapshot(); !raft.IsEmptySnap(snap) {
		snapshotBlockNum = protoutil.UnmarshalBlockOrPanic(snap.Data).Header.Number
		confState = snap.Metadata.ConfState
	}

	lastBlock := support.Block(support.Height() - 1)
	if lastBlock == nil {
		return nil, errors.Errorf("failed to get last block")
	}

	c := &Chain{
		configurator:      conf,
		rpc:               rpc,
		channelID:         support.ChannelID(),
		raftID:            opts.RaftID,
		submitC:           make(chan *submit),
		applyC:            make(chan apply),
		haltC:             make(chan struct{}),
		doneC:             make(chan struct{}),
		startC:            make(chan struct{}),
		snapC:             make(chan *raftpb.Snapshot),
		errorC:            make(chan struct{}),
		gcC:               make(chan *gc),
		observeC:          observeC,
		support:           support,
		fresh:             isFresh,
		appliedIndex:      opts.BlockMetadata.RaftIndex,
		lastBlock:         lastBlock,
		sizeLimit:         snapshotInterval,
		lastSnapBlockNum:  snapshotBlockNum,
		confState:         confState,
		createPuller:      f,
		clock:             opts.Clock,
		haltCallback:      haltCallback,
		consensusRelation: types.ConsensusRelationConsenter,
		status:            types.StatusActive,
		// Every metric is labelled with the channel this chain serves.
		Metrics: &Metrics{
			ClusterSize:             opts.Metrics.ClusterSize.With("channel", support.ChannelID()),
			IsLeader:                opts.Metrics.IsLeader.With("channel", support.ChannelID()),
			ActiveNodes:             opts.Metrics.ActiveNodes.With("channel", support.ChannelID()),
			CommittedBlockNumber:    opts.Metrics.CommittedBlockNumber.With("channel", support.ChannelID()),
			SnapshotBlockNumber:     opts.Metrics.SnapshotBlockNumber.With("channel", support.ChannelID()),
			LeaderChanges:           opts.Metrics.LeaderChanges.With("channel", support.ChannelID()),
			ProposalFailures:        opts.Metrics.ProposalFailures.With("channel", support.ChannelID()),
			DataPersistDuration:     opts.Metrics.DataPersistDuration.With("channel", support.ChannelID()),
			NormalProposalsReceived: opts.Metrics.NormalProposalsReceived.With("channel", support.ChannelID()),
			ConfigProposalsReceived: opts.Metrics.ConfigProposalsReceived.With("channel", support.ChannelID()),
		},
		logger:         chainLogger,
		opts:           opts,
		CryptoProvider: cryptoProvider,
	}

	// Sets initial values for metrics
	metrics := c.Metrics
	metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
	metrics.IsLeader.Set(float64(0)) // all nodes start out as followers
	metrics.ActiveNodes.Set(float64(0))
	metrics.CommittedBlockNumber.Set(float64(c.lastBlock.Header.Number))
	metrics.SnapshotBlockNumber.Set(float64(c.lastSnapBlockNum))

	// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
	// We guard against replay of written blocks with `appliedIndex` instead.
	raftConfig := &raft.Config{
		ID:              c.raftID,
		ElectionTick:    c.opts.ElectionTick,
		HeartbeatTick:   c.opts.HeartbeatTick,
		MaxSizePerMsg:   c.opts.MaxSizePerMsg,
		MaxInflightMsgs: c.opts.MaxInflightBlocks,
		Logger:          c.logger,
		Storage:         c.opts.MemoryStorage,
		// PreVote prevents reconnected node from disturbing network.
		// See etcd/raft doc for more details.
		PreVote:                   true,
		CheckQuorum:               true,
		DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
	}

	disseminator := &Disseminator{RPC: c.rpc}
	disseminator.UpdateMetadata(nil) // initialize

	c.ActiveNodes.Store([]uint64{})

	tracker := &Tracker{
		id:     c.raftID,
		sender: disseminator,
		gauge:  c.Metrics.ActiveNodes,
		active: &c.ActiveNodes,
		logger: c.logger,
	}
	c.Node = &node{
		chainID:      c.channelID,
		chain:        c,
		logger:       c.logger,
		metrics:      c.Metrics,
		storage:      storage,
		rpc:          disseminator,
		config:       raftConfig,
		tickInterval: c.opts.TickInterval,
		clock:        c.clock,
		metadata:     c.opts.BlockMetadata,
		tracker:      tracker,
	}
	c.Node.confState.Store(&confState)

	return c, nil
}
// Start instructs the orderer to begin serving the chain and keep it current.
//
// It configures the communication layer, starts the raft node, launches the
// gc and run goroutines, and finally starts the periodic check that detects
// whether this node may have been evicted. If communication cannot be
// configured, the failure is logged, doneC is closed and the chain is left
// unstarted.
func (c *Chain) Start() {
	c.logger.Infof("Starting Raft node")

	if err := c.configureComm(); err != nil {
		// %+v (not "+%v") so that the error is rendered in its verbose form.
		c.logger.Errorf("Failed to start chain, aborting: %+v", err)
		close(c.doneC)
		return
	}

	// A node joins an existing raft cluster when the ledger already holds
	// more than the genesis block, unless it got there through a
	// consensus-type migration, in which case a new raft cluster is started.
	isJoin := c.support.Height() > 1
	if isJoin && c.opts.MigrationInit {
		isJoin = false
		c.logger.Infof("Consensus-type migration detected, starting new raft node on an existing channel; height=%d", c.support.Height())
	}
	c.Node.start(c.fresh, isJoin)

	close(c.startC)
	// errorC starts out closed; the run loop replaces it with an open channel
	// once a leader is found.
	close(c.errorC)

	go c.gc()
	go c.run()

	es := c.newEvictionSuspector()

	interval := DefaultLeaderlessCheckInterval
	if c.opts.LeaderCheckInterval != 0 {
		interval = c.opts.LeaderCheckInterval
	}

	c.periodicChecker = &PeriodicCheck{
		Logger:        c.logger,
		Report:        es.confirmSuspicion,
		ReportCleared: es.clearSuspicion,
		CheckInterval: interval,
		Condition:     c.suspectEviction,
	}
	c.periodicChecker.Run()
}
// Order submits normal type transactions for ordering.
// configSeq is the config sequence the envelope was validated against.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
	c.Metrics.NormalProposalsReceived.Add(1)

	request := &orderer.SubmitRequest{
		Channel:           c.channelID,
		Payload:           env,
		LastValidationSeq: configSeq,
	}
	return c.Submit(request, 0)
}
// Configure submits config type transactions for ordering.
// configSeq is the config sequence the envelope was validated against.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
	c.Metrics.ConfigProposalsReceived.Add(1)

	request := &orderer.SubmitRequest{
		Channel:           c.channelID,
		Payload:           env,
		LastValidationSeq: configSeq,
	}
	return c.Submit(request, 0)
}
// WaitReady blocks when the chain:
//   - is catching up with other nodes using snapshot
//
// In any other case, it returns right away.
//
// It probes the run loop by sending a nil submission, and fails if the chain
// has not been started or has been stopped.
func (c *Chain) WaitReady() error {
	err := c.isRunning()
	if err != nil {
		return err
	}

	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	case c.submitC <- nil:
		// the run loop accepted the probe, so it is able to serve requests
		return nil
	}
}
// Errored returns a channel that closes when the chain stops.
// The channel is read under errorCLock because the run loop may swap it out.
func (c *Chain) Errored() <-chan struct{} {
	c.errorCLock.RLock()
	current := c.errorC
	c.errorCLock.RUnlock()

	return current
}
// Halt stops the chain. Unlike halt, it never invokes the haltCallback.
func (c *Chain) Halt() {
	_ = c.stop()
}
// stop signals the chain to halt and waits until it is done.
// It reports true only if this call is the one that stopped the chain; it
// reports false if the chain was never started or had already stopped.
func (c *Chain) stop() bool {
	select {
	case <-c.startC:
		// started, carry on
	default:
		c.logger.Warn("Attempted to halt a chain that has not started")
		return false
	}

	select {
	case <-c.doneC:
		// somebody else already stopped the chain
		return false
	case c.haltC <- struct{}{}:
	}

	// wait for the halt to complete
	<-c.doneC

	c.statusReportMutex.Lock()
	c.status = types.StatusInactive
	c.statusReportMutex.Unlock()

	return true
}
// halt stops the chain and calls the haltCallback function, which allows the
// chain to transfer responsibility to a follower or the inactive chain registry when a chain
// discovers it is no longer a member of a channel.
// The callback is skipped when this call is not the one that stopped the chain.
func (c *Chain) halt() {
	if !c.stop() {
		c.logger.Info("This node was stopped, the haltCallback will not be called")
		return
	}

	if callback := c.haltCallback; callback != nil {
		callback() // Must be invoked WITHOUT any internal lock

		// If the haltCallback registers the chain in to the inactive chain registry (i.e., system channel exists) then
		// this is the correct consensusRelation. If the haltCallback transfers responsibility to a follower.Chain, then
		// this chain is about to be GC anyway. The new follower.Chain replacing this one will report the correct
		// StatusReport.
		c.statusReportMutex.Lock()
		defer c.statusReportMutex.Unlock()
		c.consensusRelation = types.ConsensusRelationConfigTracker
	}

	// active nodes metric shouldn't be frozen once a channel is stopped.
	c.Metrics.ActiveNodes.Set(float64(0))
}
// isRunning returns nil if the chain has been started and has not stopped,
// and a descriptive error otherwise. It never blocks.
func (c *Chain) isRunning() error {
	// startC is closed once the node is started
	select {
	case <-c.startC:
	default:
		return errors.Errorf("chain is not started")
	}

	// doneC is closed once the chain halts
	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	default:
		return nil
	}
}
// Consensus passes the given ConsensusRequest message to the raft.Node instance.
//
// If the message is addressed to a raft ID other than ours, the chain halts
// itself. Cluster metadata piggybacked on the request is only honoured when
// the sender is the last known leader; it then refreshes the set of active
// nodes and the matching metric.
func (c *Chain) Consensus(req *orderer.ConsensusRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		return err
	}

	var stepMsg raftpb.Message
	if err := proto.Unmarshal(req.Payload, &stepMsg); err != nil {
		return fmt.Errorf("failed to unmarshal StepRequest payload to Raft Message: %s", err)
	}

	if stepMsg.To != c.raftID {
		c.logger.Warnf("Received msg to %d, my ID is probably wrong due to out of date, cowardly halting", stepMsg.To)
		c.halt()
		return nil
	}

	if err := c.Node.Step(context.TODO(), stepMsg); err != nil {
		return fmt.Errorf("failed to process Raft Step message: %s", err)
	}

	if len(req.Metadata) == 0 {
		return nil
	}
	if atomic.LoadUint64(&c.lastKnownLeader) != sender { // ignore metadata from non-leader
		return nil
	}

	var clusterMetadata etcdraft.ClusterMetadata
	if err := proto.Unmarshal(req.Metadata, &clusterMetadata); err != nil {
		return errors.Errorf("failed to unmarshal ClusterMetadata: %s", err)
	}

	active := clusterMetadata.ActiveNodes
	c.Metrics.ActiveNodes.Set(float64(len(active)))
	c.ActiveNodes.Store(active)

	return nil
}
// Submit forwards the incoming request to:
//   - the local run goroutine if this is leader
//   - the actual leader via the transport mechanism
//
// The call fails if there's no leader elected yet.
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		c.Metrics.ProposalFailures.Add(1)
		return err
	}

	// buffered so that the run loop never blocks when answering
	leadC := make(chan uint64, 1)

	select {
	case <-c.doneC:
		c.Metrics.ProposalFailures.Add(1)
		return errors.Errorf("chain is stopped")
	case c.submitC <- &submit{req: req, leader: leadC}:
	}

	lead := <-leadC
	if lead == raft.None {
		c.Metrics.ProposalFailures.Add(1)
		return errors.Errorf("no Raft leader")
	}
	if lead == c.raftID {
		// we are the leader: the run loop orders the request itself
		return nil
	}

	if err := c.forwardToLeader(lead, req); err != nil {
		return err
	}
	return nil
}
// forwardToLeader sends req to the node with raft ID lead and waits for the
// transport to report the outcome.
//
// It returns an error if the transport rejects the request synchronously, if
// the transport reports a failure, if the chain stops in the meantime, or if
// no outcome is reported within c.opts.RPCTimeout.
func (c *Chain) forwardToLeader(lead uint64, req *orderer.SubmitRequest) error {
	c.logger.Infof("Forwarding transaction to the leader %d", lead)

	timer := time.NewTimer(c.opts.RPCTimeout)
	defer timer.Stop()

	sentChan := make(chan struct{})
	atomicErr := &atomic.Value{}

	// report is invoked by the transport with the outcome of the send.
	report := func(err error) {
		if err != nil {
			atomicErr.Store(err.Error())
			c.Metrics.ProposalFailures.Add(1)
		}
		close(sentChan)
	}

	// A synchronous failure is surfaced right away instead of being dropped
	// and only noticed once the RPC timeout fires.
	if err := c.rpc.SendSubmit(lead, req, report); err != nil {
		return errors.Errorf("failed to forward transaction to leader %d: %s", lead, err)
	}

	select {
	case <-sentChan:
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	case <-timer.C:
		return errors.Errorf("timed out (%v) waiting on forwarding to %d", c.opts.RPCTimeout, lead)
	}

	if reported := atomicErr.Load(); reported != nil {
		// errors.New, not Errorf: the reported message is data, not a format
		// string, and may legitimately contain '%'.
		return errors.New(reported.(string))
	}

	return nil
}
// apply is the message sent over Chain.applyC to the run loop.
type apply struct {
// entries are the raft entries to be applied by the chain.
entries []raftpb.Entry
// soft is the new raft soft state (leader and role); it is nil when the
// soft state is not being reported with this message.
soft *raft.SoftState
}
// isCandidate reports whether the given raft state is one of the two
// candidate states (pre-candidate or candidate).
func isCandidate(state raft.StateType) bool {
	switch state {
	case raft.StatePreCandidate, raft.StateCandidate:
		return true
	default:
		return false
	}
}
// run is the main event loop of the chain; Start launches it in its own
// goroutine. On every iteration it selects over:
//   - submitC: an incoming request; the loop answers with the current leader
//     and, if this node is the leader, orders the request and proposes blocks;
//   - applyC: soft-state changes and entries from the raft node; leadership
//     transitions are handled here and the entries are applied;
//   - the batch timer: cuts whatever is pending into a block and proposes it;
//   - snapC: a snapshot to catch up with;
//   - doneC: the chain has halted, so the loop cleans up and returns.
//
// The local submitC variable is set to nil to pause accepting requests (a nil
// channel is never selected) and restored to c.submitC to resume.
func (c *Chain) run() {
ticking := false
timer := c.clock.NewTimer(time.Second)
// we need a stopped timer rather than nil,
// because we will be select waiting on timer.C()
if !timer.Stop() {
<-timer.C()
}
// if timer is already started, this is a no-op
startTimer := func() {
if !ticking {
ticking = true
timer.Reset(c.support.SharedConfig().BatchTimeout())
}
}
stopTimer := func() {
if !timer.Stop() && ticking {
// we only need to drain the channel if the timer expired (not explicitly stopped)
<-timer.C()
}
ticking = false
}
// soft is the last raft soft state observed by this loop.
var soft raft.SoftState
submitC := c.submitC
// bc creates the next blocks while this node is an active leader.
var bc *blockCreator
// propC is the queue of blocks to be proposed; cancelProp stops the
// goroutine draining it.
var propC chan<- *common.Block
var cancelProp context.CancelFunc
cancelProp = func() {} // no-op as initial value
// becomeLeader resets the leader-side bookkeeping, pauses accepting requests,
// and starts a goroutine that proposes queued blocks to raft. It returns the
// queue to push blocks into and the function that cancels that goroutine.
becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
c.Metrics.IsLeader.Set(1)
c.blockInflight = 0
c.justElected = true
submitC = nil
ch := make(chan *common.Block, c.opts.MaxInflightBlocks)
// if there is unfinished ConfChange, we should resume the effort to propose it as
// new leader, and wait for it to be committed before start serving new requests.
if cc := c.getInFlightConfChange(); cc != nil {
// The reason `ProposeConfChange` should be called in go routine is documented in `writeConfigBlock` method.
go func() {
if err := c.Node.ProposeConfChange(context.TODO(), *cc); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
}()
c.confChangeInProgress = cc
c.configInflight = true
}
// Leader should call Propose in go routine, because this method may be blocked
// if node is leaderless (this can happen when leader steps down in a heavily
// loaded network). We need to make sure applyC can still be consumed properly.
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context, ch <-chan *common.Block) {
for {
select {
case b := <-ch:
data := protoutil.MarshalOrPanic(b)
if err := c.Node.Propose(ctx, data); err != nil {
c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
return
}
c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)
case <-ctx.Done():
c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
return
}
}
}(ctx, ch)
return ch, cancel
}
// becomeFollower cancels the proposing goroutine, discards whatever the block
// cutter has pending, stops the batch timer and resumes accepting requests
// (which are then answered with the leader's ID so they can be forwarded).
becomeFollower := func() {
cancelProp()
c.blockInflight = 0
_ = c.support.BlockCutter().Cut()
stopTimer()
submitC = c.submitC
bc = nil
c.Metrics.IsLeader.Set(0)
}
for {
select {
case s := <-submitC:
if s == nil {
// polled by `WaitReady`
continue
}
// While an election is in progress there is no leader to report.
if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
s.leader <- raft.None
continue
}
s.leader <- soft.Lead
// Not the leader: the submitter forwards the request to the leader.
if soft.Lead != c.raftID {
continue
}
batches, pending, err := c.ordered(s.req)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
continue
}
if !pending && len(batches) == 0 {
continue
}
if pending {
startTimer() // no-op if timer is already started
} else {
stopTimer()
}
c.propose(propC, bc, batches...)
if c.configInflight {
c.logger.Info("Received config transaction, pause accepting transaction till it is committed")
submitC = nil
} else if c.blockInflight >= c.opts.MaxInflightBlocks {
c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
c.blockInflight, c.opts.MaxInflightBlocks)
submitC = nil
}
case app := <-c.applyC:
if app.soft != nil {
newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
if newLeader != soft.Lead {
c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)
c.Metrics.LeaderChanges.Add(1)
atomic.StoreUint64(&c.lastKnownLeader, newLeader)
if newLeader == c.raftID {
propC, cancelProp = becomeLeader()
}
if soft.Lead == c.raftID {
becomeFollower()
}
}
// A leader has (re)appeared, or this node stopped campaigning: replace
// errorC with a fresh, open channel.
foundLeader := soft.Lead == raft.None && newLeader != raft.None
quitCandidate := isCandidate(soft.RaftState) && !isCandidate(app.soft.RaftState)
if foundLeader || quitCandidate {
c.errorCLock.Lock()
c.errorC = make(chan struct{})
c.errorCLock.Unlock()
}
// No leader, or an election is in progress: forget the leader and, for
// clusters larger than two nodes, close errorC if it is not closed yet.
if isCandidate(app.soft.RaftState) || newLeader == raft.None {
atomic.StoreUint64(&c.lastKnownLeader, raft.None)
select {
case <-c.errorC:
default:
nodeCount := len(c.opts.BlockMetadata.ConsenterIds)
// Only close the error channel (to signal the broadcast/deliver front-end a consensus backend error)
// If we are a cluster of size 3 or more, otherwise we can't expand a cluster of size 1 to 2 nodes.
if nodeCount > 2 {
close(c.errorC)
} else {
c.logger.Warningf("No leader is present, cluster size is %d", nodeCount)
}
}
}
soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}
// notify external observer
select {
case c.observeC <- soft:
default:
}
// Notify the leader-change subscriber, if one is registered, but only when
// there actually is a leader.
lcs := c.Node.leaderChangeSubscription.Load()
if lcs != nil {
if soft.Lead != raft.None {
subscription := lcs.(func(uint64))
subscription(soft.Lead)
}
}
}
c.apply(app.entries)
if c.justElected {
// A freshly elected leader does not serve requests until everything
// already in the raft log has been applied and no config is in flight.
msgInflight := c.Node.lastIndex() > c.appliedIndex
if msgInflight {
c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
continue
}
if c.configInflight {
c.logger.Debugf("There is config block in flight, new leader should not serve requests")
continue
}
c.logger.Infof("Start accepting requests as Raft leader at block [%d]", c.lastBlock.Header.Number)
bc = &blockCreator{
hash: protoutil.BlockHeaderHash(c.lastBlock.Header),
number: c.lastBlock.Header.Number,
logger: c.logger,
}
submitC = c.submitC
c.justElected = false
} else if c.configInflight {
c.logger.Info("Config block or ConfChange in flight, pause accepting transaction")
submitC = nil
} else if c.blockInflight < c.opts.MaxInflightBlocks {
submitC = c.submitC
}
case <-timer.C():
ticking = false
batch := c.support.BlockCutter().Cut()
if len(batch) == 0 {
c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
continue
}
c.logger.Debugf("Batch timer expired, creating block")
c.propose(propC, bc, batch) // we are certain this is normal block, no need to block
case sn := <-c.snapC:
// A snapshot with index 0 is an artificial one that only triggers catch-up;
// a real snapshot also advances confState and appliedIndex, unless it is
// behind what has already been applied.
if sn.Metadata.Index != 0 {
if sn.Metadata.Index <= c.appliedIndex {
c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
break
}
c.confState = sn.Metadata.ConfState
c.appliedIndex = sn.Metadata.Index
} else {
c.logger.Infof("Received artificial snapshot to trigger catchup")
}
if err := c.catchUp(sn); err != nil {
c.logger.Panicf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
sn.Metadata.Term, sn.Metadata.Index, err)
}
case <-c.doneC:
stopTimer()
cancelProp()
select {
case <-c.errorC: // avoid closing closed channel
default:
close(c.errorC)
}
c.logger.Infof("Stop serving requests")
c.periodicChecker.Stop()
return
}
}
}
// writeBlock writes a committed block to the ledger, stamping the given raft
// index into the block metadata.
//
// A block numbered beyond the next expected one triggers a panic; a block that
// is already behind (the node was forced to catch up) is ignored. Config
// blocks are delegated to writeConfigBlock.
func (c *Chain) writeBlock(block *common.Block, index uint64) {
	expected := c.lastBlock.Header.Number + 1
	switch got := block.Header.Number; {
	case got > expected:
		c.logger.Panicf("Got block [%d], expect block [%d]", got, expected)
	case got < expected:
		c.logger.Infof("Got block [%d], expect block [%d], this node was forced to catch up", got, expected)
		return
	}

	if c.blockInflight > 0 {
		c.blockInflight-- // only reduce on leader
	}
	c.lastBlock = block

	c.logger.Infof("Writing block [%d] (Raft index: %d) to ledger", block.Header.Number, index)

	if protoutil.IsConfigBlock(block) {
		c.writeConfigBlock(block, index)
		return
	}

	// The metadata is updated and serialized under the lock; the ledger write
	// itself happens outside of it.
	c.raftMetadataLock.Lock()
	c.opts.BlockMetadata.RaftIndex = index
	metadata := protoutil.MarshalOrPanic(c.opts.BlockMetadata)
	c.raftMetadataLock.Unlock()

	c.support.WriteBlock(block, metadata)
}
// ordered orders the envelope carried by the SubmitRequest `msg`.
// Returns
//
//	-- batches [][]*common.Envelope; the batches cut,
//	-- pending bool; if there are envelopes pending to be ordered,
//	-- err error; the error encountered, if any.
//
// It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.
//
// A config message that evicts this node or rotates its certificate (as
// decided by checkForEvictionNCertRotation) is not ordered here: a goroutine
// abdicates leadership and then re-submits the message so that it reaches the
// new leader, and nothing is returned for ordering.
func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope, pending bool, err error) {
	seq := c.support.Sequence()

	isconfig, err := c.isConfig(msg.Payload)
	if err != nil {
		return nil, false, errors.Errorf("bad message: %s", err)
	}

	if !isconfig {
		// it is a normal message
		if msg.LastValidationSeq < seq {
			c.logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", msg.LastValidationSeq, seq)
			if _, err := c.support.ProcessNormalMsg(msg.Payload); err != nil {
				c.Metrics.ProposalFailures.Add(1)
				return nil, true, errors.Errorf("bad normal message: %s", err)
			}
		}
		batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
		return batches, pending, nil
	}

	// ConfigMsg
	if msg.LastValidationSeq < seq {
		c.logger.Warnf("Config message was validated against %d, although current config seq has advanced (%d)", msg.LastValidationSeq, seq)
		msg.Payload, _, err = c.support.ProcessConfigMsg(msg.Payload)
		if err != nil {
			c.Metrics.ProposalFailures.Add(1)
			return nil, true, errors.Errorf("bad config message: %s", err)
		}
	}

	if c.checkForEvictionNCertRotation(msg.Payload) {
		// Only one abdicate-and-forward goroutine may run at a time.
		if !atomic.CompareAndSwapUint32(&c.leadershipTransferInProgress, 0, 1) {
			c.logger.Warnf("A reconfiguration transaction is already in progress, ignoring a subsequent transaction")
			return nil, false, nil
		}

		go func() {
			defer atomic.StoreUint32(&c.leadershipTransferInProgress, 0)

			abdicated := false
			for attempt := 1; attempt <= AbdicationMaxAttempts; attempt++ {
				err := c.Node.abdicateLeadership()
				if err == nil {
					abdicated = true
					break
				}

				// If there is no leader, abort and do not retry.
				// Return early to prevent re-submission of the transaction
				if err == ErrNoLeader || err == ErrChainHalting {
					c.logger.Warningf("Abdication attempt no.%d failed because there is no leader or chain halting, will not try again, will not submit TX, error: %s", attempt, err)
					return
				}

				// Only these two errors are retryable; anything else is a programming error, so panic.
				retryable := err == ErrNoAvailableLeaderCandidate || err == ErrTimedOutLeaderTransfer
				if !retryable {
					c.logger.Panicf("Programming error, abdicateLeader() returned with an unexpected error: %s", err)
				}

				c.logger.Warningf("Abdication attempt no.%d failed, will try again, error: %s", attempt, err)
			}

			if !abdicated {
				c.logger.Warnf("Abdication failed too many times consecutively (%d), aborting retries", AbdicationMaxAttempts)
				return
			}

			// Abdication succeeded, so we submit the transaction (which forwards to the leader).
			// We do 7 attempts at increasing intervals (1/2/4/8/16/32) or up to EvictionConfigTxForwardingTimeOut.
			timeout := time.After(EvictionConfigTxForwardingTimeOut)
			for retryInterval := EvictionConfigTxForwardingTimeOut / 64; ; retryInterval = 2 * retryInterval {
				err := c.Submit(msg, 0)
				if err == nil {
					c.logger.Warnf("Reconfiguration transaction forwarded successfully")
					return
				}

				c.logger.Warnf("Reconfiguration transaction forwarding failed, will try again in %s, error: %s", retryInterval, err)

				select {
				case <-timeout:
					c.logger.Warnf("Reconfiguration transaction forwarding failed with timeout after %s", EvictionConfigTxForwardingTimeOut)
					return
				case <-time.After(retryInterval):
					// back off and try again
				}
			}
		}()

		return nil, false, nil
	}

	// A config message always goes into a block of its own: whatever is
	// pending in the block cutter is cut into a preceding batch first.
	batches = make([][]*common.Envelope, 0, 2)
	if pendingBatch := c.support.BlockCutter().Cut(); len(pendingBatch) != 0 {
		batches = append(batches, pendingBatch)
	}
	batches = append(batches, []*common.Envelope{msg.Payload})

	return batches, false, nil
}
// propose turns every batch into the next block using bc and queues it on ch
// for proposal to raft, counting each block as in-flight.
//
// The send never blocks: a full (or nil) channel means the in-flight limit was
// not enforced or a follower is proposing, which is treated as a programming
// error. Queuing a config block marks a config as in flight.
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
	for i := range batches {
		block := bc.createNextBlock(batches[i])
		c.logger.Infof("Created block [%d], there are %d blocks in flight", block.Header.Number, c.blockInflight)

		select {
		case ch <- block:
		default:
			c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
		}

		// if it is config block, then we should wait for the commit of the block
		if protoutil.IsConfigBlock(block) {
			c.configInflight = true
		}

		c.blockInflight++
	}
}
// catchUp brings the local ledger up to the block contained in the snapshot.
//
// Nothing is done if the ledger is already at or past that block. If exactly
// one block is missing, the block embedded in the snapshot is committed
// directly; otherwise the missing blocks are pulled from the cluster one by
// one and committed in order.
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
	snapBlock, err := protoutil.UnmarshalBlock(snap.Data)
	if err != nil {
		return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
	}

	target := snapBlock.Header.Number
	switch local := c.lastBlock.Header.Number; {
	case local >= target:
		c.logger.Warnf("Snapshot is at block [%d], local block number is %d, no sync needed", target, local)
		return nil
	case target == local+1:
		c.logger.Infof("The only missing block [%d] is encapsulated in snapshot, committing it to shortcut catchup process", target)
		c.commitBlock(snapBlock)
		c.lastBlock = snapBlock
		return nil
	}

	puller, err := c.createPuller()
	if err != nil {
		return errors.Errorf("failed to create block puller: %s", err)
	}
	defer puller.Close()

	next := c.lastBlock.Header.Number + 1
	c.logger.Infof("Catching up with snapshot taken at block [%d], starting from block [%d]", target, next)

	for ; next <= target; next++ {
		pulled := puller.PullBlock(next)
		if pulled == nil {
			return errors.Errorf("failed to fetch block [%d] from cluster", next)
		}
		c.commitBlock(pulled)
		c.lastBlock = pulled
	}

	c.logger.Infof("Finished syncing with cluster up to and including block [%d]", target)
	return nil
}
// commitBlock writes a block obtained during catch-up to the ledger, reusing
// the consenter metadata already carried by the block.
//
// For a config block that changes the consenter set, the chain's block
// metadata and consenters are replaced and the communication layer is
// reconfigured.
func (c *Chain) commitBlock(block *common.Block) {
	// read consenters metadata to write into the replicated block
	consenterMeta, err := protoutil.GetConsenterMetadataFromBlock(block)
	if err != nil {
		c.logger.Panicf("Failed to obtain metadata: %s", err)
	}

	if !protoutil.IsConfigBlock(block) {
		c.support.WriteBlock(block, consenterMeta.Value)
		return
	}

	c.support.WriteConfigBlock(block, consenterMeta.Value)

	membership := c.detectConfChange(block)
	if membership == nil || !membership.Changed() {
		return
	}

	c.logger.Infof("Config block [%d] changes consenter set, communication should be reconfigured", block.Header.Number)

	c.raftMetadataLock.Lock()
	c.opts.BlockMetadata = membership.NewBlockMetadata
	c.opts.Consenters = membership.NewConsenters
	c.raftMetadataLock.Unlock()

	if err := c.configureComm(); err != nil {
		c.logger.Panicf("Failed to configure communication: %s", err)
	}
}
// detectConfChange inspects a config block and returns the membership changes
// it implies relative to the current consenter set, or nil if no config
// metadata can be derived from the block.
//
// As a side effect it picks up a new, non-zero snapshot interval size from the
// config options. An illegal membership change causes a panic.
func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
	// If config is targeting THIS channel, inspect consenter set and
	// propose raft ConfChange if it adds/removes node.
	metadata := c.newConfigMetadata(block)
	if metadata == nil {
		return nil
	}

	if options := metadata.Options; options != nil {
		newLimit := options.SnapshotIntervalSize
		if newLimit != 0 && newLimit != c.sizeLimit {
			c.logger.Infof("Update snapshot interval size to %d bytes (was %d)",
				newLimit, c.sizeLimit)
			c.sizeLimit = newLimit
		}
	}

	changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, metadata.Consenters)
	if err != nil {
		c.logger.Panicf("illegal configuration change detected: %s", err)
	}

	if changes.Rotated() {
		c.logger.Infof("Config block [%d] rotates TLS certificate of node %d", block.Header.Number, changes.RotatedNode)
	}

	return changes
}
// apply processes a batch of committed raft entries in order.
//
// Non-empty normal entries are unmarshalled into blocks and handed to
// writeBlock, unless their index is not above appliedIndex (already applied).
// ConfChange entries are applied to the raft node and, when they match the
// conf change in progress, clear the in-flight state. After the loop, if the
// accumulated data size has reached sizeLimit, a snapshot request is offered
// on gcC without blocking.
//
// It panics (via the logger) if the first entry leaves a gap after
// appliedIndex, or if a ConfChange of an unsupported type is encountered.
func (c *Chain) apply(ents []raftpb.Entry) {
if len(ents) == 0 {
return
}
// A gap between appliedIndex and the first committed entry is fatal.
if ents[0].Index > c.appliedIndex+1 {
c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
}
// position tracks the index (within ents) of the last non-empty normal
// entry seen; it stays 0 if there is none.
var position int
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
// Empty normal entry: nothing to write. `break` only leaves the switch,
// so the appliedIndex update at the bottom of the loop still runs.
if len(ents[i].Data) == 0 {
break
}
position = i
// The size is accumulated before the re-apply check below, so entries
// that are skipped still count towards the snapshot threshold.
c.accDataSize += uint32(len(ents[i].Data))
// We need to strictly avoid re-applying normal entries,
// otherwise we are writing the same block twice.
if ents[i].Index <= c.appliedIndex {
c.logger.Debugf("Received block with raft index (%d) <= applied index (%d), skip", ents[i].Index, c.appliedIndex)
break
}
block := protoutil.UnmarshalBlockOrPanic(ents[i].Data)
c.writeBlock(block, ents[i].Index)
c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
// NOTE: `continue` moves on to the next entry, so the appliedIndex update
// at the bottom of the loop is skipped for an entry that fails to unmarshal.
if err := cc.Unmarshal(ents[i].Data); err != nil {
c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
continue
}
// Apply the change to the raft node and remember the resulting conf state.
c.confState = *c.Node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Voters)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Voters)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
// This ConfChange was introduced by a previously committed config block,
// we can now unblock submitC to accept envelopes.
var configureComm bool
// Only a ConfChange matching the one in progress (same node and type)
// clears the in-flight state and triggers communication reconfiguration.
if c.confChangeInProgress != nil &&
c.confChangeInProgress.NodeID == cc.NodeID &&
c.confChangeInProgress.Type == cc.Type {
configureComm = true
c.confChangeInProgress = nil
c.configInflight = false
// report the new cluster size
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
}
// This node halts when the applied change removes its own raft ID.
shouldHalt := cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID
// unblock `run` go routine so it can still consume Raft messages
go func() {
if configureComm && !shouldHalt { // no need to configure comm if this node is going to halt
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}
if shouldHalt {
c.logger.Infof("This node is being removed from replica set")
c.halt()
return
}
}()
}
// Advance appliedIndex; it never moves backwards.
if ents[i].Index > c.appliedIndex {
c.appliedIndex = ents[i].Index
}
}
// at position==0, ents[position].Type is ambiguous, it can be either of {raftpb.EntryNormal, raftpb.EntryConfChange}
// take a snapshot only for ents[position].Type == raftpb.EntryNormal
if c.accDataSize >= c.sizeLimit && ents[position].Type == raftpb.EntryNormal && len(ents[position].Data) > 0 {
b := protoutil.UnmarshalBlockOrPanic(ents[position].Data)
// Non-blocking send: the counters are reset only when the request is accepted.
select {
case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
"taking snapshot at block [%d] (index: %d), last snapshotted block number is %d, current nodes: %+v",
c.accDataSize, c.sizeLimit, b.Header.Number, c.appliedIndex, c.lastSnapBlockNum, c.confState.Voters)
c.accDataSize = 0
c.lastSnapBlockNum = b.Header.Number
c.Metrics.SnapshotBlockNumber.Set(float64(b.Header.Number))
default:
// gcC is not ready to receive; accDataSize is left untouched so a later
// call can try again.
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotIntervalSize is too small")
}
}
}
// gc serves snapshot requests arriving on gcC by asking the raft node to
// take a snapshot, and returns once doneC is signalled.
func (c *Chain) gc() {
	for {
		select {
		case <-c.doneC:
			c.logger.Infof("Stop garbage collecting")
			return
		case req := <-c.gcC:
			c.Node.takeSnapshot(req.index, req.state, req.data)
		}
	}
}
// isConfig reports whether the envelope's channel header type is CONFIG.
// It returns the underlying error if the channel header cannot be extracted.
func (c *Chain) isConfig(env *common.Envelope) (bool, error) {
	chdr, err := protoutil.ChannelHeader(env)
	if err != nil {
		c.logger.Errorf("failed to extract channel header from envelope")
		return false, err
	}

	configType := int32(common.HeaderType_CONFIG)
	return chdr.Type == configType, nil
}
// configureComm clears the node's unreachable map and reconfigures the
// communication layer with the current set of remote peers. It returns the
// error from remotePeers, if any; the unreachable map is reset regardless.
func (c *Chain) configureComm() error {
	// Reset unreachable map when communication is reconfigured
	func() {
		c.Node.unreachableLock.Lock()
		defer c.Node.unreachableLock.Unlock()
		c.Node.unreachable = make(map[uint64]struct{})
	}()

	peers, err := c.remotePeers()
	if err != nil {
		return err
	}

	c.configurator.Configure(c.channelID, peers)
	return nil
}
// remotePeers builds the list of remote nodes from the current consenter
// map, excluding this node itself. Each consenter's server and client TLS
// certificates are converted from PEM to DER; the first conversion failure
// aborts the whole call. The consenter map is read under raftMetadataLock.
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
	c.raftMetadataLock.RLock()
	defer c.raftMetadataLock.RUnlock()

	var remotes []cluster.RemoteNode
	for id, member := range c.opts.Consenters {
		// No need to know yourself
		if id == c.raftID {
			continue
		}

		serverDER, err := pemToDER(member.ServerTlsCert, id, "server", c.logger)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		clientDER, err := pemToDER(member.ClientTlsCert, id, "client", c.logger)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		address := cluster.NodeAddress{
			ID:       id,
			Endpoint: fmt.Sprintf("%s:%d", member.Host, member.Port),
		}
		certs := cluster.NodeCerts{
			ServerTLSCert: serverDER,
			ClientTLSCert: clientDER,
		}
		remotes = append(remotes, cluster.RemoteNode{NodeAddress: address, NodeCerts: certs})
	}
	return remotes, nil
}
// pemToDER decodes the first PEM block found in pemBytes and returns its
// raw bytes. If no PEM block can be decoded, it logs the offending input
// together with the node id and certificate type, and returns an error.
func pemToDER(pemBytes []byte, id uint64, certType string, logger *flogging.FabricLogger) ([]byte, error) {
	if block, _ := pem.Decode(pemBytes); block != nil {
		return block.Bytes, nil
	}

	logger.Errorf("Rejecting PEM block of %s TLS cert for node %d, offending PEM is: %s", certType, id, string(pemBytes))
	return nil, errors.Errorf("invalid PEM block")
}
// writeConfigBlock writes a configuration block to the ledger. In addition,
// it extracts updates to the raft replica set from the block and, if there
// are any, updates cluster membership: either by proposing a raft ConfChange
// or, when no ConfChange is needed, by reconfiguring communication directly.
//
// index is the raft index recorded in the block metadata. The function
// panics (via the logger) if the config header cannot be read or if the
// header type is anything other than CONFIG.
func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
hdr, err := ConfigChannelHeader(block)
if err != nil {
c.logger.Panicf("Failed to get config header type from config block: %s", err)
}
// Cleared here; set again below only if this block introduces a ConfChange.
c.configInflight = false
switch common.HeaderType(hdr.Type) {
case common.HeaderType_CONFIG:
configMembership := c.detectConfChange(block)
c.raftMetadataLock.Lock()
// NOTE(review): RaftIndex is set on the current BlockMetadata before that
// object may be replaced by configMembership.NewBlockMetadata just below;
// confirm NewBlockMetadata carries the intended RaftIndex.
c.opts.BlockMetadata.RaftIndex = index
if configMembership != nil {
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
}
c.raftMetadataLock.Unlock()
// NOTE(review): BlockMetadata is marshalled after raftMetadataLock is
// released; confirm no concurrent writer can modify it here.
blockMetadataBytes := protoutil.MarshalOrPanic(c.opts.BlockMetadata)
// write block with metadata
c.support.WriteConfigBlock(block, blockMetadataBytes)
if configMembership == nil {
return
}
// update membership
if configMembership.ConfChange != nil {
// We need to propose conf change in a go routine, because it may be blocked if raft node
// becomes leaderless, and we should not block `run` so it can keep consuming applyC,
// otherwise we have a deadlock.
go func() {
// ProposeConfChange returns error only if node being stopped.
// This proposal is dropped by followers because DisableProposalForwarding is enabled.
if err := c.Node.ProposeConfChange(context.TODO(), *configMembership.ConfChange); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
}()
// Remember the pending change so that apply can match the corresponding
// ConfChange entry and clear the in-flight state.
c.confChangeInProgress = configMembership.ConfChange
switch configMembership.ConfChange.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", configMembership.ConfChange.NodeID)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", configMembership.ConfChange.NodeID)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
c.configInflight = true
} else {
// No ConfChange to propose: reconfigure communication right away.
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}
case common.HeaderType_ORDERER_TRANSACTION:
c.logger.Panicf("Programming error: unsupported legacy system channel config type: %s", common.HeaderType(hdr.Type))
default:
c.logger.Panicf("Programming error: unexpected config type: %s", common.HeaderType(hdr.Type))
}
}
// getInFlightConfChange returns the ConfChange that is in flight, if any.
// A non-nil confChangeInProgress takes precedence. Otherwise, when the last
// block is a config block other than block 0, the ConfChange is derived from
// the difference between the block metadata and the current raft conf state
// (and is nil when the two are already the same size).
func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
	switch {
	case c.confChangeInProgress != nil:
		return c.confChangeInProgress
	case c.lastBlock.Header.Number == 0:
		// nothing to failover just started the chain
		return nil
	case !protoutil.IsConfigBlock(c.lastBlock):
		return nil
	}

	// extracting current Raft configuration state
	current := c.Node.ApplyConfChange(raftpb.ConfChange{})

	// Raft configuration change could only add one node or
	// remove one node at a time, if raft conf state size is
	// equal to membership stored in block metadata field,
	// that means everything is in sync and no need to propose
	// config update.
	if len(current.Voters) != len(c.opts.BlockMetadata.ConsenterIds) {
		return ConfChange(c.opts.BlockMetadata, current)
	}
	return nil
}
// newConfigMetadata extracts the consensus config metadata from the given
// configuration block, panicking (via the logger) if it cannot be read.
func (c *Chain) newConfigMetadata(block *common.Block) *etcdraft.ConfigMetadata {
	configMeta, err := ConsensusMetadataFromConfigBlock(block)
	if err == nil {
		return configMeta
	}

	c.logger.Panicf("error reading consensus metadata: %s", err)
	return configMeta
}
// ValidateConsensusMetadata determines the validity of a
// ConsensusMetadata update during config updates on the channel.
//
// A nil new or old orderer config, or nil old metadata, is a programming
// error and panics via the logger. Nil new metadata means nothing was
// updated and is accepted. Otherwise the new metadata must unmarshal and
// pass VerifyConfigMetadata. When newChannel is true, the only further
// check is that every new consenter already exists among the old
// consenters. When it is false, the membership change must be legal, added
// consenters must carry valid TLS certificates, and the change must not
// cause an unacceptable quorum loss given the currently active nodes.
func (c *Chain) ValidateConsensusMetadata(oldOrdererConfig, newOrdererConfig channelconfig.Orderer, newChannel bool) error {
	switch {
	case newOrdererConfig == nil:
		c.logger.Panic("Programming Error: ValidateConsensusMetadata called with nil new channel config")
		return nil
	case newOrdererConfig.ConsensusMetadata() == nil:
		// metadata was not updated
		return nil
	case oldOrdererConfig == nil:
		c.logger.Panic("Programming Error: ValidateConsensusMetadata called with nil old channel config")
		return nil
	case oldOrdererConfig.ConsensusMetadata() == nil:
		c.logger.Panic("Programming Error: ValidateConsensusMetadata called with nil old metadata")
		return nil
	}

	previous := &etcdraft.ConfigMetadata{}
	if err := proto.Unmarshal(oldOrdererConfig.ConsensusMetadata(), previous); err != nil {
		c.logger.Panicf("Programming Error: Failed to unmarshal old etcdraft consensus metadata: %v", err)
	}

	proposed := &etcdraft.ConfigMetadata{}
	if err := proto.Unmarshal(newOrdererConfig.ConsensusMetadata(), proposed); err != nil {
		return errors.Wrap(err, "failed to unmarshal new etcdraft metadata configuration")
	}

	verifyOpts, err := createX509VerifyOptions(newOrdererConfig)
	if err != nil {
		return errors.Wrap(err, "failed to create x509 verify options from old and new orderer config")
	}
	if err = VerifyConfigMetadata(proposed, verifyOpts); err != nil {
		return errors.Wrap(err, "invalid new config metadata")
	}

	if newChannel {
		// check if the consenters are a subset of the existing consenters (system channel consenters)
		known := ConsentersToMap(previous.Consenters)
		for _, consenter := range proposed.Consenters {
			if known.Exists(consenter) {
				continue
			}
			return errors.New("new channel has consenter that is not part of system consenter set")
		}
		return nil
	}

	// create the dummy parameters for ComputeMembershipChanges
	c.raftMetadataLock.RLock()
	baseMetadata := proto.Clone(c.opts.BlockMetadata).(*etcdraft.BlockMetadata)
	c.raftMetadataLock.RUnlock()
	baseConsenters := CreateConsentersMap(baseMetadata, previous)

	changes, err := ComputeMembershipChanges(baseMetadata, baseConsenters, proposed.Consenters)
	if err != nil {
		return err
	}

	// new config metadata was verified above. Additionally need to check new consenters for certificates expiration
	for _, added := range changes.AddedNodes {
		if err := validateConsenterTLSCerts(added, verifyOpts, false); err != nil {
			return errors.Wrapf(err, "consenter %s:%d has invalid certificates", added.Host, added.Port)
		}
	}

	active := c.ActiveNodes.Load().([]uint64)
	if !changes.UnacceptableQuorumLoss(active) {
		return nil
	}
	return errors.Errorf("%d out of %d nodes are alive, configuration will result in quorum loss", len(active), len(baseConsenters))
}
// StatusReport returns the chain's ConsensusRelation and Status, read
// together while holding statusReportMutex.
func (c *Chain) StatusReport() (types.ConsensusRelation, types.Status) {
	c.statusReportMutex.Lock()
	relation, status := c.consensusRelation, c.status
	c.statusReportMutex.Unlock()

	return relation, status
}
// suspectEviction reports whether the chain passes the isRunning check
// and currently knows of no leader (lastKnownLeader is zero).
func (c *Chain) suspectEviction() bool {
	if err := c.isRunning(); err != nil {
		return false
	}

	leader := atomic.LoadUint64(&c.lastKnownLeader)
	return leader == 0
}
// newEvictionSuspector builds an evictionSuspector wired to this chain's
// logger, options, support functions, puller factory, catch-up trigger and
// halt routine. Channel membership is checked through a ConsenterCertificate
// built from the chain's own certificate and crypto provider.
func (c *Chain) newEvictionSuspector() *evictionSuspector {
	membershipChecker := &ConsenterCertificate{
		ConsenterCertificate: c.opts.Cert,
		CryptoProvider:       c.CryptoProvider,
		Logger:               c.logger,
	}

	suspector := &evictionSuspector{
		logger:                     c.logger,
		evictionSuspicionThreshold: c.opts.EvictionSuspicion,
		amIInChannel:               membershipChecker.IsConsenterOfChannel,
		height:                     c.support.Height,
		createPuller:               c.createPuller,
		writeBlock:                 c.support.Append,
		triggerCatchUp:             c.triggerCatchup,
		halt: func() {
			c.halt()
		},
	}
	return suspector
}
// triggerCatchup hands the snapshot to snapC, blocking until it is
// received or until doneC is signalled, whichever happens first.
func (c *Chain) triggerCatchup(sn *raftpb.Snapshot) {
	select {
	case <-c.doneC:
		// doneC fired first; the snapshot is dropped.
	case c.snapC <- sn:
		// Snapshot delivered.
	}
}
// checkForEvictionNCertRotation checks whether the config update carried by
// env either rotates this node's certificate or evicts this node from the
// consenter set. It returns true in either case and false otherwise.
//
// Any failure to decode the envelope, read the config update or metadata,
// or compute the membership change is logged as a warning and reported as
// false.
func (c *Chain) checkForEvictionNCertRotation(env *common.Envelope) bool {
	payload, err := protoutil.UnmarshalPayload(env.Payload)
	if err != nil {
		c.logger.Warnf("failed to extract payload from config envelope: %s", err)
		return false
	}

	configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
	if err != nil {
		c.logger.Warnf("could not read config update: %s", err)
		return false
	}

	configMeta, err := MetadataFromConfigUpdate(configUpdate)
	if err != nil {
		c.logger.Warnf("could not read config metadata: %s", err)
		return false
	}
	// Handled separately from the error case: formatting a nil error with %s
	// would log "%!s(<nil>)" instead of a meaningful reason.
	if configMeta == nil {
		c.logger.Warnf("could not read config metadata: config update carries no consensus metadata")
		return false
	}

	membershipUpdates, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, configMeta.Consenters)
	if err != nil {
		c.logger.Warnf("illegal configuration change detected: %s", err)
		return false
	}

	if membershipUpdates.RotatedNode == c.raftID {
		c.logger.Infof("Detected certificate rotation of our node")
		return true
	}

	// This node's raft ID is absent from the new consenter set: it is being evicted.
	if _, exists := membershipUpdates.NewConsenters[c.raftID]; !exists {
		c.logger.Infof("Detected eviction of ourselves from the configuration")
		return true
	}

	c.logger.Debugf("Node %d is still part of the consenters set", c.raftID)
	return false
}