go_study/fabric-main/orderer/consensus/etcdraft/node.go

360 lines
9.5 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"context"
"crypto/sha256"
"sync"
"sync/atomic"
"time"
"code.cloudfoundry.org/clock"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
)
var (
ErrChainHalting = errors.New("chain halting is in progress")
ErrNoAvailableLeaderCandidate = errors.New("leadership transfer failed to identify transferee")
ErrTimedOutLeaderTransfer = errors.New("leadership transfer timed out")
ErrNoLeader = errors.New("no leader")
)
type node struct {
chainID string
logger *flogging.FabricLogger
metrics *Metrics
unreachableLock sync.RWMutex
unreachable map[uint64]struct{}
tracker *Tracker
storage *RaftStorage
config *raft.Config
confState atomic.Value // stores raft ConfState
rpc RPC
chain *Chain
tickInterval time.Duration
clock clock.Clock
metadata *etcdraft.BlockMetadata
leaderChangeSubscription atomic.Value
raft.Node
}
func (n *node) start(fresh, join bool) {
raftPeers := RaftPeers(n.metadata.ConsenterIds)
n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))
var campaign bool
if fresh {
if join {
n.logger.Info("Starting raft node to join an existing channel")
n.Node = raft.RestartNode(n.config)
} else {
n.logger.Info("Starting raft node as part of a new channel")
// determine the node to start campaign by selecting the node with ID equals to:
// hash(channelID) % cluster_size + 1
sha := sha256.Sum256([]byte(n.chainID))
number, _ := proto.DecodeVarint(sha[24:])
if n.config.ID == number%uint64(len(raftPeers))+1 {
campaign = true
}
n.Node = raft.StartNode(n.config, raftPeers)
}
} else {
n.logger.Info("Restarting raft node")
n.Node = raft.RestartNode(n.config)
}
go n.run(campaign)
}
func (n *node) run(campaign bool) {
electionTimeout := n.tickInterval.Seconds() * float64(n.config.ElectionTick)
halfElectionTimeout := electionTimeout / 2
raftTicker := n.clock.NewTicker(n.tickInterval)
if s := n.storage.Snapshot(); !raft.IsEmptySnap(s) {
n.chain.snapC <- &s
}
elected := make(chan struct{})
if campaign {
n.logger.Infof("This node is picked to start campaign")
go func() {
// Attempt campaign every two HeartbeatTimeout elapses, until leader is present - either this
// node successfully claims leadership, or another leader already existed when this node starts.
// We could do this more lazily and exit proactive campaign once transitioned to Candidate state
// (not PreCandidate because other nodes might not have started yet, in which case PreVote
// messages are dropped at recipients). But there is no obvious reason (for now) to be lazy.
//
// 2*HeartbeatTick is used to avoid excessive campaign when network latency is significant and
// Raft term keeps advancing in this extreme case.
campaignTicker := n.clock.NewTicker(n.tickInterval * time.Duration(n.config.HeartbeatTick) * 2)
defer campaignTicker.Stop()
for {
select {
case <-campaignTicker.C():
n.Campaign(context.TODO())
case <-elected:
return
case <-n.chain.doneC:
return
}
}
}()
}
for {
select {
case <-raftTicker.C():
// grab raft Status before ticking it, so `RecentActive` attributes
// are not reset yet.
status := n.Status()
n.Tick()
n.tracker.Check(&status)
case rd := <-n.Ready():
startStoring := n.clock.Now()
if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
}
duration := n.clock.Since(startStoring).Seconds()
n.metrics.DataPersistDuration.Observe(float64(duration))
if duration > halfElectionTimeout {
n.logger.Warningf("WAL sync took %v seconds and the network is configured to start elections after %v seconds. Your disk is too slow and may cause loss of quorum and trigger leadership election.", duration, electionTimeout)
}
if !raft.IsEmptySnap(rd.Snapshot) {
n.chain.snapC <- &rd.Snapshot
}
// skip empty apply
if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
n.maybeSyncWAL(rd.CommittedEntries)
n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
}
if campaign && rd.SoftState != nil {
leader := atomic.LoadUint64(&rd.SoftState.Lead) // etcdraft requires atomic access to this var
if leader != raft.None {
n.logger.Infof("Leader %d is present, quit campaign", leader)
campaign = false
close(elected)
}
}
n.Advance()
// TODO(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
n.send(rd.Messages)
case <-n.chain.haltC:
raftTicker.Stop()
n.Stop()
n.storage.Close()
n.logger.Infof("Raft node stopped")
close(n.chain.doneC) // close after all the artifacts are closed
return
}
}
}
func (n *node) send(msgs []raftpb.Message) {
n.unreachableLock.RLock()
defer n.unreachableLock.RUnlock()
for _, msg := range msgs {
if msg.To == 0 {
continue
}
status := raft.SnapshotFinish
// Replace node list in snapshot with CURRENT node list in cluster.
if msg.Type == raftpb.MsgSnap {
state := n.confState.Load()
if state != nil {
msg.Snapshot.Metadata.ConfState = *state.(*raftpb.ConfState)
}
}
msgBytes := protoutil.MarshalOrPanic(&msg)
err := n.rpc.SendConsensus(msg.To, &orderer.ConsensusRequest{Channel: n.chainID, Payload: msgBytes})
if err != nil {
n.ReportUnreachable(msg.To)
n.logSendFailure(msg.To, err)
status = raft.SnapshotFailure
} else if _, ok := n.unreachable[msg.To]; ok {
n.logger.Infof("Successfully sent StepRequest to %d after failed attempt(s)", msg.To)
delete(n.unreachable, msg.To)
}
if msg.Type == raftpb.MsgSnap {
n.ReportSnapshot(msg.To, status)
}
}
}
func (n *node) maybeSyncWAL(entries []raftpb.Entry) {
allNormal := true
for _, entry := range entries {
if entry.Type == raftpb.EntryNormal {
continue
}
allNormal = false
}
if allNormal {
return
}
if err := n.storage.Sync(); err != nil {
n.logger.Errorf("Failed to sync raft log, error: %s", err)
}
}
// abdicateLeadership picks a node that is recently active, and attempts to transfer leadership to it.
// Blocks until leadership transfer happens or when a timeout expires.
// Returns error upon failure.
func (n *node) abdicateLeadership() error {
start := time.Now()
defer func() {
n.logger.Infof("abdicateLeader took %v", time.Since(start))
}()
status := n.Status()
if status.Lead == raft.None {
n.logger.Warn("No leader, cannot transfer leadership")
return ErrNoLeader
}
if status.Lead != n.config.ID {
n.logger.Warn("Leader has changed since asked to transfer leadership")
return nil
}
// register to leader changes
notifyC, unsubscribe := n.subscribeToLeaderChange()
defer unsubscribe()
var transferee uint64
for id, pr := range status.Progress {
if id == status.ID {
continue // skip self
}
if pr.RecentActive && !pr.IsPaused() {
transferee = id
break
}
n.logger.Debugf("Node %d is not qualified as transferee because it's either paused or not active", id)
}
if transferee == raft.None {
n.logger.Errorf("No follower is qualified as transferee, abort leader transfer")
return ErrNoAvailableLeaderCandidate
}
n.logger.Infof("Transferring leadership to %d", transferee)
timeToWait := time.Duration(n.config.ElectionTick) * n.tickInterval
n.logger.Infof("Will wait %v time to abdicate", timeToWait)
ctx, cancel := context.WithTimeout(context.TODO(), timeToWait)
defer cancel()
n.TransferLeadership(ctx, status.ID, transferee)
timer := n.clock.NewTimer(timeToWait)
defer timer.Stop()
for {
select {
case <-timer.C():
n.logger.Warn("Leader transfer timed out")
return ErrTimedOutLeaderTransfer
case l := <-notifyC:
n.logger.Infof("Leader has been transferred from %d to %d", n.config.ID, l)
return nil
case <-n.chain.doneC:
n.logger.Infof("Returning early because chain is halting")
return ErrChainHalting
}
}
}
func (n *node) subscribeToLeaderChange() (chan uint64, func()) {
notifyC := make(chan uint64, 1)
subscriptionActive := uint32(1)
unsubscribe := func() {
atomic.StoreUint32(&subscriptionActive, 0)
}
subscription := func(leader uint64) {
if atomic.LoadUint32(&subscriptionActive) == 0 {
return
}
if leader != n.config.ID {
select {
case notifyC <- leader:
default:
// In case notifyC is full
}
}
}
n.leaderChangeSubscription.Store(subscription)
return notifyC, unsubscribe
}
func (n *node) logSendFailure(dest uint64, err error) {
if _, ok := n.unreachable[dest]; ok {
n.logger.Debugf("Failed to send StepRequest to %d, because: %s", dest, err)
return
}
n.logger.Errorf("Failed to send StepRequest to %d, because: %s", dest, err)
n.unreachable[dest] = struct{}{}
}
func (n *node) takeSnapshot(index uint64, cs raftpb.ConfState, data []byte) {
if err := n.storage.TakeSnapshot(index, cs, data); err != nil {
n.logger.Errorf("Failed to create snapshot at index %d: %s", index, err)
}
}
func (n *node) lastIndex() uint64 {
i, _ := n.storage.ram.LastIndex()
return i
}
func (n *node) ApplyConfChange(cc raftpb.ConfChange) *raftpb.ConfState {
state := n.Node.ApplyConfChange(cc)
n.confState.Store(state)
return state
}