/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package etcdraft

import (
	"context"
	"crypto/sha256"
	"sync"
	"sync/atomic"
	"time"

	"code.cloudfoundry.org/clock"
	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric-protos-go/orderer"
	"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
	"github.com/hyperledger/fabric/common/flogging"
	"github.com/hyperledger/fabric/protoutil"
	"github.com/pkg/errors"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
)

var (
	ErrChainHalting               = errors.New("chain halting is in progress")
	ErrNoAvailableLeaderCandidate = errors.New("leadership transfer failed to identify transferee")
	ErrTimedOutLeaderTransfer     = errors.New("leadership transfer timed out")
	ErrNoLeader                   = errors.New("no leader")
)

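// node wraps an etcd/raft raft.Node for a single channel together with the
// storage, cluster RPC, clock and tracker used to drive it.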
type node struct {
	chainID string
	logger  *flogging.FabricLogger
	metrics *Metrics

	unreachableLock sync.RWMutex
	unreachable     map[uint64]struct{}

	tracker *Tracker

	storage   *RaftStorage
	config    *raft.Config
	confState atomic.Value // stores raft ConfState

	rpc RPC

	chain *Chain

	tickInterval time.Duration
	clock        clock.Clock

	metadata *etcdraft.BlockMetadata

	leaderChangeSubscription atomic.Value

	raft.Node
}

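// start boots the underlying etcd/raft node and launches its event loop.
// A fresh node creating a new channel starts raft from the configured peer set
// and may campaign; a fresh node joining an existing channel, or any restarted
// node, recovers its raft state from storage instead.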
func (n *node) start(fresh, join bool) {
	raftPeers := RaftPeers(n.metadata.ConsenterIds)
	n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))

	var campaign bool
	if fresh {
		if join {
			n.logger.Info("Starting raft node to join an existing channel")
			n.Node = raft.RestartNode(n.config)
		} else {
			n.logger.Info("Starting raft node as part of a new channel")

			// Determine the node that starts the campaign by selecting the node
			// with ID equal to: hash(channelID) % cluster_size + 1
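			// Illustrative example (not actual values): with 4 consenters and a
			// hash suffix that decodes to 10, node 10%4+1 = 3 starts the campaign.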
			sha := sha256.Sum256([]byte(n.chainID))
			number, _ := proto.DecodeVarint(sha[24:])
			if n.config.ID == number%uint64(len(raftPeers))+1 {
				campaign = true
			}
			n.Node = raft.StartNode(n.config, raftPeers)
		}
	} else {
		n.logger.Info("Restarting raft node")
		n.Node = raft.RestartNode(n.config)
	}

	go n.run(campaign)
}

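// run is the node's main event loop: it ticks the raft state machine, persists
// and forwards Ready batches, relays outbound messages, and, if asked to,
// campaigns for leadership until a leader emerges.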
func (n *node) run(campaign bool) {
	electionTimeout := n.tickInterval.Seconds() * float64(n.config.ElectionTick)
	halfElectionTimeout := electionTimeout / 2

	raftTicker := n.clock.NewTicker(n.tickInterval)

	if s := n.storage.Snapshot(); !raft.IsEmptySnap(s) {
		n.chain.snapC <- &s
	}

	elected := make(chan struct{})
	if campaign {
		n.logger.Infof("This node is picked to start campaign")
		go func() {
			// Attempt to campaign every two HeartbeatTimeout elapses, until a leader
			// is present - either this node successfully claims leadership, or another
			// leader already existed when this node started.
			// We could do this more lazily and exit the proactive campaign once
			// transitioned to Candidate state (not PreCandidate, because other nodes
			// might not have started yet, in which case PreVote messages are dropped
			// at recipients). But there is no obvious reason (for now) to be lazy.
			//
			// 2*HeartbeatTick is used to avoid excessive campaigning when network
			// latency is significant and the Raft term keeps advancing in this
			// extreme case.
			campaignTicker := n.clock.NewTicker(n.tickInterval * time.Duration(n.config.HeartbeatTick) * 2)
			defer campaignTicker.Stop()

			for {
				select {
				case <-campaignTicker.C():
					n.Campaign(context.TODO())
				case <-elected:
					return
				case <-n.chain.doneC:
					return
				}
			}
		}()
	}

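	// Main loop: drive the raft state machine on every tick and process each
	// Ready batch until the chain is halted.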
	for {
		select {
		case <-raftTicker.C():
			// grab raft Status before ticking it, so `RecentActive` attributes
			// are not reset yet.
			status := n.Status()

			n.Tick()
			n.tracker.Check(&status)

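		// A Ready batch must be fully handled - entries and HardState persisted,
		// snapshot forwarded, committed entries applied - before Advance is called,
		// which lets etcd/raft produce the next batch.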
		case rd := <-n.Ready():
			startStoring := n.clock.Now()
			if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
				n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
			}
			duration := n.clock.Since(startStoring).Seconds()
			n.metrics.DataPersistDuration.Observe(float64(duration))
			if duration > halfElectionTimeout {
				n.logger.Warningf("WAL sync took %v seconds and the network is configured to start elections after %v seconds. Your disk is too slow and may cause loss of quorum and trigger leadership election.", duration, electionTimeout)
			}

			if !raft.IsEmptySnap(rd.Snapshot) {
				n.chain.snapC <- &rd.Snapshot
			}

			// skip empty apply
			if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
				n.maybeSyncWAL(rd.CommittedEntries)
				n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
			}

			if campaign && rd.SoftState != nil {
				leader := atomic.LoadUint64(&rd.SoftState.Lead) // etcdraft requires atomic access to this var
				if leader != raft.None {
					n.logger.Infof("Leader %d is present, quit campaign", leader)
					campaign = false
					close(elected)
				}
			}

			n.Advance()

			// TODO(jay_guo) leader can write to disk in parallel with replicating
			// to the followers and them writing to their disks. Check 10.2.1 in thesis
			n.send(rd.Messages)

		case <-n.chain.haltC:
			raftTicker.Stop()
			n.Stop()
			n.storage.Close()
			n.logger.Infof("Raft node stopped")
			close(n.chain.doneC) // close after all the artifacts are closed
			return
		}
	}
}

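// send marshals outbound raft messages and delivers them to the target
// consenters over the cluster RPC. It tracks unreachable destinations and,
// for snapshot messages, reports delivery status back to etcd/raft.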
func (n *node) send(msgs []raftpb.Message) {
	n.unreachableLock.RLock()
	defer n.unreachableLock.RUnlock()

	for _, msg := range msgs {
		if msg.To == 0 {
			continue
		}

		status := raft.SnapshotFinish

		// Replace the node list in the snapshot with the current node list of the cluster.
		if msg.Type == raftpb.MsgSnap {
			state := n.confState.Load()
			if state != nil {
				msg.Snapshot.Metadata.ConfState = *state.(*raftpb.ConfState)
			}
		}

		msgBytes := protoutil.MarshalOrPanic(&msg)
		err := n.rpc.SendConsensus(msg.To, &orderer.ConsensusRequest{Channel: n.chainID, Payload: msgBytes})
		if err != nil {
			n.ReportUnreachable(msg.To)
			n.logSendFailure(msg.To, err)

			status = raft.SnapshotFailure
		} else if _, ok := n.unreachable[msg.To]; ok {
			n.logger.Infof("Successfully sent StepRequest to %d after failed attempt(s)", msg.To)
			delete(n.unreachable, msg.To)
		}

		if msg.Type == raftpb.MsgSnap {
			n.ReportSnapshot(msg.To, status)
		}
	}
}

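// maybeSyncWAL syncs the raft storage to disk whenever the committed entries
// contain anything other than normal entries (e.g. config changes), so those
// entries are durable before they are applied.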
func (n *node) maybeSyncWAL(entries []raftpb.Entry) {
	allNormal := true
	for _, entry := range entries {
		if entry.Type == raftpb.EntryNormal {
			continue
		}
		allNormal = false
	}

	if allNormal {
		return
	}

	if err := n.storage.Sync(); err != nil {
		n.logger.Errorf("Failed to sync raft log, error: %s", err)
	}
}

// abdicateLeadership picks a node that is recently active and attempts to transfer
// leadership to it. It blocks until the leadership transfer happens or a timeout
// expires, and returns an error upon failure.
func (n *node) abdicateLeadership() error {
	start := time.Now()
	defer func() {
		n.logger.Infof("abdicateLeader took %v", time.Since(start))
	}()

	status := n.Status()

	if status.Lead == raft.None {
		n.logger.Warn("No leader, cannot transfer leadership")
		return ErrNoLeader
	}

	if status.Lead != n.config.ID {
		n.logger.Warn("Leader has changed since asked to transfer leadership")
		return nil
	}

	// Register for leader changes.
	notifyC, unsubscribe := n.subscribeToLeaderChange()
	defer unsubscribe()

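	// Pick the first follower that is recently active and whose replication flow
	// is not paused; such a node is a reasonable candidate to win the election.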
	var transferee uint64
	for id, pr := range status.Progress {
		if id == status.ID {
			continue // skip self
		}

		if pr.RecentActive && !pr.IsPaused() {
			transferee = id
			break
		}

		n.logger.Debugf("Node %d is not qualified as transferee because it's either paused or not active", id)
	}

	if transferee == raft.None {
		n.logger.Errorf("No follower is qualified as transferee, abort leader transfer")
		return ErrNoAvailableLeaderCandidate
	}

n.logger.Infof("Transferring leadership to %d", transferee)
|
|
|
|
timeToWait := time.Duration(n.config.ElectionTick) * n.tickInterval
|
|
n.logger.Infof("Will wait %v time to abdicate", timeToWait)
|
|
ctx, cancel := context.WithTimeout(context.TODO(), timeToWait)
|
|
defer cancel()
|
|
|
|
n.TransferLeadership(ctx, status.ID, transferee)
|
|
|
|
timer := n.clock.NewTimer(timeToWait)
|
|
defer timer.Stop()
|
|
|
|
	for {
		select {
		case <-timer.C():
			n.logger.Warn("Leader transfer timed out")
			return ErrTimedOutLeaderTransfer
		case l := <-notifyC:
			n.logger.Infof("Leader has been transferred from %d to %d", n.config.ID, l)
			return nil
		case <-n.chain.doneC:
			n.logger.Infof("Returning early because chain is halting")
			return ErrChainHalting
		}
	}
}

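// subscribeToLeaderChange registers a callback that forwards leader changes
// (to a node other than this one) on a buffered channel. It returns that channel
// together with an unsubscribe function that deactivates the callback.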
func (n *node) subscribeToLeaderChange() (chan uint64, func()) {
	notifyC := make(chan uint64, 1)
	subscriptionActive := uint32(1)
	unsubscribe := func() {
		atomic.StoreUint32(&subscriptionActive, 0)
	}
	subscription := func(leader uint64) {
		if atomic.LoadUint32(&subscriptionActive) == 0 {
			return
		}
		if leader != n.config.ID {
			select {
			case notifyC <- leader:
			default:
				// In case notifyC is full
			}
		}
	}
	n.leaderChangeSubscription.Store(subscription)
	return notifyC, unsubscribe
}

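// logSendFailure logs a send failure at Error level the first time a destination
// becomes unreachable, and at Debug level on subsequent failures, to avoid
// flooding the log.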
func (n *node) logSendFailure(dest uint64, err error) {
	if _, ok := n.unreachable[dest]; ok {
		n.logger.Debugf("Failed to send StepRequest to %d, because: %s", dest, err)
		return
	}

	n.logger.Errorf("Failed to send StepRequest to %d, because: %s", dest, err)
	n.unreachable[dest] = struct{}{}
}

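// takeSnapshot asks the raft storage to create a snapshot at the given index;
// a failure is logged but is not fatal.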
func (n *node) takeSnapshot(index uint64, cs raftpb.ConfState, data []byte) {
	if err := n.storage.TakeSnapshot(index, cs, data); err != nil {
		n.logger.Errorf("Failed to create snapshot at index %d: %s", index, err)
	}
}

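// lastIndex returns the last entry index held in the in-memory raft storage.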
func (n *node) lastIndex() uint64 {
	i, _ := n.storage.ram.LastIndex()
	return i
}

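// ApplyConfChange applies a configuration change to the underlying raft node and
// caches the resulting ConfState, so that outgoing snapshots carry the cluster's
// current membership (see send).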
func (n *node) ApplyConfChange(cc raftpb.ConfChange) *raftpb.ConfState {
	state := n.Node.ApplyConfChange(cc)
	n.confState.Store(state)
	return state
}