go_study/fabric-main/gossip/election/adapter_test.go

356 lines
8.6 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package election
import (
"bytes"
"fmt"
"strings"
"sync"
"testing"
"time"
proto "github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/metrics/mocks"
"github.com/hyperledger/fabric/gossip/protoext"
"github.com/hyperledger/fabric/gossip/util"
"github.com/stretchr/testify/require"
)
func init() {
util.SetupTestLogging()
}
func TestNewAdapter(t *testing.T) {
selfNetworkMember := &discovery.NetworkMember{
Endpoint: "p0",
Metadata: []byte{},
PKIid: []byte{byte(0)},
}
mockGossip := newGossip("peer0", selfNetworkMember, nil)
peersCluster := newClusterOfPeers("0")
peersCluster.addPeer("peer0", mockGossip)
NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"),
metrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
}
func TestAdapterImpl_CreateMessage(t *testing.T) {
selfNetworkMember := &discovery.NetworkMember{
Endpoint: "p0",
Metadata: []byte{},
PKIid: []byte{byte(0)},
}
mockGossip := newGossip("peer0", selfNetworkMember, nil)
adapter := NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"),
metrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
msg := adapter.CreateMessage(true)
if !protoext.IsLeadershipMsg(msg.(*msgImpl).msg) {
t.Error("Newly created message should be LeadershipMsg")
}
if !msg.IsDeclaration() {
t.Error("Newly created msg should be Declaration msg")
}
msg = adapter.CreateMessage(false)
if !protoext.IsLeadershipMsg(msg.(*msgImpl).msg) {
t.Error("Newly created message should be LeadershipMsg")
}
if !msg.IsProposal() || msg.IsDeclaration() {
t.Error("Newly created msg should be Proposal msg")
}
}
func TestAdapterImpl_Peers(t *testing.T) {
peersOrgA := map[string]struct{}{
"Peer0": {},
"Peer1": {},
"Peer2": {},
"Peer3": {},
"Peer4": {},
"Peer5": {},
}
peersOrgB := map[string]struct{}{
"Peer6": {},
"Peer7": {},
"Peer8": {},
}
pki2org := make(map[string]string)
for id := range peersOrgA {
pki2org[id] = "A"
}
for id := range peersOrgB {
pki2org[id] = "B"
}
_, adapters := createCluster(pki2org, 0, 1, 2, 3, 4, 5, 6, 7, 8)
for id, adapter := range adapters {
var myPeersOrg map[string]struct{}
if pki2org[id] == "A" {
myPeersOrg = peersOrgA
} else {
myPeersOrg = peersOrgB
}
peers := adapter.Peers()
if len(peers) != len(myPeersOrg) {
t.Errorf("Should return %d peers, not %d", len(myPeersOrg), len(peers))
}
for _, peer := range peers {
if _, exist := myPeersOrg[peer.(*peerImpl).member.Endpoint]; !exist {
t.Errorf("Peer %s PKID not found", peer.(*peerImpl).member.Endpoint)
}
}
}
}
func TestAdapterImpl_Stop(t *testing.T) {
_, adapters := createCluster(nil, 0, 1, 2, 3, 4, 5)
for _, adapter := range adapters {
adapter.Accept()
}
for _, adapter := range adapters {
adapter.Stop()
}
}
func TestAdapterImpl_Gossip(t *testing.T) {
_, adapters := createCluster(nil, 0, 1, 2)
channels := make(map[string]<-chan Msg)
for peerID, adapter := range adapters {
channels[peerID] = adapter.Accept()
}
sender := adapters[fmt.Sprintf("Peer%d", 0)]
sender.Gossip(sender.CreateMessage(true))
totalMsg := 0
timer := time.After(time.Duration(1) * time.Second)
for {
select {
case <-timer:
if totalMsg != 2 {
t.Error("Not all messages accepted")
t.FailNow()
} else {
return
}
case msg := <-channels[fmt.Sprintf("Peer%d", 1)]:
if !msg.IsDeclaration() {
t.Error("Msg should be declaration")
} else if !bytes.Equal(msg.SenderID(), sender.selfPKIid) {
t.Error("Msg Sender is wrong")
} else {
totalMsg++
}
case msg := <-channels[fmt.Sprintf("Peer%d", 2)]:
if !msg.IsDeclaration() {
t.Error("Msg should be declaration")
} else if !bytes.Equal(msg.SenderID(), sender.selfPKIid) {
t.Error("Msg Sender is wrong")
} else {
totalMsg++
}
}
}
}
type mockAcceptor struct {
ch chan *proto.GossipMessage
acceptor common.MessageAcceptor
}
type peerMockGossip struct {
cluster *clusterOfPeers
member *discovery.NetworkMember
acceptors []*mockAcceptor
acceptorLock *sync.RWMutex
clusterLock *sync.RWMutex
id string
pki2org map[string]string
}
func (g *peerMockGossip) PeersOfChannel(channel common.ChannelID) []discovery.NetworkMember {
g.clusterLock.RLock()
if g.cluster == nil {
g.clusterLock.RUnlock()
return []discovery.NetworkMember{*g.member}
}
peerLock := g.cluster.peersLock
g.clusterLock.RUnlock()
peerLock.RLock()
res := make([]discovery.NetworkMember, 0)
g.clusterLock.RLock()
for _, val := range g.cluster.peersGossip {
res = append(res, *val.member)
}
g.clusterLock.RUnlock()
peerLock.RUnlock()
return res
}
func (g *peerMockGossip) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan protoext.ReceivedMessage) {
ch := make(chan *proto.GossipMessage, 100)
g.acceptorLock.Lock()
g.acceptors = append(g.acceptors, &mockAcceptor{
ch: ch,
acceptor: acceptor,
})
g.acceptorLock.Unlock()
return ch, nil
}
func (g *peerMockGossip) Gossip(msg *proto.GossipMessage) {
g.clusterLock.RLock()
if g.cluster == nil {
g.clusterLock.RUnlock()
return
}
peersLock := g.cluster.peersLock
g.clusterLock.RUnlock()
peersLock.RLock()
g.clusterLock.RLock()
for _, val := range g.cluster.peersGossip {
if strings.Compare(val.id, g.id) != 0 {
val.putToAcceptors(msg)
}
}
g.clusterLock.RUnlock()
peersLock.RUnlock()
}
func (g *peerMockGossip) putToAcceptors(msg *proto.GossipMessage) {
g.acceptorLock.RLock()
for _, acceptor := range g.acceptors {
if acceptor.acceptor(msg) {
if len(acceptor.ch) < 10 {
acceptor.ch <- msg
}
}
}
g.acceptorLock.RUnlock()
}
func (g *peerMockGossip) IsInMyOrg(member discovery.NetworkMember) bool {
var myOrg, memberOrg string
var exists bool
if myOrg, exists = g.pki2org[g.id]; !exists {
return false
}
if memberOrg, exists = g.pki2org[member.Endpoint]; !exists {
return false
}
return myOrg == memberOrg
}
func newGossip(peerID string, member *discovery.NetworkMember, pki2org map[string]string) *peerMockGossip {
return &peerMockGossip{
id: peerID,
member: member,
acceptorLock: &sync.RWMutex{},
clusterLock: &sync.RWMutex{},
acceptors: make([]*mockAcceptor, 0),
pki2org: pki2org,
}
}
type clusterOfPeers struct {
peersGossip map[string]*peerMockGossip
peersLock *sync.RWMutex
id string
}
func (cop *clusterOfPeers) addPeer(peerID string, gossip *peerMockGossip) {
cop.peersLock.Lock()
cop.peersGossip[peerID] = gossip
gossip.clusterLock.Lock()
gossip.cluster = cop
gossip.clusterLock.Unlock()
cop.peersLock.Unlock()
}
func newClusterOfPeers(id string) *clusterOfPeers {
return &clusterOfPeers{
id: id,
peersGossip: make(map[string]*peerMockGossip),
peersLock: &sync.RWMutex{},
}
}
func createCluster(pki2org map[string]string, peers ...int) (*clusterOfPeers, map[string]*adapterImpl) {
adapters := make(map[string]*adapterImpl)
cluster := newClusterOfPeers("0")
for _, peer := range peers {
peerEndpoint := fmt.Sprintf("Peer%d", peer)
peerPKID := []byte{byte(peer)}
peerMember := &discovery.NetworkMember{
Metadata: []byte{},
Endpoint: peerEndpoint,
PKIid: peerPKID,
}
mockGossip := newGossip(peerEndpoint, peerMember, pki2org)
adapter := NewAdapter(mockGossip, peerMember.PKIid, []byte("channel0"),
metrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
adapters[peerEndpoint] = adapter.(*adapterImpl)
cluster.addPeer(peerEndpoint, mockGossip)
}
return cluster, adapters
}
func TestReportMetrics(t *testing.T) {
testMetricProvider := mocks.TestUtilConstructMetricProvider()
electionMetrics := metrics.NewGossipMetrics(testMetricProvider.FakeProvider).ElectionMetrics
mockGossip := newGossip("", &discovery.NetworkMember{}, nil)
adapter := NewAdapter(mockGossip, nil, []byte("channel0"), electionMetrics)
adapter.ReportMetrics(true)
require.Equal(t,
[]string{"channel", "channel0"},
testMetricProvider.FakeDeclarationGauge.WithArgsForCall(0),
)
require.EqualValues(t,
1,
testMetricProvider.FakeDeclarationGauge.SetArgsForCall(0),
)
adapter.ReportMetrics(false)
require.Equal(t,
[]string{"channel", "channel0"},
testMetricProvider.FakeDeclarationGauge.WithArgsForCall(1),
)
require.EqualValues(t,
0,
testMetricProvider.FakeDeclarationGauge.SetArgsForCall(1),
)
}