179 lines
4.1 KiB
Go
179 lines
4.1 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package election
|
|
|
|
import (
|
|
"bytes"
|
|
"sync"
|
|
"time"
|
|
|
|
proto "github.com/hyperledger/fabric-protos-go/gossip"
|
|
"github.com/hyperledger/fabric/gossip/common"
|
|
"github.com/hyperledger/fabric/gossip/discovery"
|
|
"github.com/hyperledger/fabric/gossip/metrics"
|
|
"github.com/hyperledger/fabric/gossip/protoext"
|
|
"github.com/hyperledger/fabric/gossip/util"
|
|
)
|
|
|
|
type msgImpl struct {
|
|
msg *proto.GossipMessage
|
|
}
|
|
|
|
func (mi *msgImpl) SenderID() peerID {
|
|
return mi.msg.GetLeadershipMsg().PkiId
|
|
}
|
|
|
|
func (mi *msgImpl) IsProposal() bool {
|
|
return !mi.IsDeclaration()
|
|
}
|
|
|
|
func (mi *msgImpl) IsDeclaration() bool {
|
|
return mi.msg.GetLeadershipMsg().IsDeclaration
|
|
}
|
|
|
|
type peerImpl struct {
|
|
member discovery.NetworkMember
|
|
}
|
|
|
|
func (pi *peerImpl) ID() peerID {
|
|
return peerID(pi.member.PKIid)
|
|
}
|
|
|
|
type gossip interface {
|
|
// PeersOfChannel returns the NetworkMembers considered alive in a channel
|
|
PeersOfChannel(channel common.ChannelID) []discovery.NetworkMember
|
|
|
|
// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
|
|
// If passThrough is false, the messages are processed by the gossip layer beforehand.
|
|
// If passThrough is true, the gossip layer doesn't intervene and the messages
|
|
// can be used to send a reply back to the sender
|
|
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan protoext.ReceivedMessage)
|
|
|
|
// Gossip sends a message to other peers to the network
|
|
Gossip(msg *proto.GossipMessage)
|
|
|
|
// IsInMyOrg checks whether a network member is in this peer's org
|
|
IsInMyOrg(member discovery.NetworkMember) bool
|
|
}
|
|
|
|
type adapterImpl struct {
|
|
gossip gossip
|
|
selfPKIid common.PKIidType
|
|
|
|
incTime uint64
|
|
seqNum uint64
|
|
|
|
channel common.ChannelID
|
|
|
|
logger util.Logger
|
|
|
|
doneCh chan struct{}
|
|
stopOnce *sync.Once
|
|
metrics *metrics.ElectionMetrics
|
|
}
|
|
|
|
// NewAdapter creates new leader election adapter
|
|
func NewAdapter(gossip gossip, pkiid common.PKIidType, channel common.ChannelID,
|
|
metrics *metrics.ElectionMetrics) LeaderElectionAdapter {
|
|
return &adapterImpl{
|
|
gossip: gossip,
|
|
selfPKIid: pkiid,
|
|
|
|
incTime: uint64(time.Now().UnixNano()),
|
|
seqNum: uint64(0),
|
|
|
|
channel: channel,
|
|
|
|
logger: util.GetLogger(util.ElectionLogger, ""),
|
|
|
|
doneCh: make(chan struct{}),
|
|
stopOnce: &sync.Once{},
|
|
metrics: metrics,
|
|
}
|
|
}
|
|
|
|
func (ai *adapterImpl) Gossip(msg Msg) {
|
|
ai.gossip.Gossip(msg.(*msgImpl).msg)
|
|
}
|
|
|
|
func (ai *adapterImpl) Accept() <-chan Msg {
|
|
adapterCh, _ := ai.gossip.Accept(func(message interface{}) bool {
|
|
// Get only leadership org and channel messages
|
|
return message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG &&
|
|
protoext.IsLeadershipMsg(message.(*proto.GossipMessage)) &&
|
|
bytes.Equal(message.(*proto.GossipMessage).Channel, ai.channel)
|
|
}, false)
|
|
|
|
msgCh := make(chan Msg)
|
|
|
|
go func(inCh <-chan *proto.GossipMessage, outCh chan Msg, stopCh chan struct{}) {
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case gossipMsg, ok := <-inCh:
|
|
if ok {
|
|
outCh <- &msgImpl{gossipMsg}
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}(adapterCh, msgCh, ai.doneCh)
|
|
return msgCh
|
|
}
|
|
|
|
func (ai *adapterImpl) CreateMessage(isDeclaration bool) Msg {
|
|
ai.seqNum++
|
|
seqNum := ai.seqNum
|
|
|
|
leadershipMsg := &proto.LeadershipMessage{
|
|
PkiId: ai.selfPKIid,
|
|
IsDeclaration: isDeclaration,
|
|
Timestamp: &proto.PeerTime{
|
|
IncNum: ai.incTime,
|
|
SeqNum: seqNum,
|
|
},
|
|
}
|
|
|
|
msg := &proto.GossipMessage{
|
|
Nonce: 0,
|
|
Tag: proto.GossipMessage_CHAN_AND_ORG,
|
|
Content: &proto.GossipMessage_LeadershipMsg{LeadershipMsg: leadershipMsg},
|
|
Channel: ai.channel,
|
|
}
|
|
return &msgImpl{msg}
|
|
}
|
|
|
|
func (ai *adapterImpl) Peers() []Peer {
|
|
peers := ai.gossip.PeersOfChannel(ai.channel)
|
|
|
|
var res []Peer
|
|
for _, peer := range peers {
|
|
if ai.gossip.IsInMyOrg(peer) {
|
|
res = append(res, &peerImpl{peer})
|
|
}
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (ai *adapterImpl) ReportMetrics(isLeader bool) {
|
|
var leadershipBit float64
|
|
if isLeader {
|
|
leadershipBit = 1
|
|
}
|
|
ai.metrics.Declaration.With("channel", string(ai.channel)).Set(leadershipBit)
|
|
}
|
|
|
|
func (ai *adapterImpl) Stop() {
|
|
stopFunc := func() {
|
|
close(ai.doneCh)
|
|
}
|
|
ai.stopOnce.Do(stopFunc)
|
|
}
|