/*
Copyright IBM Corp. 2016 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package discovery
|
|
|
|
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"math/rand"
	"net"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	protoG "github.com/golang/protobuf/proto"
	proto "github.com/hyperledger/fabric-protos-go/gossip"
	"github.com/hyperledger/fabric/common/flogging"
	"github.com/hyperledger/fabric/gossip/common"
	"github.com/hyperledger/fabric/gossip/gossip/msgstore"
	"github.com/hyperledger/fabric/gossip/protoext"
	"github.com/hyperledger/fabric/gossip/util"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)
|
|
|
|
var timeout = time.Second * time.Duration(15)

var (
	aliveTimeInterval = time.Duration(time.Millisecond * 300)
	defaultTestConfig = DiscoveryConfig{
		AliveTimeInterval:            aliveTimeInterval,
		AliveExpirationTimeout:       10 * aliveTimeInterval,
		AliveExpirationCheckInterval: aliveTimeInterval,
		ReconnectInterval:            10 * aliveTimeInterval,
		MaxConnectionAttempts:        DefMaxConnectionAttempts,
		MsgExpirationFactor:          DefMsgExpirationFactor,
	}
)
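// These intervals are deliberately aggressive so the tests converge quickly: expiration
// is checked every alive interval, peers expire after 10 alive intervals, and several
// tests below wait AliveExpirationTimeout * MsgExpirationFactor for alive messages to
// drop out of the message store. init raises MaxConnectionAttempts so that reconnection
// attempts are effectively unlimited during a test run.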
func init() {
	util.SetupTestLogging()
	defaultTestConfig.MaxConnectionAttempts = 10000
}
|
|
|
|
type dummyReceivedMessage struct {
|
|
msg *protoext.SignedGossipMessage
|
|
info *protoext.ConnectionInfo
|
|
}
|
|
|
|
func (*dummyReceivedMessage) Respond(msg *proto.GossipMessage) {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (rm *dummyReceivedMessage) GetGossipMessage() *protoext.SignedGossipMessage {
|
|
return rm.msg
|
|
}
|
|
|
|
func (*dummyReceivedMessage) GetSourceEnvelope() *proto.Envelope {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (rm *dummyReceivedMessage) GetConnectionInfo() *protoext.ConnectionInfo {
|
|
return rm.info
|
|
}
|
|
|
|
func (*dummyReceivedMessage) Ack(err error) {
|
|
panic("implement me")
|
|
}
|
|
|
|
// mockAnchorPeerTracker implements AnchorPeerTracker interface
|
|
type mockAnchorPeerTracker struct {
|
|
apEndpoints []string
|
|
}
|
|
|
|
func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool {
|
|
return util.Contains(endpoint, m.apEndpoints)
|
|
}
|
|
|
|
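// dummyCommModule is the comm implementation handed to the discovery layer in these tests.
// It talks real gRPC to other test instances, counts sent/received/signed messages, and
// lets tests intercept message validation or disable communication entirely.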
type dummyCommModule struct {
|
|
validatedMessages chan *protoext.SignedGossipMessage
|
|
msgsReceived uint32
|
|
msgsSent uint32
|
|
id string
|
|
identitySwitch chan common.PKIidType
|
|
presumeDead chan common.PKIidType
|
|
detectedDead chan string
|
|
streams map[string]proto.Gossip_GossipStreamClient
|
|
conns map[string]*grpc.ClientConn
|
|
lock *sync.RWMutex
|
|
incMsgs chan protoext.ReceivedMessage
|
|
lastSeqs map[string]uint64
|
|
shouldGossip bool
|
|
disableComm bool
|
|
mock *mock.Mock
|
|
signCount uint32
|
|
}
|
|
|
|
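// gossipInstance bundles a Discovery instance with its dummyCommModule, the gRPC server
// and listener that back it, and an optional periodic sync loop, so a test can start,
// probe and stop a complete peer.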
type gossipInstance struct {
|
|
msgInterceptor func(*protoext.SignedGossipMessage)
|
|
comm *dummyCommModule
|
|
Discovery
|
|
gRGCserv *grpc.Server
|
|
lsnr net.Listener
|
|
shouldGossip bool
|
|
syncInitiator *time.Ticker
|
|
stopChan chan struct{}
|
|
port int
|
|
}
|
|
|
|
func (comm *dummyCommModule) ValidateAliveMsg(am *protoext.SignedGossipMessage) bool {
|
|
comm.lock.RLock()
|
|
c := comm.validatedMessages
|
|
comm.lock.RUnlock()
|
|
|
|
if c != nil {
|
|
c <- am
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (comm *dummyCommModule) IdentitySwitch() <-chan common.PKIidType {
|
|
return comm.identitySwitch
|
|
}
|
|
|
|
func (comm *dummyCommModule) recordValidation(validatedMessages chan *protoext.SignedGossipMessage) {
|
|
comm.lock.Lock()
|
|
defer comm.lock.Unlock()
|
|
comm.validatedMessages = validatedMessages
|
|
}
|
|
|
|
func (comm *dummyCommModule) SignMessage(am *proto.GossipMessage, internalEndpoint string) *proto.Envelope {
|
|
atomic.AddUint32(&comm.signCount, 1)
|
|
protoext.NoopSign(am)
|
|
|
|
secret := &proto.Secret{
|
|
Content: &proto.Secret_InternalEndpoint{
|
|
InternalEndpoint: internalEndpoint,
|
|
},
|
|
}
|
|
signer := func(msg []byte) ([]byte, error) {
|
|
return nil, nil
|
|
}
|
|
s, _ := protoext.NoopSign(am)
|
|
env := s.Envelope
|
|
protoext.SignSecret(env, signer, secret)
|
|
return env
|
|
}
|
|
|
|
func (comm *dummyCommModule) Gossip(msg *protoext.SignedGossipMessage) {
|
|
if !comm.shouldGossip || comm.disableComm {
|
|
return
|
|
}
|
|
comm.lock.Lock()
|
|
defer comm.lock.Unlock()
|
|
for _, conn := range comm.streams {
|
|
conn.Send(msg.Envelope)
|
|
}
|
|
}
|
|
|
|
func (comm *dummyCommModule) Forward(msg protoext.ReceivedMessage) {
|
|
if !comm.shouldGossip || comm.disableComm {
|
|
return
|
|
}
|
|
comm.lock.Lock()
|
|
defer comm.lock.Unlock()
|
|
for _, conn := range comm.streams {
|
|
conn.Send(msg.GetGossipMessage().Envelope)
|
|
}
|
|
}
|
|
|
|
func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *protoext.SignedGossipMessage) {
|
|
if comm.disableComm {
|
|
return
|
|
}
|
|
comm.lock.RLock()
|
|
_, exists := comm.streams[peer.Endpoint]
|
|
mock := comm.mock
|
|
comm.lock.RUnlock()
|
|
|
|
if mock != nil {
|
|
mock.Called(peer, msg)
|
|
}
|
|
|
|
if !exists {
|
|
if !comm.Ping(peer) {
|
|
fmt.Printf("Ping to %v failed\n", peer.Endpoint)
|
|
return
|
|
}
|
|
}
|
|
comm.lock.Lock()
|
|
s, _ := protoext.NoopSign(msg.GossipMessage)
|
|
comm.streams[peer.Endpoint].Send(s.Envelope)
|
|
comm.lock.Unlock()
|
|
atomic.AddUint32(&comm.msgsSent, 1)
|
|
}
|
|
|
|
func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
|
|
if comm.disableComm {
|
|
return false
|
|
}
|
|
comm.lock.Lock()
|
|
defer comm.lock.Unlock()
|
|
|
|
if comm.mock != nil {
|
|
comm.mock.Called()
|
|
}
|
|
|
|
_, alreadyExists := comm.streams[peer.Endpoint]
|
|
conn := comm.conns[peer.Endpoint]
|
|
if !alreadyExists || conn.GetState() == connectivity.Shutdown {
|
|
newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure())
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if stream, err := proto.NewGossipClient(newConn).GossipStream(context.Background()); err == nil {
|
|
comm.conns[peer.Endpoint] = newConn
|
|
comm.streams[peer.Endpoint] = stream
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
if _, err := proto.NewGossipClient(conn).Ping(context.Background(), &proto.Empty{}); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (comm *dummyCommModule) Accept() <-chan protoext.ReceivedMessage {
|
|
return comm.incMsgs
|
|
}
|
|
|
|
func (comm *dummyCommModule) PresumedDead() <-chan common.PKIidType {
|
|
return comm.presumeDead
|
|
}
|
|
|
|
func (comm *dummyCommModule) CloseConn(peer *NetworkMember) {
|
|
comm.lock.Lock()
|
|
defer comm.lock.Unlock()
|
|
|
|
if _, exists := comm.streams[peer.Endpoint]; !exists {
|
|
return
|
|
}
|
|
|
|
comm.streams[peer.Endpoint].CloseSend()
|
|
comm.conns[peer.Endpoint].Close()
|
|
}
|
|
|
|
func (g *gossipInstance) receivedMsgCount() int {
|
|
return int(atomic.LoadUint32(&g.comm.msgsReceived))
|
|
}
|
|
|
|
func (g *gossipInstance) sentMsgCount() int {
|
|
return int(atomic.LoadUint32(&g.comm.msgsSent))
|
|
}
|
|
|
|
func (g *gossipInstance) discoveryImpl() *gossipDiscoveryImpl {
|
|
return g.Discovery.(*gossipDiscoveryImpl)
|
|
}
|
|
|
|
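// initiateSync starts a background loop that calls InitiateSync(peerNum) every
// `frequency` until the instance is stopped.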
func (g *gossipInstance) initiateSync(frequency time.Duration, peerNum int) {
|
|
g.syncInitiator = time.NewTicker(frequency)
|
|
g.stopChan = make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-g.syncInitiator.C:
|
|
g.Discovery.InitiateSync(peerNum)
|
|
case <-g.stopChan:
|
|
g.syncInitiator.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
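// GossipStream implements the gossip service endpoint: every envelope received from the
// remote peer is deserialized, passed to the test's interceptor, pushed into the discovery
// layer via incMsgs, and, if it is an alive message, forwarded onward.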
func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) error {
|
|
for {
|
|
envelope, err := stream.Recv()
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
lgr := g.Discovery.(*gossipDiscoveryImpl).logger
|
|
gMsg, err := protoext.EnvelopeToGossipMessage(envelope)
|
|
if err != nil {
|
|
lgr.Warning("Failed deserializing GossipMessage from envelope:", err)
|
|
continue
|
|
}
|
|
g.msgInterceptor(gMsg)
|
|
|
|
lgr.Debug(g.Discovery.Self().Endpoint, "Got message:", gMsg)
|
|
g.comm.incMsgs <- &dummyReceivedMessage{
|
|
msg: gMsg,
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType("testID"),
|
|
},
|
|
}
|
|
atomic.AddUint32(&g.comm.msgsReceived, 1)
|
|
|
|
if aliveMsg := gMsg.GetAliveMsg(); aliveMsg != nil {
|
|
g.tryForwardMessage(gMsg)
|
|
}
|
|
}
|
|
}
|
|
|
|
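// tryForwardMessage gossips an alive message onward only if its sequence number is newer
// than the last one seen from that PKI-ID, mimicking the de-duplication a real gossip
// layer would perform.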
func (g *gossipInstance) tryForwardMessage(msg *protoext.SignedGossipMessage) {
|
|
g.comm.lock.Lock()
|
|
|
|
aliveMsg := msg.GetAliveMsg()
|
|
|
|
forward := false
|
|
id := string(aliveMsg.Membership.PkiId)
|
|
seqNum := aliveMsg.Timestamp.SeqNum
|
|
if last, exists := g.comm.lastSeqs[id]; exists {
|
|
if last < seqNum {
|
|
g.comm.lastSeqs[id] = seqNum
|
|
forward = true
|
|
}
|
|
} else {
|
|
g.comm.lastSeqs[id] = seqNum
|
|
forward = true
|
|
}
|
|
|
|
g.comm.lock.Unlock()
|
|
|
|
if forward {
|
|
g.comm.Gossip(msg)
|
|
}
|
|
}
|
|
|
|
func (g *gossipInstance) Stop() {
|
|
if g.syncInitiator != nil {
|
|
g.stopChan <- struct{}{}
|
|
}
|
|
g.gRGCserv.Stop()
|
|
g.lsnr.Close()
|
|
g.comm.lock.Lock()
|
|
for _, stream := range g.comm.streams {
|
|
stream.CloseSend()
|
|
}
|
|
g.comm.lock.Unlock()
|
|
for _, conn := range g.comm.conns {
|
|
conn.Close()
|
|
}
|
|
g.Discovery.Stop()
|
|
}
|
|
|
|
func (g *gossipInstance) Ping(context.Context, *proto.Empty) (*proto.Empty, error) {
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
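// noopPolicy is a DisclosurePolicy that discloses every message to every remote peer and
// leaves envelopes untouched.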
var noopPolicy = func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) {
|
|
return func(msg *protoext.SignedGossipMessage) bool {
|
|
return true
|
|
}, func(message *protoext.SignedGossipMessage) *proto.Envelope {
|
|
return message.Envelope
|
|
}
|
|
}
|
|
|
|
func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance {
|
|
return createDiscoveryInstanceCustomConfig(port, id, bootstrapPeers, defaultTestConfig)
|
|
}
|
|
|
|
func createDiscoveryInstanceCustomConfig(port int, id string, bootstrapPeers []string, config DiscoveryConfig) *gossipInstance {
|
|
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true, noopPolicy, config)
|
|
}
|
|
|
|
func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance {
|
|
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, noopPolicy, defaultTestConfig)
|
|
}
|
|
|
|
func createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(port int, id string, bootstrapPeers []string, pol DisclosurePolicy) *gossipInstance {
|
|
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, pol, defaultTestConfig)
|
|
}
|
|
|
|
func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, config DiscoveryConfig) *gossipInstance {
|
|
return createDiscoveryInstanceThatGossipsWithInterceptors(port, id, bootstrapPeers, shouldGossip, pol, func(_ *protoext.SignedGossipMessage) {}, config)
|
|
}
|
|
|
|
func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*protoext.SignedGossipMessage), config DiscoveryConfig) *gossipInstance {
|
|
mockTracker := &mockAnchorPeerTracker{}
|
|
return createDiscoveryInstanceWithAnchorPeerTracker(port, id, bootstrapPeers, shouldGossip, pol, f, config, mockTracker, nil)
|
|
}
|
|
|
|
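// createDiscoveryInstanceWithAnchorPeerTracker is the lowest-level constructor used by the
// helpers above: it wires a dummyCommModule, creates the discovery service, connects it to
// the bootstrap peers, and starts a gRPC server for it on the given port.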
func createDiscoveryInstanceWithAnchorPeerTracker(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy,
|
|
f func(*protoext.SignedGossipMessage), config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker, logger util.Logger) *gossipInstance {
|
|
comm := &dummyCommModule{
|
|
conns: make(map[string]*grpc.ClientConn),
|
|
streams: make(map[string]proto.Gossip_GossipStreamClient),
|
|
incMsgs: make(chan protoext.ReceivedMessage, 1000),
|
|
presumeDead: make(chan common.PKIidType, 10000),
|
|
id: id,
|
|
detectedDead: make(chan string, 10000),
|
|
identitySwitch: make(chan common.PKIidType),
|
|
lock: &sync.RWMutex{},
|
|
lastSeqs: make(map[string]uint64),
|
|
shouldGossip: shouldGossip,
|
|
disableComm: false,
|
|
}
|
|
|
|
endpoint := fmt.Sprintf("localhost:%d", port)
|
|
self := NetworkMember{
|
|
Metadata: []byte{},
|
|
PKIid: []byte(endpoint),
|
|
Endpoint: endpoint,
|
|
InternalEndpoint: endpoint,
|
|
}
|
|
|
|
listenAddress := fmt.Sprintf("%s:%d", "", port)
|
|
ll, err := net.Listen("tcp", listenAddress)
|
|
if err != nil {
|
|
errMsg := fmt.Sprintf("Failed creating listener on address %v for gossip instance: %v", listenAddress, err)
|
|
panic(errMsg)
|
|
}
|
|
s := grpc.NewServer()
|
|
|
|
config.BootstrapPeers = bootstrapPeers
|
|
|
|
if logger == nil {
|
|
logger = util.GetLogger(util.DiscoveryLogger, self.InternalEndpoint)
|
|
}
|
|
discSvc := NewDiscoveryService(self, comm, comm, pol, config, anchorPeerTracker, logger)
|
|
for _, bootPeer := range bootstrapPeers {
|
|
bp := bootPeer
|
|
discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) {
|
|
return &PeerIdentification{SelfOrg: true, ID: common.PKIidType(bp)}, nil
|
|
})
|
|
}
|
|
|
|
gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip, port: port, msgInterceptor: f}
|
|
|
|
proto.RegisterGossipServer(s, gossInst)
|
|
go s.Serve(ll)
|
|
|
|
return gossInst
|
|
}
|
|
|
|
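// bootPeer formats the localhost endpoint used to address a test peer listening on the given port.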
func bootPeer(port int) string {
|
|
return fmt.Sprintf("localhost:%d", port)
|
|
}
|
|
|
|
func TestClone(t *testing.T) {
|
|
nm := &NetworkMember{
|
|
PKIid: common.PKIidType("abc"),
|
|
Properties: &proto.Properties{
|
|
LedgerHeight: 1,
|
|
LeftChannel: true,
|
|
},
|
|
Envelope: &proto.Envelope{
|
|
Payload: []byte("payload"),
|
|
},
|
|
InternalEndpoint: "internal",
|
|
Metadata: []byte{1, 2, 3},
|
|
Endpoint: "endpoint",
|
|
}
|
|
|
|
nm2 := nm.Clone()
|
|
require.Equal(t, *nm, nm2, "Clones are different")
|
|
require.False(t, nm.Properties == nm2.Properties, "Cloning should be deep and not shallow")
|
|
require.False(t, nm.Envelope == nm2.Envelope, "Cloning should be deep and not shallow")
|
|
}
|
|
|
|
func TestHasExternalEndpoints(t *testing.T) {
|
|
memberWithEndpoint := NetworkMember{Endpoint: "foo"}
|
|
memberWithoutEndpoint := NetworkMember{}
|
|
|
|
require.True(t, HasExternalEndpoint(memberWithEndpoint))
|
|
require.False(t, HasExternalEndpoint(memberWithoutEndpoint))
|
|
}
|
|
|
|
func TestToString(t *testing.T) {
|
|
nm := NetworkMember{
|
|
Endpoint: "a",
|
|
InternalEndpoint: "b",
|
|
}
|
|
require.Equal(t, "b", nm.PreferredEndpoint())
|
|
nm = NetworkMember{
|
|
Endpoint: "a",
|
|
}
|
|
require.Equal(t, "a", nm.PreferredEndpoint())
|
|
|
|
now := time.Now()
|
|
ts := &timestamp{
|
|
incTime: now,
|
|
seqNum: uint64(42),
|
|
}
|
|
require.Equal(t, fmt.Sprintf("%d, %d", now.UnixNano(), 42), fmt.Sprint(ts))
|
|
}
|
|
|
|
func TestNetworkMemberString(t *testing.T) {
|
|
tests := []struct {
|
|
input NetworkMember
|
|
expected string
|
|
}{
|
|
{
|
|
input: NetworkMember{Endpoint: "endpoint", InternalEndpoint: "internal-endpoint", PKIid: common.PKIidType{0, 1, 2, 3}, Metadata: nil},
|
|
expected: "Endpoint: endpoint, InternalEndpoint: internal-endpoint, PKI-ID: 00010203, Metadata: ",
|
|
},
|
|
{
|
|
input: NetworkMember{Endpoint: "endpoint", InternalEndpoint: "internal-endpoint", PKIid: common.PKIidType{0, 1, 2, 3}, Metadata: []byte{4, 5, 6, 7}},
|
|
expected: "Endpoint: endpoint, InternalEndpoint: internal-endpoint, PKI-ID: 00010203, Metadata: 04050607",
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
require.Equal(t, tt.expected, tt.input.String())
|
|
}
|
|
}
|
|
|
|
func TestBadInput(t *testing.T) {
|
|
inst := createDiscoveryInstance(2048, fmt.Sprintf("d%d", 0), []string{})
|
|
inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(nil)
|
|
s, _ := protoext.NoopSign(&proto.GossipMessage{
|
|
Content: &proto.GossipMessage_DataMsg{
|
|
DataMsg: &proto.DataMessage{},
|
|
},
|
|
})
|
|
inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(&dummyReceivedMessage{
|
|
msg: s,
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType("testID"),
|
|
},
|
|
})
|
|
}
|
|
|
|
func TestConnect(t *testing.T) {
|
|
nodeNum := 10
|
|
instances := []*gossipInstance{}
|
|
firstSentMemReqMsgs := make(chan *protoext.SignedGossipMessage, nodeNum)
|
|
for i := 0; i < nodeNum; i++ {
|
|
inst := createDiscoveryInstance(7611+i, fmt.Sprintf("d%d", i), []string{})
|
|
|
|
inst.comm.lock.Lock()
|
|
inst.comm.mock = &mock.Mock{}
|
|
inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
|
|
inst := inst
|
|
msg := arguments.Get(1).(*protoext.SignedGossipMessage)
|
|
if req := msg.GetMemReq(); req != nil {
|
|
selfMsg, _ := protoext.EnvelopeToGossipMessage(req.SelfInformation)
|
|
firstSentMemReqMsgs <- selfMsg
|
|
inst.comm.lock.Lock()
|
|
inst.comm.mock = nil
|
|
inst.comm.lock.Unlock()
|
|
}
|
|
})
|
|
inst.comm.mock.On("Ping", mock.Anything)
|
|
inst.comm.lock.Unlock()
|
|
instances = append(instances, inst)
|
|
j := (i + 1) % 10
|
|
endpoint := fmt.Sprintf("localhost:%d", 7611+j)
|
|
netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
|
|
inst.Connect(netMember2Connect2, func() (identification *PeerIdentification, err error) {
|
|
return &PeerIdentification{SelfOrg: false, ID: nil}, nil
|
|
})
|
|
}
|
|
|
|
time.Sleep(time.Second * 3)
|
|
fullMembership := func() bool {
|
|
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
|
|
}
|
|
waitUntilOrFail(t, fullMembership)
|
|
|
|
discInst := instances[rand.Intn(len(instances))].Discovery.(*gossipDiscoveryImpl)
|
|
mr, _ := discInst.createMembershipRequest(true)
|
|
am, _ := protoext.EnvelopeToGossipMessage(mr.GetMemReq().SelfInformation)
|
|
require.NotNil(t, am.SecretEnvelope)
|
|
mr2, _ := discInst.createMembershipRequest(false)
|
|
am, _ = protoext.EnvelopeToGossipMessage(mr2.GetMemReq().SelfInformation)
|
|
require.Nil(t, am.SecretEnvelope)
|
|
stopInstances(t, instances)
|
|
require.Len(t, firstSentMemReqMsgs, 10)
|
|
close(firstSentMemReqMsgs)
|
|
for firstSentSelfMsg := range firstSentMemReqMsgs {
|
|
require.Nil(t, firstSentSelfMsg.Envelope.SecretEnvelope)
|
|
}
|
|
}
|
|
|
|
func TestNoSigningIfNoMembership(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
inst := createDiscoveryInstance(8931, "foreveralone", nil)
|
|
defer inst.Stop()
|
|
time.Sleep(defaultTestConfig.AliveTimeInterval * 10)
|
|
assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount))
|
|
|
|
inst.InitiateSync(10000)
|
|
assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount))
|
|
}
|
|
|
|
func TestValidation(t *testing.T) {
|
|
// Scenarios: This test contains the following sub-tests:
|
|
// 1) alive message validation: a message is validated <==> it entered the message store
|
|
// 2) request/response message validation:
|
|
// 2.1) alive messages from membership requests/responses are validated.
|
|
// 2.2) once alive messages enter the message store, reception of them via membership responses
|
|
// does not trigger validation, but reception via membership requests does.
|
|
|
|
wrapReceivedMessage := func(msg *protoext.SignedGossipMessage) protoext.ReceivedMessage {
|
|
return &dummyReceivedMessage{
|
|
msg: msg,
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType("testID"),
|
|
},
|
|
}
|
|
}
|
|
|
|
requestMessagesReceived := make(chan *protoext.SignedGossipMessage, 100)
|
|
responseMessagesReceived := make(chan *protoext.SignedGossipMessage, 100)
|
|
aliveMessagesReceived := make(chan *protoext.SignedGossipMessage, 5000)
|
|
|
|
var membershipRequest atomic.Value
|
|
var membershipResponseWithAlivePeers atomic.Value
|
|
var membershipResponseWithDeadPeers atomic.Value
|
|
|
|
recordMembershipRequest := func(req *protoext.SignedGossipMessage) {
|
|
msg, _ := protoext.EnvelopeToGossipMessage(req.GetMemReq().SelfInformation)
|
|
membershipRequest.Store(req)
|
|
requestMessagesReceived <- msg
|
|
}
|
|
|
|
recordMembershipResponse := func(res *protoext.SignedGossipMessage) {
|
|
memRes := res.GetMemRes()
|
|
if len(memRes.GetAlive()) > 0 {
|
|
membershipResponseWithAlivePeers.Store(res)
|
|
}
|
|
if len(memRes.GetDead()) > 0 {
|
|
membershipResponseWithDeadPeers.Store(res)
|
|
}
|
|
responseMessagesReceived <- res
|
|
}
|
|
|
|
interceptor := func(msg *protoext.SignedGossipMessage) {
|
|
if memReq := msg.GetMemReq(); memReq != nil {
|
|
recordMembershipRequest(msg)
|
|
return
|
|
}
|
|
|
|
if memRes := msg.GetMemRes(); memRes != nil {
|
|
recordMembershipResponse(msg)
|
|
return
|
|
}
|
|
// Else, it's an alive message
|
|
aliveMessagesReceived <- msg
|
|
}
|
|
|
|
// p3 is the boot peer of p1, and p1 is the boot peer of p2.
|
|
// p1 sends a (membership) request to p3, and receives a (membership) response back.
|
|
// p2 sends a (membership) request to p1.
|
|
// Therefore, p1 receives both a membership request and a response.
|
|
p1 := createDiscoveryInstanceThatGossipsWithInterceptors(4675, "p1", []string{bootPeer(4677)}, true, noopPolicy, interceptor, defaultTestConfig)
|
|
p2 := createDiscoveryInstance(4676, "p2", []string{bootPeer(4675)})
|
|
p3 := createDiscoveryInstance(4677, "p3", nil)
|
|
instances := []*gossipInstance{p1, p2, p3}
|
|
|
|
assertMembership(t, instances, 2)
|
|
|
|
instances = []*gossipInstance{p1, p2}
|
|
// Stop p3 and wait until its death is detected
|
|
p3.Stop()
|
|
assertMembership(t, instances, 1)
|
|
// Force p1 to send a membership request so it can receive back a response
|
|
// with dead peers.
|
|
p1.InitiateSync(1)
|
|
|
|
// Wait until a response with a dead peer is received
|
|
waitUntilOrFail(t, func() bool {
|
|
return membershipResponseWithDeadPeers.Load() != nil
|
|
})
|
|
|
|
p1.Stop()
|
|
p2.Stop()
|
|
|
|
close(aliveMessagesReceived)
|
|
t.Log("Recorded", len(aliveMessagesReceived), "alive messages")
|
|
t.Log("Recorded", len(requestMessagesReceived), "request messages")
|
|
t.Log("Recorded", len(responseMessagesReceived), "response messages")
|
|
|
|
// Ensure we got alive messages from membership requests and from membership responses
|
|
require.NotNil(t, membershipResponseWithAlivePeers.Load())
|
|
require.NotNil(t, membershipRequest.Load())
|
|
|
|
t.Run("alive message", func(t *testing.T) {
|
|
// Spawn a new peer - p4
|
|
p4 := createDiscoveryInstance(4678, "p1", nil)
|
|
defer p4.Stop()
|
|
// Record messages validated
|
|
validatedMessages := make(chan *protoext.SignedGossipMessage, 5000)
|
|
p4.comm.recordValidation(validatedMessages)
|
|
tmpMsgs := make(chan *protoext.SignedGossipMessage, 5000)
|
|
// Replay the messages sent to p1 into p4, and also save them into a temporary channel
|
|
for msg := range aliveMessagesReceived {
|
|
p4.comm.incMsgs <- wrapReceivedMessage(msg)
|
|
tmpMsgs <- msg
|
|
}
|
|
|
|
// Simulate the messages received by p4 into the message store
|
|
policy := protoext.NewGossipMessageComparator(0)
|
|
msgStore := msgstore.NewMessageStore(policy, func(_ interface{}) {})
|
|
close(tmpMsgs)
|
|
for msg := range tmpMsgs {
|
|
if msgStore.Add(msg) {
|
|
// Ensure the message was verified if it can be added into the message store
|
|
expectedMessage := <-validatedMessages
|
|
require.Equal(t, expectedMessage, msg)
|
|
}
|
|
}
|
|
// Ensure we didn't validate any other messages.
|
|
require.Empty(t, validatedMessages)
|
|
})
|
|
|
|
req := membershipRequest.Load().(*protoext.SignedGossipMessage)
|
|
res := membershipResponseWithDeadPeers.Load().(*protoext.SignedGossipMessage)
|
|
// Ensure the membership response contains both alive and dead peers
|
|
require.Len(t, res.GetMemRes().GetAlive(), 2)
|
|
require.Len(t, res.GetMemRes().GetDead(), 1)
|
|
|
|
for _, testCase := range []struct {
|
|
name string
|
|
expectedAliveMessages int
|
|
port int
|
|
message *protoext.SignedGossipMessage
|
|
shouldBeReValidated bool
|
|
}{
|
|
{
|
|
name: "membership request",
|
|
expectedAliveMessages: 1,
|
|
message: req,
|
|
port: 4679,
|
|
shouldBeReValidated: true,
|
|
},
|
|
{
|
|
name: "membership response",
|
|
expectedAliveMessages: 3,
|
|
message: res,
|
|
port: 4680,
|
|
},
|
|
} {
|
|
testCase := testCase
|
|
t.Run(testCase.name, func(t *testing.T) {
|
|
p := createDiscoveryInstance(testCase.port, "p", nil)
|
|
defer p.Stop()
|
|
// Record messages validated
|
|
validatedMessages := make(chan *protoext.SignedGossipMessage, testCase.expectedAliveMessages)
|
|
p.comm.recordValidation(validatedMessages)
|
|
|
|
p.comm.incMsgs <- wrapReceivedMessage(testCase.message)
|
|
// Ensure all messages were validated
|
|
for i := 0; i < testCase.expectedAliveMessages; i++ {
|
|
validatedMsg := <-validatedMessages
|
|
// send the message directly to be included in the message store
|
|
p.comm.incMsgs <- wrapReceivedMessage(validatedMsg)
|
|
}
|
|
// Wait for the messages to be validated
|
|
for i := 0; i < testCase.expectedAliveMessages; i++ {
|
|
<-validatedMessages
|
|
}
|
|
// No more than testCase.expectedAliveMessages should have been validated
|
|
require.Empty(t, validatedMessages)
|
|
|
|
if !testCase.shouldBeReValidated {
|
|
// Re-submit the message twice and ensure it wasn't validated.
|
|
// If it is validated, a panic would occur because an enqueue to the validatedMessages channel
|
|
// would be attempted and the channel is closed.
|
|
close(validatedMessages)
|
|
}
|
|
p.comm.incMsgs <- wrapReceivedMessage(testCase.message)
|
|
p.comm.incMsgs <- wrapReceivedMessage(testCase.message)
|
|
// Wait until the size of the channel is zero. It means at least one message was processed.
|
|
waitUntilOrFail(t, func() bool {
|
|
return len(p.comm.incMsgs) == 0
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpdate(t *testing.T) {
|
|
nodeNum := 5
|
|
bootPeers := []string{bootPeer(6611), bootPeer(6612)}
|
|
instances := []*gossipInstance{}
|
|
|
|
inst := createDiscoveryInstance(6611, "d1", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
inst = createDiscoveryInstance(6612, "d2", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
for i := 3; i <= nodeNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst = createDiscoveryInstance(6610+i, id, bootPeers)
|
|
instances = append(instances, inst)
|
|
}
|
|
|
|
fullMembership := func() bool {
|
|
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
|
|
}
|
|
|
|
waitUntilOrFail(t, fullMembership)
|
|
|
|
instances[0].UpdateMetadata([]byte("bla bla"))
|
|
instances[nodeNum-1].UpdateEndpoint("localhost:5511")
|
|
|
|
checkMembership := func() bool {
|
|
for _, member := range instances[nodeNum-1].GetMembership() {
|
|
if string(member.PKIid) == instances[0].comm.id {
|
|
if string(member.Metadata) != "bla bla" {
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, member := range instances[0].GetMembership() {
|
|
if string(member.PKIid) == instances[nodeNum-1].comm.id {
|
|
if member.Endpoint != "localhost:5511" {
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
waitUntilOrFail(t, checkMembership)
|
|
stopInstances(t, instances)
|
|
}
|
|
|
|
func TestInitiateSync(t *testing.T) {
|
|
nodeNum := 10
|
|
bootPeers := []string{bootPeer(3611), bootPeer(3612)}
|
|
instances := []*gossipInstance{}
|
|
|
|
toDie := int32(0)
|
|
for i := 1; i <= nodeNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst := createDiscoveryInstanceWithNoGossip(3610+i, id, bootPeers)
|
|
instances = append(instances, inst)
|
|
go func() {
|
|
for {
|
|
if atomic.LoadInt32(&toDie) == int32(1) {
|
|
return
|
|
}
|
|
time.Sleep(defaultTestConfig.AliveExpirationTimeout / 3)
|
|
inst.InitiateSync(9)
|
|
}
|
|
}()
|
|
}
|
|
time.Sleep(defaultTestConfig.AliveExpirationTimeout * 4)
|
|
assertMembership(t, instances, nodeNum-1)
|
|
atomic.StoreInt32(&toDie, int32(1))
|
|
stopInstances(t, instances)
|
|
}
|
|
|
|
func TestSelf(t *testing.T) {
|
|
inst := createDiscoveryInstance(13463, "d1", []string{})
|
|
defer inst.Stop()
|
|
env := inst.Self().Envelope
|
|
sMsg, err := protoext.EnvelopeToGossipMessage(env)
|
|
require.NoError(t, err)
|
|
member := sMsg.GetAliveMsg().Membership
|
|
require.Equal(t, "localhost:13463", member.Endpoint)
|
|
require.Equal(t, []byte("localhost:13463"), member.PkiId)
|
|
|
|
require.Equal(t, "localhost:13463", inst.Self().Endpoint)
|
|
require.Equal(t, common.PKIidType("localhost:13463"), inst.Self().PKIid)
|
|
}
|
|
|
|
func TestExpiration(t *testing.T) {
|
|
nodeNum := 5
|
|
bootPeers := []string{bootPeer(2611), bootPeer(2612)}
|
|
instances := []*gossipInstance{}
|
|
|
|
inst := createDiscoveryInstance(2611, "d1", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
inst = createDiscoveryInstance(2612, "d2", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
for i := 3; i <= nodeNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst = createDiscoveryInstance(2610+i, id, bootPeers)
|
|
instances = append(instances, inst)
|
|
}
|
|
|
|
assertMembership(t, instances, nodeNum-1)
|
|
|
|
waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
|
|
waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)
|
|
|
|
assertMembership(t, instances[:len(instances)-2], nodeNum-3)
|
|
|
|
stopAction := &sync.WaitGroup{}
|
|
for i, inst := range instances {
|
|
if i+2 == nodeNum {
|
|
break
|
|
}
|
|
stopAction.Add(1)
|
|
go func(inst *gossipInstance) {
|
|
defer stopAction.Done()
|
|
inst.Stop()
|
|
}(inst)
|
|
}
|
|
|
|
waitUntilOrFailBlocking(t, stopAction.Wait)
|
|
}
|
|
|
|
func TestGetFullMembership(t *testing.T) {
|
|
nodeNum := 15
|
|
bootPeers := []string{bootPeer(5511), bootPeer(5512)}
|
|
instances := []*gossipInstance{}
|
|
var inst *gossipInstance
|
|
|
|
for i := 3; i <= nodeNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst = createDiscoveryInstance(5510+i, id, bootPeers)
|
|
instances = append(instances, inst)
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
inst = createDiscoveryInstance(5511, "d1", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
inst = createDiscoveryInstance(5512, "d2", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
assertMembership(t, instances, nodeNum-1)
|
|
|
|
// Ensure that internal endpoint was propagated to everyone
|
|
for _, inst := range instances {
|
|
for _, member := range inst.GetMembership() {
|
|
require.NotEmpty(t, member.InternalEndpoint)
|
|
require.NotEmpty(t, member.Endpoint)
|
|
}
|
|
}
|
|
|
|
// Check that Lookup() is valid
|
|
for _, inst := range instances {
|
|
for _, member := range inst.GetMembership() {
|
|
require.Equal(t, string(member.PKIid), inst.Lookup(member.PKIid).Endpoint)
|
|
require.Equal(t, member.PKIid, inst.Lookup(member.PKIid).PKIid)
|
|
}
|
|
}
|
|
|
|
stopInstances(t, instances)
|
|
}
|
|
|
|
func TestGossipDiscoveryStopping(t *testing.T) {
|
|
inst := createDiscoveryInstance(9611, "d1", []string{bootPeer(9611)})
|
|
time.Sleep(time.Second)
|
|
waitUntilOrFailBlocking(t, inst.Stop)
|
|
}
|
|
|
|
func TestGossipDiscoverySkipConnectingToLocalhostBootstrap(t *testing.T) {
|
|
inst := createDiscoveryInstance(11611, "d1", []string{"localhost:11611", "127.0.0.1:11611"})
|
|
inst.comm.lock.Lock()
|
|
inst.comm.mock = &mock.Mock{}
|
|
inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(mock.Arguments) {
|
|
t.Fatal("Should not have connected to any peer")
|
|
})
|
|
inst.comm.mock.On("Ping", mock.Anything).Run(func(mock.Arguments) {
|
|
t.Fatal("Should not have connected to any peer")
|
|
})
|
|
inst.comm.lock.Unlock()
|
|
time.Sleep(time.Second * 3)
|
|
waitUntilOrFailBlocking(t, inst.Stop)
|
|
}
|
|
|
|
func TestConvergence(t *testing.T) {
|
|
// scenario:
|
|
// {boot peer: [peer list]}
|
|
// {d1: d2, d3, d4}
|
|
// {d5: d6, d7, d8}
|
|
// {d9: d10, d11, d12}
|
|
// connect all boot peers with d13
|
|
// take down d13
|
|
// ensure still full membership
|
|
instances := []*gossipInstance{}
|
|
for _, i := range []int{1, 5, 9} {
|
|
bootPort := 4610 + i
|
|
id := fmt.Sprintf("d%d", i)
|
|
leader := createDiscoveryInstance(bootPort, id, []string{})
|
|
instances = append(instances, leader)
|
|
for minionIndex := 1; minionIndex <= 3; minionIndex++ {
|
|
id := fmt.Sprintf("d%d", i+minionIndex)
|
|
minion := createDiscoveryInstance(4610+minionIndex+i, id, []string{bootPeer(bootPort)})
|
|
instances = append(instances, minion)
|
|
}
|
|
}
|
|
|
|
assertMembership(t, instances, 3)
|
|
connector := createDiscoveryInstance(4623, "d13", []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)})
|
|
instances = append(instances, connector)
|
|
assertMembership(t, instances, 12)
|
|
connector.Stop()
|
|
instances = instances[:len(instances)-1]
|
|
assertMembership(t, instances, 11)
|
|
stopInstances(t, instances)
|
|
}
|
|
|
|
func TestDisclosurePolicyWithPull(t *testing.T) {
|
|
// Scenario: run 2 groups of peers that simulate 2 organizations:
|
|
// {p0, p1, p2, p3, p4}
|
|
// {p5, p6, p7, p8, p9}
|
|
// Only peers that have an even id have external addresses
|
|
// and only these peers should be published to peers of the other group,
|
|
// while the only ones that need to know about them are peers
|
|
// that have an even id themselves.
|
|
// Furthermore, peers in different sets, should not know about internal addresses of
|
|
// other peers.
|
|
|
|
// This bootstrap map assigns each peer its own bootstrap peer.
|
|
// In practice (production) peers should only use peers of their orgs as bootstrap peers,
|
|
// but the discovery layer is ignorant of organizations.
|
|
bootPeerMap := map[int]int{
|
|
8610: 8616,
|
|
8611: 8610,
|
|
8612: 8610,
|
|
8613: 8610,
|
|
8614: 8610,
|
|
8615: 8616,
|
|
8616: 8610,
|
|
8617: 8616,
|
|
8618: 8616,
|
|
8619: 8616,
|
|
}
|
|
|
|
// This map lists, for each peer, the peers it should know about in the test scenario.
|
|
peersThatShouldBeKnownToPeers := map[int][]int{
|
|
8610: {8611, 8612, 8613, 8614, 8616, 8618},
|
|
8611: {8610, 8612, 8613, 8614},
|
|
8612: {8610, 8611, 8613, 8614, 8616, 8618},
|
|
8613: {8610, 8611, 8612, 8614},
|
|
8614: {8610, 8611, 8612, 8613, 8616, 8618},
|
|
8615: {8616, 8617, 8618, 8619},
|
|
8616: {8610, 8612, 8614, 8615, 8617, 8618, 8619},
|
|
8617: {8615, 8616, 8618, 8619},
|
|
8618: {8610, 8612, 8614, 8615, 8616, 8617, 8619},
|
|
8619: {8615, 8616, 8617, 8618},
|
|
}
|
|
// Create the peers in the two groups
|
|
instances1, instances2 := createDisjointPeerGroupsWithNoGossip(bootPeerMap)
|
|
// Wait a while to let them establish membership. This should be more than enough time
// because the instances are configured to pull membership at a very high frequency from
// up to 10 peers (which results in pulling from everyone).
|
|
waitUntilOrFail(t, func() bool {
|
|
for _, inst := range append(instances1, instances2...) {
|
|
// Ensure the expected membership is equal in size to the actual membership
|
|
// of each peer.
|
|
portsOfKnownMembers := portsOfMembers(inst.GetMembership())
|
|
if len(peersThatShouldBeKnownToPeers[inst.port]) != len(portsOfKnownMembers) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
for _, inst := range append(instances1, instances2...) {
|
|
portsOfKnownMembers := portsOfMembers(inst.GetMembership())
|
|
// Ensure the expected membership is equal to the actual membership
|
|
// of each peer. portsOfMembers returns a sorted slice, so require.Equal does the job.
|
|
require.Equal(t, peersThatShouldBeKnownToPeers[inst.port], portsOfKnownMembers)
|
|
// Next, check that internal endpoints aren't leaked across groups.
|
|
for _, knownPeer := range inst.GetMembership() {
|
|
// If internal endpoint is known, ensure the peers are in the same group
|
|
// unless the peer in question is a peer that has a public address.
|
|
// We cannot control what we disclose about ourselves when we send a membership request
|
|
if len(knownPeer.InternalEndpoint) > 0 && inst.port%2 != 0 {
|
|
bothInGroup1 := portOfEndpoint(knownPeer.Endpoint) < 8615 && inst.port < 8615
|
|
bothInGroup2 := portOfEndpoint(knownPeer.Endpoint) >= 8615 && inst.port >= 8615
|
|
require.True(t, bothInGroup1 || bothInGroup2, "%v knows about %v's internal endpoint", inst.port, knownPeer.InternalEndpoint)
|
|
}
|
|
}
|
|
}
|
|
|
|
t.Log("Shutting down instance 0...")
|
|
// Now, we shutdown instance 0 and ensure that peers that shouldn't know it,
|
|
// do not know it via membership requests
|
|
stopInstances(t, []*gossipInstance{instances1[0]})
|
|
time.Sleep(time.Second * 6)
|
|
for _, inst := range append(instances1[1:], instances2...) {
|
|
if peersThatShouldBeKnownToPeers[inst.port][0] == 8610 {
|
|
require.Equal(t, 1, inst.Discovery.(*gossipDiscoveryImpl).deadMembership.Size())
|
|
} else {
|
|
require.Equal(t, 0, inst.Discovery.(*gossipDiscoveryImpl).deadMembership.Size())
|
|
}
|
|
}
|
|
stopInstances(t, instances1[1:])
|
|
stopInstances(t, instances2)
|
|
}
|
|
|
|
func createDisjointPeerGroupsWithNoGossip(bootPeerMap map[int]int) ([]*gossipInstance, []*gossipInstance) {
|
|
instances1 := []*gossipInstance{}
|
|
instances2 := []*gossipInstance{}
|
|
for group := 0; group < 2; group++ {
|
|
for i := 0; i < 5; i++ {
|
|
group := group
|
|
id := fmt.Sprintf("id%d", group*5+i)
|
|
port := 8610 + group*5 + i
|
|
bootPeers := []string{bootPeer(bootPeerMap[port])}
|
|
pol := discPolForPeer(port)
|
|
inst := createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(8610+group*5+i, id, bootPeers, pol)
|
|
inst.initiateSync(defaultTestConfig.AliveExpirationTimeout/3, 10)
|
|
if group == 0 {
|
|
instances1 = append(instances1, inst)
|
|
} else {
|
|
instances2 = append(instances2, inst)
|
|
}
|
|
}
|
|
}
|
|
return instances1, instances2
|
|
}
|
|
|
|
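// discPolForPeer builds the DisclosurePolicy used by TestDisclosurePolicyWithPull:
// ports below 8615 form one group and ports 8615 and above form the other. Alive messages
// cross the group boundary only when both the subject of the message and the target peer
// have even ports, and the SecretEnvelope (internal endpoint) is stripped from anything
// sent across groups.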
func discPolForPeer(selfPort int) DisclosurePolicy {
|
|
return func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) {
|
|
targetPortStr := strings.Split(remotePeer.Endpoint, ":")[1]
|
|
targetPort, _ := strconv.ParseInt(targetPortStr, 10, 64)
|
|
return func(msg *protoext.SignedGossipMessage) bool {
|
|
portOfAliveMsgStr := strings.Split(msg.GetAliveMsg().Membership.Endpoint, ":")[1]
|
|
portOfAliveMsg, _ := strconv.ParseInt(portOfAliveMsgStr, 10, 64)
|
|
|
|
if portOfAliveMsg < 8615 && targetPort < 8615 {
|
|
return true
|
|
}
|
|
if portOfAliveMsg >= 8615 && targetPort >= 8615 {
|
|
return true
|
|
}
|
|
|
|
// Else, expose peers with even ids to other peers with even ids
|
|
return portOfAliveMsg%2 == 0 && targetPort%2 == 0
|
|
}, func(msg *protoext.SignedGossipMessage) *proto.Envelope {
|
|
envelope := protoG.Clone(msg.Envelope).(*proto.Envelope)
|
|
if selfPort < 8615 && targetPort >= 8615 {
|
|
envelope.SecretEnvelope = nil
|
|
}
|
|
|
|
if selfPort >= 8615 && targetPort < 8615 {
|
|
envelope.SecretEnvelope = nil
|
|
}
|
|
|
|
return envelope
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestCertificateChange(t *testing.T) {
|
|
bootPeers := []string{bootPeer(42611), bootPeer(42612), bootPeer(42613)}
|
|
p1 := createDiscoveryInstance(42611, "d1", bootPeers)
|
|
p2 := createDiscoveryInstance(42612, "d2", bootPeers)
|
|
p3 := createDiscoveryInstance(42613, "d3", bootPeers)
|
|
|
|
// Wait for membership establishment
|
|
assertMembership(t, []*gossipInstance{p1, p2, p3}, 2)
|
|
|
|
// Shutdown the second peer
|
|
waitUntilOrFailBlocking(t, p2.Stop)
|
|
|
|
var pingCountFrom1 uint32
|
|
var pingCountFrom3 uint32
|
|
// Program mocks to increment ping counters
|
|
p1.comm.lock.Lock()
|
|
p1.comm.mock = &mock.Mock{}
|
|
p1.comm.mock.On("SendToPeer", mock.Anything, mock.Anything)
|
|
p1.comm.mock.On("Ping").Run(func(arguments mock.Arguments) {
|
|
atomic.AddUint32(&pingCountFrom1, 1)
|
|
})
|
|
p1.comm.lock.Unlock()
|
|
|
|
p3.comm.lock.Lock()
|
|
p3.comm.mock = &mock.Mock{}
|
|
p3.comm.mock.On("SendToPeer", mock.Anything, mock.Anything)
|
|
p3.comm.mock.On("Ping").Run(func(arguments mock.Arguments) {
|
|
atomic.AddUint32(&pingCountFrom3, 1)
|
|
})
|
|
p3.comm.lock.Unlock()
|
|
|
|
pingCount1 := func() uint32 {
|
|
return atomic.LoadUint32(&pingCountFrom1)
|
|
}
|
|
|
|
pingCount3 := func() uint32 {
|
|
return atomic.LoadUint32(&pingCountFrom3)
|
|
}
|
|
|
|
c1 := pingCount1()
|
|
c3 := pingCount3()
|
|
|
|
// Ensure the first peer and third peer try to reconnect to it
|
|
waitUntilTimeoutOrFail(t, func() bool {
|
|
return pingCount1() > c1 && pingCount3() > c3
|
|
}, timeout)
|
|
|
|
// Tell the first peer that the second peer's PKI-ID has changed
|
|
// So that it will purge it from the membership entirely
|
|
p1.comm.identitySwitch <- common.PKIidType("localhost:42612")
|
|
|
|
c1 = pingCount1()
|
|
c3 = pingCount3()
|
|
// Ensure third peer tries to reconnect to it
|
|
waitUntilTimeoutOrFail(t, func() bool {
|
|
return pingCount3() > c3
|
|
}, timeout)
|
|
|
|
// Ensure the first peer ceases from trying
|
|
require.Equal(t, c1, pingCount1())
|
|
|
|
waitUntilOrFailBlocking(t, p1.Stop)
|
|
waitUntilOrFailBlocking(t, p3.Stop)
|
|
}
|
|
|
|
func TestMsgStoreExpiration(t *testing.T) {
|
|
// Start 4 instances, wait for membership to build, then stop 2 instances.
// Check that membership in the 2 running instances drops to 2.
// Wait for expiration and check that alive messages and related entries in the maps are removed from the running instances.
|
|
nodeNum := 4
|
|
bootPeers := []string{bootPeer(12611), bootPeer(12612)}
|
|
instances := []*gossipInstance{}
|
|
|
|
inst := createDiscoveryInstance(12611, "d1", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
inst = createDiscoveryInstance(12612, "d2", bootPeers)
|
|
instances = append(instances, inst)
|
|
|
|
for i := 3; i <= nodeNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst = createDiscoveryInstance(12610+i, id, bootPeers)
|
|
instances = append(instances, inst)
|
|
}
|
|
|
|
assertMembership(t, instances, nodeNum-1)
|
|
|
|
waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
|
|
waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)
|
|
|
|
assertMembership(t, instances[:len(instances)-2], nodeNum-3)
|
|
|
|
checkMessages := func() bool {
|
|
for _, inst := range instances[:len(instances)-2] {
|
|
for _, downInst := range instances[len(instances)-2:] {
|
|
downCastInst := inst.discoveryImpl()
|
|
downCastInst.lock.RLock()
|
|
if _, exist := downCastInst.aliveLastTS[string(downInst.discoveryImpl().self.PKIid)]; exist {
|
|
downCastInst.lock.RUnlock()
|
|
return false
|
|
}
|
|
if _, exist := downCastInst.deadLastTS[string(downInst.discoveryImpl().self.PKIid)]; exist {
|
|
downCastInst.lock.RUnlock()
|
|
return false
|
|
}
|
|
if _, exist := downCastInst.id2Member[string(downInst.discoveryImpl().self.PKIid)]; exist {
|
|
downCastInst.lock.RUnlock()
|
|
return false
|
|
}
|
|
if downCastInst.aliveMembership.MsgByID(downInst.discoveryImpl().self.PKIid) != nil {
|
|
downCastInst.lock.RUnlock()
|
|
return false
|
|
}
|
|
if downCastInst.deadMembership.MsgByID(downInst.discoveryImpl().self.PKIid) != nil {
|
|
downCastInst.lock.RUnlock()
|
|
return false
|
|
}
|
|
for _, am := range downCastInst.msgStore.Get() {
|
|
m := am.(*protoext.SignedGossipMessage).GetAliveMsg()
|
|
if bytes.Equal(m.Membership.PkiId, downInst.discoveryImpl().self.PKIid) {
|
|
downCastInst.lock.RUnlock()
|
|
return false
|
|
}
|
|
}
|
|
downCastInst.lock.RUnlock()
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(DefMsgExpirationFactor+5))
|
|
|
|
assertMembership(t, instances[:len(instances)-2], nodeNum-3)
|
|
|
|
stopInstances(t, instances[:len(instances)-2])
|
|
}
|
|
|
|
func TestExpirationNoSecretEnvelope(t *testing.T) {
|
|
l, err := zap.NewDevelopment()
|
|
require.NoError(t, err)
|
|
|
|
removed := make(chan struct{})
|
|
logger := flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
|
|
if strings.Contains(entry.Message, "Removing member: Endpoint: foo") {
|
|
removed <- struct{}{}
|
|
}
|
|
return nil
|
|
}))
|
|
|
|
mockTracker := &mockAnchorPeerTracker{}
|
|
msgStore := newAliveMsgStore(&gossipDiscoveryImpl{
|
|
aliveExpirationTimeout: time.Millisecond,
|
|
lock: &sync.RWMutex{},
|
|
aliveMembership: util.NewMembershipStore(),
|
|
deadMembership: util.NewMembershipStore(),
|
|
logger: logger,
|
|
anchorPeerTracker: mockTracker,
|
|
})
|
|
|
|
msg := &proto.GossipMessage{
|
|
Content: &proto.GossipMessage_AliveMsg{
|
|
AliveMsg: &proto.AliveMessage{Membership: &proto.Member{
|
|
Endpoint: "foo",
|
|
}},
|
|
},
|
|
}
|
|
|
|
sMsg, err := protoext.NoopSign(msg)
|
|
require.NoError(t, err)
|
|
|
|
msgStore.Add(sMsg)
|
|
select {
|
|
case <-removed:
|
|
case <-time.After(time.Second * 10):
|
|
t.Fatalf("timed out")
|
|
}
|
|
}
|
|
|
|
func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
|
|
// Creates 3 discovery instances without gossip communication
|
|
// Generates MembershipRequest msg for each instance using createMembershipRequest
|
|
// Generates Alive msg for each instance using createAliveMessage
|
|
// Builds membership using Alive msgs
|
|
// Checks msgStore and related maps
|
|
// Generates MembershipResponse msgs for each instance using createMembershipResponse
|
|
// Generates new set of Alive msgs and processes them
|
|
// Checks msgStore and related maps
|
|
// Waits for expiration and checks msgStore and related maps
|
|
// Processes stored MembershipRequest msg and checks msgStore and related maps
|
|
// Processes stored MembershipResponse msg and checks msgStore and related maps
|
|
bootPeers := []string{}
|
|
peersNum := 3
|
|
instances := []*gossipInstance{}
|
|
aliveMsgs := []*protoext.SignedGossipMessage{}
|
|
newAliveMsgs := []*protoext.SignedGossipMessage{}
|
|
memReqMsgs := []*protoext.SignedGossipMessage{}
|
|
memRespMsgs := make(map[int][]*proto.MembershipResponse)
|
|
|
|
for i := 0; i < peersNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst := createDiscoveryInstanceWithNoGossip(22610+i, id, bootPeers)
|
|
inst.comm.disableComm = true
|
|
instances = append(instances, inst)
|
|
}
|
|
|
|
// Creating MembershipRequest messages
|
|
for i := 0; i < peersNum; i++ {
|
|
memReqMsg, _ := instances[i].discoveryImpl().createMembershipRequest(true)
|
|
sMsg, _ := protoext.NoopSign(memReqMsg)
|
|
memReqMsgs = append(memReqMsgs, sMsg)
|
|
}
|
|
// Creating Alive messages
|
|
for i := 0; i < peersNum; i++ {
|
|
aliveMsg, _ := instances[i].discoveryImpl().createSignedAliveMessage(true)
|
|
aliveMsgs = append(aliveMsgs, aliveMsg)
|
|
}
|
|
|
|
repeatForFiltered := func(n int, filter func(i int) bool, action func(i int)) {
|
|
for i := 0; i < n; i++ {
|
|
if filter(i) {
|
|
continue
|
|
}
|
|
action(i)
|
|
}
|
|
}
|
|
|
|
// Handling Alive
|
|
for i := 0; i < peersNum; i++ {
|
|
for k := 0; k < peersNum; k++ {
|
|
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
|
|
msg: aliveMsgs[k],
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
checkExistence := func(instances []*gossipInstance, msgs []*protoext.SignedGossipMessage, index int, i int, step string) {
	_, exist := instances[index].discoveryImpl().aliveLastTS[string(instances[i].discoveryImpl().self.PKIid)]
	require.True(t, exist, fmt.Sprint(step, " Data from alive msg ", i, " doesn't exist in aliveLastTS of discovery inst ", index))

	_, exist = instances[index].discoveryImpl().id2Member[string(instances[i].discoveryImpl().self.PKIid)]
	require.True(t, exist, fmt.Sprint(step, " id2Member mapping doesn't exist for alive msg ", i, " of discovery inst ", index))

	require.NotNil(t, instances[index].discoveryImpl().aliveMembership.MsgByID(instances[i].discoveryImpl().self.PKIid), fmt.Sprint(step, " Alive msg ", i, " does not exist in aliveMembership of discovery inst ", index))

	require.Contains(t, instances[index].discoveryImpl().msgStore.Get(), msgs[i], fmt.Sprint(step, " Alive msg ", i, " not stored in store of discovery inst ", index))
}
|
|
|
|
checkAliveMsgExist := func(instances []*gossipInstance, msgs []*protoext.SignedGossipMessage, index int, step string) {
|
|
instances[index].discoveryImpl().lock.RLock()
|
|
defer instances[index].discoveryImpl().lock.RUnlock()
|
|
repeatForFiltered(peersNum,
|
|
func(k int) bool {
|
|
return k == index
|
|
},
|
|
func(k int) {
|
|
checkExistence(instances, msgs, index, k, step)
|
|
})
|
|
}
|
|
|
|
// Check that the Alive messages were processed
|
|
for i := 0; i < peersNum; i++ {
|
|
checkAliveMsgExist(instances, aliveMsgs, i, "[Step 1 - processing aliveMsg]")
|
|
}
|
|
|
|
// Creating MembershipResponse while all instances have full membership
|
|
for i := 0; i < peersNum; i++ {
|
|
peerToResponse := &NetworkMember{
|
|
Metadata: []byte{},
|
|
PKIid: []byte(fmt.Sprintf("localhost:%d", 22610+i)),
|
|
Endpoint: fmt.Sprintf("localhost:%d", 22610+i),
|
|
InternalEndpoint: fmt.Sprintf("localhost:%d", 22610+i),
|
|
}
|
|
memRespMsgs[i] = []*proto.MembershipResponse{}
|
|
repeatForFiltered(peersNum,
|
|
func(k int) bool {
|
|
return k == i
|
|
},
|
|
func(k int) {
|
|
aliveMsg, _ := instances[k].discoveryImpl().createSignedAliveMessage(true)
|
|
memResp := instances[k].discoveryImpl().createMembershipResponse(aliveMsg, peerToResponse)
|
|
memRespMsgs[i] = append(memRespMsgs[i], memResp)
|
|
})
|
|
}
|
|
|
|
// Re-creating Alive msgs with highest seq_num, to make sure Alive msgs in memReq and memResp are older
|
|
for i := 0; i < peersNum; i++ {
|
|
aliveMsg, _ := instances[i].discoveryImpl().createSignedAliveMessage(true)
|
|
newAliveMsgs = append(newAliveMsgs, aliveMsg)
|
|
}
|
|
|
|
// Handling new Alive set
|
|
for i := 0; i < peersNum; i++ {
|
|
for k := 0; k < peersNum; k++ {
|
|
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
|
|
msg: newAliveMsgs[k],
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// Check that the new Alive messages were processed
|
|
for i := 0; i < peersNum; i++ {
|
|
checkAliveMsgExist(instances, newAliveMsgs, i, "[Step 2 - processing aliveMsg]")
|
|
}
|
|
|
|
checkAliveMsgNotExist := func(instances []*gossipInstance, msgs []*protoext.SignedGossipMessage, index int, step string) {
|
|
instances[index].discoveryImpl().lock.RLock()
|
|
defer instances[index].discoveryImpl().lock.RUnlock()
|
|
require.Empty(t, instances[index].discoveryImpl().aliveLastTS, fmt.Sprint(step, " Data from alive msg still exists in aliveLastTS of discovery inst ", index))
|
|
require.Empty(t, instances[index].discoveryImpl().deadLastTS, fmt.Sprint(step, " Data from alive msg still exists in deadLastTS of discovery inst ", index))
|
|
require.Empty(t, instances[index].discoveryImpl().id2Member, fmt.Sprint(step, " id2Member mapping still contains data related to Alive msg: discovery inst ", index))
|
|
require.Empty(t, instances[index].discoveryImpl().msgStore.Get(), fmt.Sprint(step, " Expired Alive msg still stored in store of discovery inst ", index))
|
|
require.Zero(t, instances[index].discoveryImpl().aliveMembership.Size(), fmt.Sprint(step, " Alive membership list is not empty, discovery instance ", index))
require.Zero(t, instances[index].discoveryImpl().deadMembership.Size(), fmt.Sprint(step, " Dead membership list is not empty, discovery instance ", index))
|
|
}
|
|
|
|
// Sleep until the messages expire
|
|
time.Sleep(defaultTestConfig.AliveExpirationTimeout * (DefMsgExpirationFactor + 5))
|
|
|
|
// Checking Alive expired
|
|
for i := 0; i < peersNum; i++ {
|
|
checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step 3 - expiration in msg store]")
|
|
}
|
|
|
|
// Processing old MembershipRequest
|
|
for i := 0; i < peersNum; i++ {
|
|
repeatForFiltered(peersNum,
|
|
func(k int) bool {
|
|
return k == i
|
|
},
|
|
func(k int) {
|
|
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
|
|
msg: memReqMsgs[k],
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
|
|
},
|
|
})
|
|
})
|
|
}
|
|
|
|
// MembershipRequest processing didn't change anything
|
|
for i := 0; i < peersNum; i++ {
|
|
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 4 - memReq processing after expiration]")
|
|
}
|
|
|
|
// Processing old (later) Alive messages
|
|
for i := 0; i < peersNum; i++ {
|
|
for k := 0; k < peersNum; k++ {
|
|
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
|
|
msg: aliveMsgs[k],
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// Alive msg processing didn't change anything
|
|
for i := 0; i < peersNum; i++ {
|
|
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 5.1 - after lost old aliveMsg process]")
|
|
checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step 5.2 - after lost new aliveMsg process]")
|
|
}
|
|
|
|
// Handling old MembershipResponse messages
|
|
for i := 0; i < peersNum; i++ {
|
|
respForPeer := memRespMsgs[i]
|
|
for _, msg := range respForPeer {
|
|
sMsg, _ := protoext.NoopSign(&proto.GossipMessage{
|
|
Tag: proto.GossipMessage_EMPTY,
|
|
Nonce: uint64(0),
|
|
Content: &proto.GossipMessage_MemRes{
|
|
MemRes: msg,
|
|
},
|
|
})
|
|
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
|
|
msg: sMsg,
|
|
info: &protoext.ConnectionInfo{
|
|
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// MembershipResponse msg processing didn't change anything
|
|
for i := 0; i < peersNum; i++ {
|
|
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 6 - after lost MembershipResp process]")
|
|
}
|
|
|
|
for i := 0; i < peersNum; i++ {
|
|
instances[i].Stop()
|
|
}
|
|
}
|
|
|
|
func TestAliveMsgStore(t *testing.T) {
|
|
bootPeers := []string{}
|
|
peersNum := 2
|
|
instances := []*gossipInstance{}
|
|
aliveMsgs := []*protoext.SignedGossipMessage{}
|
|
memReqMsgs := []*protoext.SignedGossipMessage{}
|
|
|
|
for i := 0; i < peersNum; i++ {
|
|
id := fmt.Sprintf("d%d", i)
|
|
inst := createDiscoveryInstanceWithNoGossip(32610+i, id, bootPeers)
|
|
instances = append(instances, inst)
|
|
}
|
|
|
|
// Creating MembershipRequest messages
|
|
for i := 0; i < peersNum; i++ {
|
|
memReqMsg, _ := instances[i].discoveryImpl().createMembershipRequest(true)
|
|
sMsg, _ := protoext.NoopSign(memReqMsg)
|
|
memReqMsgs = append(memReqMsgs, sMsg)
|
|
}
|
|
// Creating Alive messages
|
|
for i := 0; i < peersNum; i++ {
|
|
aliveMsg, _ := instances[i].discoveryImpl().createSignedAliveMessage(true)
|
|
aliveMsgs = append(aliveMsgs, aliveMsg)
|
|
}
|
|
|
|
// Check new alive msgs
|
|
for _, msg := range aliveMsgs {
|
|
require.True(t, instances[0].discoveryImpl().msgStore.CheckValid(msg), "aliveMsgStore CheckValid returns false on new AliveMsg")
|
|
}
|
|
|
|
// Add new alive msgs
|
|
for _, msg := range aliveMsgs {
|
|
require.True(t, instances[0].discoveryImpl().msgStore.Add(msg), "aliveMsgStore Add returns false on new AliveMsg")
|
|
}
|
|
|
|
// Check existing alive msgs
|
|
for _, msg := range aliveMsgs {
|
|
require.False(t, instances[0].discoveryImpl().msgStore.CheckValid(msg), "aliveMsgStore CheckValid returns true on existing AliveMsg")
|
|
}
|
|
|
|
// Check non-alive msgs
|
|
for _, msg := range memReqMsgs {
|
|
require.Panics(t, func() { instances[1].discoveryImpl().msgStore.CheckValid(msg) }, "aliveMsgStore CheckValid should panic on new MembershipRequest msg")
|
|
require.Panics(t, func() { instances[1].discoveryImpl().msgStore.Add(msg) }, "aliveMsgStore Add should panic on new MembershipRequest msg")
|
|
}
|
|
}
|
|
|
|
func TestMemRespDisclosurePol(t *testing.T) {
|
|
pol := func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) {
|
|
assert.Equal(t, remotePeer.InternalEndpoint, remotePeer.Endpoint)
|
|
return func(_ *protoext.SignedGossipMessage) bool {
|
|
return remotePeer.Endpoint != "localhost:7879"
|
|
}, func(m *protoext.SignedGossipMessage) *proto.Envelope {
|
|
return m.Envelope
|
|
}
|
|
}
|
|
|
|
wasMembershipResponseReceived := func(msg *protoext.SignedGossipMessage) {
|
|
assert.Nil(t, msg.GetMemRes())
|
|
}
|
|
|
|
d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol, defaultTestConfig)
|
|
defer d1.Stop()
|
|
d2 := createDiscoveryInstanceThatGossipsWithInterceptors(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, wasMembershipResponseReceived, defaultTestConfig)
|
|
defer d2.Stop()
|
|
d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, pol, defaultTestConfig)
|
|
defer d3.Stop()
|
|
|
|
// all peers know each other
|
|
assertMembership(t, []*gossipInstance{d1, d2, d3}, 2)
|
|
// d2 received some messages, but we asserted that none of them are membership responses.
|
|
assert.NotZero(t, d2.receivedMsgCount())
|
|
assert.NotZero(t, d2.sentMsgCount())
|
|
}
|
|
|
|
func TestMembersByID(t *testing.T) {
|
|
members := Members{
|
|
{PKIid: common.PKIidType("p0"), Endpoint: "p0"},
|
|
{PKIid: common.PKIidType("p1"), Endpoint: "p1"},
|
|
}
|
|
byID := members.ByID()
|
|
require.Len(t, byID, 2)
|
|
require.Equal(t, "p0", byID["p0"].Endpoint)
|
|
require.Equal(t, "p1", byID["p1"].Endpoint)
|
|
}
|
|
|
|
func TestFilter(t *testing.T) {
	members := Members{
		{PKIid: common.PKIidType("p0"), Endpoint: "p0", Properties: &proto.Properties{
			Chaincodes: []*proto.Chaincode{{Name: "cc", Version: "1.0"}},
		}},
		{PKIid: common.PKIidType("p1"), Endpoint: "p1", Properties: &proto.Properties{
			Chaincodes: []*proto.Chaincode{{Name: "cc", Version: "2.0"}},
		}},
	}
	res := members.Filter(func(member NetworkMember) bool {
		cc := member.Properties.Chaincodes[0]
		return cc.Version == "2.0" && cc.Name == "cc"
	})
	require.Equal(t, Members{members[1]}, res)
}

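// TestMap verifies that Members.Map returns a transformed copy of the members
// while leaving the original slice untouched.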
func TestMap(t *testing.T) {
	members := Members{
		{PKIid: common.PKIidType("p0"), Endpoint: "p0"},
		{PKIid: common.PKIidType("p1"), Endpoint: "p1"},
	}
	expectedMembers := Members{
		{PKIid: common.PKIidType("p0"), Endpoint: "p0", Properties: &proto.Properties{LedgerHeight: 2}},
		{PKIid: common.PKIidType("p1"), Endpoint: "p1", Properties: &proto.Properties{LedgerHeight: 2}},
	}

	addProperty := func(member NetworkMember) NetworkMember {
		member.Properties = &proto.Properties{
			LedgerHeight: 2,
		}
		return member
	}

	require.Equal(t, expectedMembers, members.Map(addProperty))
	// Ensure original members didn't change
	require.Nil(t, members[0].Properties)
	require.Nil(t, members[1].Properties)
}

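// TestMembersIntersect verifies that Members.Intersect returns only the members
// that appear in both sets.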
func TestMembersIntersect(t *testing.T) {
	members1 := Members{
		{PKIid: common.PKIidType("p0"), Endpoint: "p0"},
		{PKIid: common.PKIidType("p1"), Endpoint: "p1"},
	}
	members2 := Members{
		{PKIid: common.PKIidType("p1"), Endpoint: "p1"},
		{PKIid: common.PKIidType("p2"), Endpoint: "p2"},
	}
	require.Equal(t, Members{{PKIid: common.PKIidType("p1"), Endpoint: "p1"}}, members1.Intersect(members2))
}

func TestPeerIsolation(t *testing.T) {
	// Scenario:
	// Start 3 peers (peer0, peer1, peer2). Set peer1 as the bootstrap peer for all of them.
	// Stop peer0 and peer1 for a while, start them again, and test that peer2 still gets full membership.

	config := defaultTestConfig
	// Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test.
	config.AliveExpirationTimeout = 2 * config.AliveTimeInterval

	peersNum := 3
	bootPeers := []string{bootPeer(7121)}
	instances := []*gossipInstance{}
	var inst *gossipInstance

	// Start all peers and wait for full membership
	for i := 0; i < peersNum; i++ {
		id := fmt.Sprintf("d%d", i)
		inst = createDiscoveryInstanceCustomConfig(7120+i, id, bootPeers, config)
		instances = append(instances, inst)
	}
	assertMembership(t, instances, peersNum-1)

	// Stop the first 2 peers so the third peer stays alone
	stopInstances(t, instances[:peersNum-1])
	assertMembership(t, instances[peersNum-1:], 0)

	// Sleep for the time it takes to remove a message from the aliveMsgStore (aliveMsgTTL),
	// plus a second as a buffer
	time.Sleep(config.AliveExpirationTimeout*DefMsgExpirationFactor + time.Second)

	// Start the first 2 peers again and wait for all the peers to get full membership.
	// In particular, we want to test that peer2 doesn't stay isolated.
	for i := 0; i < peersNum-1; i++ {
		id := fmt.Sprintf("d%d", i)
		inst = createDiscoveryInstanceCustomConfig(7120+i, id, bootPeers, config)
		instances[i] = inst
	}
	assertMembership(t, instances, peersNum-1)
}

func TestMembershipAfterExpiration(t *testing.T) {
	// Scenario:
	// Start 3 peers (peer0, peer1, peer2). Set peer0 as the anchor peer.
	// Stop peer0 and peer1 for a while, start them again, and test that peer2 still gets full membership.

	config := defaultTestConfig
	// Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test.
	config.AliveExpirationTimeout = 2 * config.AliveTimeInterval
	config.ReconnectInterval = config.AliveExpirationTimeout
	config.MsgExpirationFactor = 5

	peersNum := 3
	ports := []int{9120, 9121, 9122}
	anchorPeer := "localhost:9120"
	bootPeers := []string{}
	instances := []*gossipInstance{}
	var inst *gossipInstance
	mockTracker := &mockAnchorPeerTracker{[]string{anchorPeer}}

	l, err := zap.NewDevelopment()
	assert.NoError(t, err)
	expired := make(chan struct{}, 1)

	// Use a custom logger to verify messages from the expiration callback
	loggerThatTracksCustomMessage := func() util.Logger {
		var lock sync.RWMutex
		expectedMsgs := map[string]struct{}{
			"Do not remove bootstrap or anchor peer endpoint localhost:9120 from membership": {},
			"Removing member: Endpoint: localhost:9121, InternalEndpoint: localhost:9121, PKIID: 6c6f63616c686f73743a39313231": {},
		}

		return flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
			// Do nothing if we already found all the expectedMsgs
			lock.RLock()
			expectedMsgSize := len(expectedMsgs)
			lock.RUnlock()

			if expectedMsgSize == 0 {
				select {
				case expired <- struct{}{}:
				default:
					// No room is fine, continue
				}
				return nil
			}

			lock.Lock()
			defer lock.Unlock()

			delete(expectedMsgs, entry.Message)
			return nil
		}))
	}

	// Start all peers, connect to the anchor peer and verify full membership
	for i := 0; i < peersNum; i++ {
		id := fmt.Sprintf("d%d", i)
		logger := loggerThatTracksCustomMessage()
		inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *protoext.SignedGossipMessage) {}, config, mockTracker, logger)
		instances = append(instances, inst)
	}
	for i := 1; i < peersNum; i++ {
		connect(instances[i], anchorPeer)
	}
	assertMembership(t, instances, peersNum-1)

	// Stop peer0 and peer1 so that peer2 stays alone
	stopInstances(t, instances[0:peersNum-1])

	// waitTime is the time it takes to remove a message from the aliveMsgStore (aliveMsgTTL),
	// plus a second as a buffer
	waitTime := config.AliveExpirationTimeout*time.Duration(config.MsgExpirationFactor) + time.Second
	select {
	case <-expired:
	case <-time.After(waitTime):
		t.Fatalf("timed out")
	}
	// peer2's deadMembership should contain only the anchor peer
	deadMembership := instances[peersNum-1].discoveryImpl().deadMembership
	require.Equal(t, 1, deadMembership.Size())
	assertMembership(t, instances[peersNum-1:], 0)

	// Start peer0 and peer1 again and wait for all the peers to get full membership.
	// In particular, we want to test that peer2 doesn't stay isolated.
	for i := 0; i < peersNum-1; i++ {
		id := fmt.Sprintf("d%d", i)
		inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *protoext.SignedGossipMessage) {}, config, mockTracker, nil)
		instances[i] = inst
	}
	connect(instances[1], anchorPeer)
	assertMembership(t, instances, peersNum-1)
}

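// connect installs a temporary mock on the instance's comm module (stubbing Ping
// and watching SendToPeer) and then connects the instance to the given endpoint.
// The mock clears itself once the instance sends its first membership request.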
func connect(inst *gossipInstance, endpoint string) {
	inst.comm.lock.Lock()
	inst.comm.mock = &mock.Mock{}
	inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
		msg := arguments.Get(1).(*protoext.SignedGossipMessage)
		if req := msg.GetMemReq(); req != nil {
			inst.comm.lock.Lock()
			inst.comm.mock = nil
			inst.comm.lock.Unlock()
		}
	})
	inst.comm.mock.On("Ping", mock.Anything)
	inst.comm.lock.Unlock()
	netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
	inst.Connect(netMember2Connect2, func() (identification *PeerIdentification, err error) {
		return &PeerIdentification{SelfOrg: true, ID: nil}, nil
	})
}

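// waitUntilOrFail polls pred with the package-level timeout and fails the test
// if the predicate never returns true in time.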
func waitUntilOrFail(t *testing.T, pred func() bool) {
	waitUntilTimeoutOrFail(t, pred, timeout)
}

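// waitUntilTimeoutOrFail polls pred every timeout/10 until it returns true, and
// fails the test if the deadline passes first.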
func waitUntilTimeoutOrFail(t *testing.T, pred func() bool, timeout time.Duration) {
	start := time.Now()
	limit := start.UnixNano() + timeout.Nanoseconds()
	for time.Now().UnixNano() < limit {
		if pred() {
			return
		}
		time.Sleep(timeout / 10)
	}
	require.Fail(t, "Timeout expired!")
}

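// waitUntilOrFailBlocking runs f in a goroutine and fails the test if f does
// not return within the package-level timeout.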
func waitUntilOrFailBlocking(t *testing.T, f func()) {
	successChan := make(chan struct{}, 1)
	go func() {
		f()
		successChan <- struct{}{}
	}()
	select {
	case <-time.NewTimer(timeout).C:
	case <-successChan:
		return
	}
	require.Fail(t, "Timeout expired!")
}

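// stopInstances stops all the given instances concurrently and fails the test
// if they do not all stop within the package-level timeout.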
func stopInstances(t *testing.T, instances []*gossipInstance) {
	stopAction := &sync.WaitGroup{}
	for _, inst := range instances {
		stopAction.Add(1)
		go func(inst *gossipInstance) {
			defer stopAction.Done()
			inst.Stop()
		}(inst)
	}

	waitUntilOrFailBlocking(t, stopAction.Wait)
}

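// assertMembership waits until every given instance reports a membership view
// of exactly expectedNum peers, and fails the test if that does not happen
// before the package-level timeout.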
func assertMembership(t *testing.T, instances []*gossipInstance, expectedNum int) {
	wg := sync.WaitGroup{}
	wg.Add(len(instances))

	ctx, cancelation := context.WithTimeout(context.Background(), timeout)
	defer cancelation()

	for _, inst := range instances {
		go func(ctx context.Context, i *gossipInstance) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case <-time.After(timeout / 10):
					if len(i.GetMembership()) == expectedNum {
						return
					}
				}
			}
		}(ctx, inst)
	}

	wg.Wait()
	require.NoError(t, ctx.Err(), "Timeout expired!")
}

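// portsOfMembers returns the listen ports of the given members, sorted in
// ascending order.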
func portsOfMembers(members []NetworkMember) []int {
	ports := make([]int, len(members))
	for i := range members {
		ports[i] = portOfEndpoint(members[i].Endpoint)
	}
	sort.Ints(ports)
	return ports
}

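// portOfEndpoint extracts the port from a "host:port" endpoint; parse errors
// are ignored since the endpoints used in these tests are well-formed.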
func portOfEndpoint(endpoint string) int {
	port, _ := strconv.ParseInt(strings.Split(endpoint, ":")[1], 10, 64)
	return int(port)
}